ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
makeDpSpec.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_libdaq
4  * @copyright
5  * (c) Copyright ESO 2022
6  * All Rights Reserved
7  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
8  */
9 #include <daq/makeDpSpec.hpp>
10 
11 #include <fmt/format.h>
12 #include <log4cplus/loggingmacros.h>
13 
14 #include <daq/fits/json.hpp>
15 #include <daq/json/dpSpec.hpp>
16 
17 namespace daq {
18 
19 std::string MakeOcmName(DaqContext const& ctx) {
20  return fmt::format("@{}", ctx.process_name);
21 }
22 
23 /**
24  * Make OCM keywords source.
25  */
26 json::FitsKeywordsSource MakeOcmKeywords(DaqContext const& ctx, log4cplus::Logger& logger) {
28  kws.keywords = ctx.keywords;
29  kws.source_name = MakeOcmName(ctx);
30  return kws;
31 }
32 
33 namespace v1 {
34 
35 json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
36  json::DpSpec dp_spec;
37  dp_spec.id = ctx.id;
38  dp_spec.target.file_id = ctx.file_id;
39  dp_spec.target.file_prefix = ctx.dp_name_prefix;
40 
41  // Add OCM keywords
42  if (!ctx.keywords.empty()) {
43  dp_spec.sources.emplace_back(MakeOcmKeywords(ctx, logger));
44  }
45 
46  // V1 heuristics for when DaqContext has no specification provided by user.
47  // If
48  // - number of primary sources == 1
49  // - number of files from that primary == 1
50  // then we automatically designate it the *in-place* target.
51  std::optional<std::string> target_source_name;
52 
53  if (1 == ctx.prim_sources.size() &&
54  1 == std::count_if(ctx.results.begin(),
55  ctx.results.end(),
56  [source = ctx.prim_sources[0].name](DpPart const& part) {
57  return part.SourceName() == source;
58  })) {
59  auto it = std::find_if(ctx.results.begin(),
60  ctx.results.end(),
61  [source = ctx.prim_sources[0].name](DpPart const& part) {
62  return part.SourceName() == source;
63  });
64  assert(it != ctx.results.end());
65  if (std::holds_alternative<std::string>(it->Part())) {
66  // At this point we have:
67  // 1 primary source
68  // 1 output fits file
69  auto const& path = std::get<std::string>(it->Part());
70  target_source_name = ctx.prim_sources[0].name;
71  LOG4CPLUS_DEBUG(logger,
72  fmt::format("{}: Heuristics resulted in using the file "
73  "{} from {} as *in-place* merge target.",
74  ctx.id,
75  path,
76  *target_source_name));
77  auto& source = dp_spec.target.source.emplace();
78  source.source_name = it->SourceName();
79  source.location = path;
80  }
81  }
82 
83  if (ctx.results.empty()) {
84  throw boost::enable_current_exception(
85  std::invalid_argument("Cannot create data product specification with no results"));
86  }
87 
88  for (DpPart const& r : ctx.results) {
89  if (target_source_name && *target_source_name == r.SourceName()) {
90  // Skip target file
91  continue;
92  }
93  if (std::holds_alternative<fits::KeywordVector>(r.Part())) {
94  // keywords
96  s.source_name = r.SourceName();
97  s.keywords = std::get<fits::KeywordVector>(r.Part());
98  dp_spec.sources.push_back(s);
99  } else if (std::holds_alternative<std::string>(r.Part())) {
101  s.source_name = r.SourceName();
102  s.location = std::get<std::string>(r.Part());
103  dp_spec.sources.push_back(s);
104  }
105  }
106 
107  return dp_spec;
108 }
109 
110 } // namespace v1
111 
112 namespace v2 {
113 
114 /**
115  * Per data source common specification that is only used for more efficient lookup.
116  */
118  /**
119  * Position index in original specification, used to order sources.
120  */
121  std::size_t index = std::numeric_limits<std::size_t>::max();
122  std::optional<json::InitialKeywords> initial_keywords = std::nullopt;
124 };
125 
126 std::unordered_map<std::string, CommonSourceSpecifications>
128  // NOTE: index 0 is reserved for internal OCM keywords
129  std::size_t index = 1;
130  std::unordered_map<std::string, CommonSourceSpecifications> lookup_map;
131 
132  for (auto const& source : spec.sources) {
134  common.index = index++;
135  std::string source_name;
136  std::visit(
137  [&](auto const& v) {
138  source_name = v.source_name;
139  common.initial_keywords = v.initial_keywords;
140  common.keyword_rules = v.keyword_rules;
141  },
142  source);
143 
144  lookup_map.emplace(std::move(source_name), std::move(common));
145  }
146  return lookup_map;
147 }
148 
149 /**
150  * Creates and returns the `/sources` and `/target` structures using DaqContext::specification.
151  *
152  * Assumptions:
153  * - source names are unique (DAQ result `DpPart` is uniqely identifying where it comes from.
154  */
155 json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
156  json::DpSpec dp_spec;
157  dp_spec.id = ctx.id;
158  dp_spec.target.file_id = ctx.file_id;
159  dp_spec.target.file_prefix = ctx.dp_name_prefix;
160 
161  // Add OCM keywords
162  if (!ctx.keywords.empty()) {
163  dp_spec.sources.emplace_back(MakeOcmKeywords(ctx, logger));
164  }
165 
166  auto lookup = [lookup_map = MakeCommonSpecifications(*ctx.specification)](
167  std::string const& source_name) -> CommonSourceSpecifications const* {
168  auto it = lookup_map.find(source_name);
169  if (it != std::end(lookup_map)) {
170  return &it->second;
171  }
172  return nullptr;
173  };
174 
175  // Objectives:
176  // - Use order from specification.sources
177  // - Use for each result use the specified keywordRules from specification.sources.
178  // - Use specified target (use lookup by sourceName)
179 
180  for (DpPart const& r : ctx.results) {
181  LOG4CPLUS_DEBUG(logger, "Adding DpPart " << r);
182  auto* common = lookup(r.SourceName());
183 
184  if (ctx.specification->merge_target.has_value() &&
185  ctx.specification->merge_target->source_name == r.SourceName() &&
186  std::holds_alternative<std::string>(r.Part())) {
187  LOG4CPLUS_DEBUG(logger, "Considering merge target " << r);
188  // Source is a file (as identified by std::string) that matches requested merge_target
189  // name -> Add as merge target!
190 
191  if (!dp_spec.target.source.has_value()) {
193  s.source_name = r.SourceName();
194  s.location = std::get<std::string>(r.Part());
195  dp_spec.target.source = s;
196  // NOTE keyword rules are not used for target yet.
197  LOG4CPLUS_DEBUG(logger,
198  fmt::format("Added merge target source from {} with file {}",
199  s.source_name,
200  s.location));
201  continue;
202  } else {
203  LOG4CPLUS_WARN(logger,
204  fmt::format("Multiple source files matched as merge-target! First "
205  "one has been chosen: {}",
206  dp_spec.target.source->location));
207  }
208  }
209 
210  // Non merge-target sources which are either JSON keywords or FITS files
211  if (std::holds_alternative<fits::KeywordVector>(r.Part())) {
213  s.source_name = r.SourceName();
214  s.keywords = std::get<fits::KeywordVector>(r.Part());
215  if (common != nullptr) {
216  LOG4CPLUS_INFO(logger,
217  "Has common, and " << (!common->keyword_rules.empty()
218  ? "has keyword rules"
219  : "does NOT have keyword rules"));
220  s.initial_keywords = common->initial_keywords;
221  s.keyword_rules = common->keyword_rules;
222  }
223  dp_spec.sources.push_back(s);
224  } else if (std::holds_alternative<std::string>(r.Part())) {
226  s.source_name = r.SourceName();
227  s.location = std::get<std::string>(r.Part());
228  if (common != nullptr) {
229  LOG4CPLUS_INFO(logger,
230  "Has common, and " << (!common->keyword_rules.empty()
231  ? "has keyword rules"
232  : "does NOT have keyword rules"));
233  s.initial_keywords = common->initial_keywords;
234  s.keyword_rules = common->keyword_rules;
235  }
236  dp_spec.sources.push_back(s);
237  }
238  }
239 
240  // Lookup index of provided source.
241  auto index_of =
242  [&, ocm_name = MakeOcmName(ctx)](json::DpSpec::SourceTypes const& s) -> std::size_t {
243  auto const& name = std::visit([](auto const& t) { return t.source_name; }, s);
244  if (name == ocm_name) {
245  // Special case for OCM which needs to come first.
246  return 0;
247  }
248  auto* res = lookup(name);
249  if (res != nullptr) {
250  return res->index;
251  }
252  return std::numeric_limits<std::size_t>::max();
253  };
254 
255  // Stable sort sources according to the index order.
256  std::stable_sort(std::begin(dp_spec.sources),
257  std::end(dp_spec.sources),
258  [&](json::DpSpec::SourceTypes const& a, json::DpSpec::SourceTypes const& b) {
259  return index_of(a) < index_of(b);
260  });
261  return dp_spec;
262 }
263 
264 } // namespace v2
265 
266 json::DpSpec MakeDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
267  LOG4CPLUS_DEBUG(logger,
268  "MakeDataProductSpecification: DaqContext has a specification: "
269  << (ctx.specification.has_value() ? "yes" : "no"));
270  if (ctx.specification.has_value()) {
271  return v2::MakeDataProductSpecification(ctx, logger);
272  } else {
273  return v1::MakeDataProductSpecification(ctx, logger);
274  }
275 }
276 
277 } // namespace daq
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
auto Part() const noexcept -> PartTypes const &
Holds a std::string path [[user]@host:]path or FITS keywords.
Definition: dpPart.hpp:54
auto SourceName() const noexcept -> std::string const &
Source name of the part.
Definition: dpPart.hpp:44
Contains data structure for FITS keywords.
std::string id
Definition: dpSpec.hpp:42
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
std::vector< KeywordRuleTypes > KeywordRules
std::vector< SourceTypes > sources
Definition: dpSpec.hpp:44
std::variant< FitsKeywordsSource, FitsFileSource > SourceTypes
Definition: dpSpec.hpp:40
std::string file_prefix
Optional user chosen file prefix to make it easier to identify the produced file.
Definition: dpSpec.hpp:36
std::vector< DataSourceTypes > sources
Definition: startDaqV2.hpp:51
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Definition: makeDpSpec.cpp:35
std::optional< json::InitialKeywords > initial_keywords
Definition: makeDpSpec.cpp:122
std::unordered_map< std::string, CommonSourceSpecifications > MakeCommonSpecifications(json::StartDaqV2Spec const &spec)
Definition: makeDpSpec.cpp:127
std::size_t index
Position index in original specification, used to order sources.
Definition: makeDpSpec.cpp:121
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates and returns the /sources and /target structures using DaqContext::specification.
Definition: makeDpSpec.cpp:155
Per data source common specification that is only used for more efficient lookup.
Definition: makeDpSpec.cpp:117
json::FitsKeywordsSource MakeOcmKeywords(DaqContext const &ctx, log4cplus::Logger &logger)
Make OCM keywords source.
Definition: makeDpSpec.cpp:26
std::string MakeOcmName(DaqContext const &ctx)
Definition: makeDpSpec.cpp:19
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: makeDpSpec.cpp:266
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
DpParts results
Results from Data Acquisition (FITS files and keywords).
Definition: daqContext.hpp:102
std::string process_name
User defined process name.
Definition: daqContext.hpp:70
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:87
std::vector< Source > prim_sources
Definition: daqContext.hpp:76
std::optional< json::StartDaqV2Spec > specification
Optional specification, if DAQ was started using StartDaqV2.
Definition: daqContext.hpp:116
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:75
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:60