ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
scheduler.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Scheduler implementation
9  */
10 #include <daq/dpm/scheduler.hpp>
11 
12 #include <algorithm>
13 
14 #include <boost/asio/post.hpp>
15 #include <fmt/format.h>
16 #include <fmt/ostream.h>
17 #include <log4cplus/loggingmacros.h>
18 
19 #include <daq/dpm/config.hpp>
20 #include <daq/dpm/workspace.hpp>
21 #include <daq/error/report.hpp>
22 #include <daq/log4cplus.hpp>
23 
24 namespace daq::dpm {
25 
26 namespace {
27 void PrintArgs(log4cplus::Logger& logger, std::vector<std::string> const& args) {
28  std::stringstream ss;
29  ss << "{";
30  bool first = true;
31  for (auto const& token : args) {
32  if (!first) {
33  ss << ", ";
34  }
35  ss << token;
36  first = false;
37  }
38  ss << "}";
39  LOG4CPLUS_DEBUG(logger, "Executing merger with args: " << ss.str());
40 }
41 } // namespace
42 
// Guard object representing one acquired unit of @a resource; the unit is
// released back (see Resource::Release) when the token is destroyed.
ResourceToken::ResourceToken(Resource* resource) noexcept : m_resource(resource) {
}
45 
47  m_resource->Release();
48 }
49 
// Return a previously acquired resource unit: decrement the usage count and,
// when a finite limit is configured and capacity is now available, signal
// observers waiting for a free unit.
void Resource::Release() noexcept {
    m_used--;
    // If limit is 0 (infinite resources) then there's no point in signalling as nothing will ever
    // be blocked by missing resource.
    if (m_limit > 0 && m_used < m_limit) {
        m_signal();
    }
}
58 
59 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
60  os << "DAQ{" << daq.GetStatus() << "}";
61  return os;
62 }
63 
65  Workspace& workspace,
66  DaqControllerFactory daq_controller_factory,
67  SchedulerOptions const& options)
68  : m_executor(executor)
69  , m_workspace(workspace)
70  , m_daq_controller_factory(std::move(daq_controller_factory))
71  , m_options(options)
72  , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME_SCHEDULER))
73  , m_liveness(std::make_shared<bool>(false)) {
74  m_queue = m_workspace.LoadQueue();
75  auto slot = [liveness = std::weak_ptr<bool>(m_liveness), this]() {
76  if (auto ptr = liveness.lock(); ptr) {
77  // Scheduler still alive
78  DeferredPoll();
79  }
80  };
81  m_resources.daqs.Connect(slot);
82  m_resources.net_receive.Connect(slot);
83  m_resources.merge.Connect(slot);
84  m_resources.net_send.Connect(slot);
85 }
86 
88  m_stopped = false;
89  DeferredPoll();
90 }
91 
93  m_stopped = true;
94 }
95 
// Queue a new DAQ for merging.
//
// Parses @a dp_spec_serialized (JSON Data Product Specification), rejects
// duplicate ids, initializes a persistent per-DAQ workspace with an initial
// Scheduled status, appends the id to the in-memory queue and persists the
// queue, then schedules a poll.
//
// Error handling is transactional: each nested catch undoes what its step
// created (queue entry, DAQ workspace) before rethrowing with context via
// std::throw_with_nested.
//
// @param dp_spec_serialized Serialized JSON Data Product Specification.
// @return the DAQ id taken from the specification.
// @throws std::invalid_argument (possibly nested) for duplicate ids or an
//         invalid/unparsable specification.
// @throws std::runtime_error (nested) if persisting state fails.
std::string SchedulerImpl::QueueDaq(std::string const& dp_spec_serialized) {
    LOG4CPLUS_TRACE(m_logger, "QueueDaq()");
    try {
        auto json = nlohmann::json::parse(dp_spec_serialized);
        json::DpSpec dp_spec = json::ParseDpSpec(json);
        std::string const& id = dp_spec.id;
        std::string const& file_id = dp_spec.target.file_id;
        if (IsQueued(id)) {
            LOG4CPLUS_ERROR(m_logger, "QueueDaq(): DAQ conflict detected -> aborting");
            throw std::invalid_argument(
                fmt::format("Scheduler: Could not queue DAQ for merging as "
                            "a Data Acquisition with same id has been queued before: '{}'",
                            id));
        }
        // New daq for this workspace -> Initialize persistent state.
        try {
            LOG4CPLUS_INFO(m_logger,
                           fmt::format("QueueDaq(): Initializing new workspace for DAQ {}", id));
            auto daq_ws = m_workspace.InitializeDaq(id);
            assert(daq_ws);
            try {
                assert(daq_ws);
                Status initial_status;
                initial_status.id = id;
                initial_status.file_id = file_id;
                // Initial state in DPM is Scheduled as that's where DPM takes over.
                initial_status.state = State::Scheduled;
                initial_status.error = false;
                initial_status.timestamp = Status::TimePoint::clock::now();

                daq_ws->StoreStatus(initial_status);
                daq_ws->StoreSpecification(dp_spec_serialized);

                try {
                    m_queue.push_back(id);
                    // Finally update the backlog
                    m_workspace.StoreQueue(m_queue);

                    DeferredPoll();
                    return id;
                } catch (...) {
                    // Undo push
                    // NOTE(review): if push_back itself threw (allocation failure), this
                    // pop_back would remove a pre-existing element — presumably considered
                    // acceptable/unlikely here; confirm.
                    m_queue.pop_back();
                    std::throw_with_nested(std::runtime_error("Failed to store DAQ queue"));
                }
            } catch (...) {
                std::throw_with_nested(
                    std::runtime_error("Failed to write status to DAQ workspace"));
            }
        } catch (...) {
            // Undo creation of DAQ workspace
            m_workspace.RemoveDaq(id);
            std::throw_with_nested(std::runtime_error(fmt::format(
                "Failed to initialize DAQ workspace in {}", m_workspace.GetPath().native())));
        }
    } catch (json::DpSpecError const&) {
        LOG4CPLUS_DEBUG(m_logger,
                        "Invalid Data Product Specification provided: \n"
                            << dp_spec_serialized);

        std::throw_with_nested(
            std::invalid_argument("Scheduler: Could not queue DAQ for merging because "
                                  "Data Product Specification is invalid"));
    } catch (nlohmann::json::parse_error const&) {
        std::throw_with_nested(
            std::invalid_argument("Scheduler: Could not queue DAQ for merging because provided "
                                  "data product specification is invalid JSON"));
    }
}
165 
166 bool SchedulerImpl::IsQueued(std::string const& id) const noexcept {
167  return std::find(m_queue.begin(), m_queue.end(), id) != m_queue.end();
168 }
169 
// Load and return the persisted status of the DAQ identified by @a id.
//
// @throws std::runtime_error (with nested cause) if the id is unknown or the
//         workspace status cannot be loaded.
Status SchedulerImpl::GetDaqStatus(std::string const& id) const try {
    LOG4CPLUS_TRACE(m_logger, fmt::format("GetDaqStatus({})", id));
    if (!IsQueued(id)) {
        LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): No such ID", id));
        throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
    }
    return m_workspace.LoadDaq(id)->LoadStatus();
} catch (...) {
    std::throw_with_nested(std::runtime_error("Scheduler: GetDaqStatus() failed"));
}
180 
// Return a copy of the current merge queue (DAQ ids in queue order).
// NOTE(review): declared noexcept although copying the vector can throw
// std::bad_alloc (which would terminate) — confirm against the interface.
std::vector<std::string> SchedulerImpl::GetQueue() const noexcept {
    return m_queue;
}
184 
185 void SchedulerImpl::AbortDaq(std::string const& id) try {
186  LOG4CPLUS_TRACE(m_logger, fmt::format("AbortDaq({})", id));
187  // - If DAQ is either being released or is already in final state it cannot be aborted.
188  // - If DAQ is active must first be made inactive by destroying DaqController which will also
189  // terminate started processes.
190  // - If DAQ is not active aborting simply means to erase DAQ from filesystem and queue.
191  if (!IsQueued(id)) {
192  LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): No such ID", id));
193  throw std::invalid_argument(fmt::format("DAQ with id='{}' not found", id));
194  }
195 
196  // If DAQ is active/in-progress we must first delete it
197  auto it = std::find_if(m_active.begin(), m_active.end(), [&id](Active const& active) {
198  assert(active.daq);
199  return id == active.daq->GetId();
200  });
201 
202  // Status to update with final aborted state
203  auto pub = [&, status = m_workspace.LoadDaq(id)->LoadStatus()](State new_state) mutable {
204  status.state = State::AbortingMerging;
205  m_status_signal(status);
206  };
207 
208  if (it != m_active.end()) {
209  auto state = it->daq->GetState();
210  if (IsFinalState(state) || state == State::Releasing) {
211  LOG4CPLUS_DEBUG(m_logger,
212  fmt::format("AbortDaq({}): Cannot abort in state {}", id, state));
213  }
214  // Erase if active
215  LOG4CPLUS_DEBUG(
216  m_logger,
217  fmt::format("AbortDaq({}): Erasing active DAQ currently in state {}", id, state));
218  m_active.erase(it);
219  }
220 
221  // Publish that we're aborting
223 
224  // Erase id from queue
225  LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ from merge queue", id));
226  m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
227  m_workspace.StoreQueue(m_queue);
228 
229  // Delete workspace
230  LOG4CPLUS_DEBUG(m_logger, fmt::format("AbortDaq({}): Removing DAQ workspace", id));
231  m_workspace.RemoveDaq(id);
232 
233  // Publish that DAQ is aborted
234  pub(State::Aborted);
235 
236  // Queue has changed -> Poll
237  DeferredPoll();
238  LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Aborted and removed DAQ", id));
239 } catch (...) {
240  std::throw_with_nested(std::runtime_error("Scheduler: AbortDaq() failed"));
241 }
242 
// Subscribe @a slot to status updates published for any DAQ handled by this
// scheduler. The caller owns the returned connection.
boost::signals2::connection SchedulerImpl::ConnectStatus(StatusSignal::slot_type const& slot) {
    return m_status_signal.connect(slot);
}
246 
// One scheduler iteration, normally invoked asynchronously via DeferredPoll():
// archive DAQs that reached a final state, then activate queued DAQs as
// resource limits permit.
void SchedulerImpl::Poll() {
    LOG4CPLUS_TRACE(m_logger, "Poll()");
    ArchiveCompleted();
    ActivateFromQueue();
}
252 
// Archive every active DAQ that has reached a final state: move its workspace
// to the archive, destroy the controller (releasing its resource token) and
// remove the id from the persisted queue. Failures are logged and the DAQ is
// left in place for a later poll to retry.
void SchedulerImpl::ArchiveCompleted() {
    // Archive completed DAQs
    for (auto& active : m_active) {
        try {
            if (!IsFinalState(active.daq->GetState())) {
                continue;
            }
            auto id = active.daq->GetId();

            auto archive_path = m_workspace.ArchiveDaq(id);
            LOG4CPLUS_INFO(
                m_logger,
                fmt::format("DAQ {} is in final state {} -> moved workspace to archive: {}",
                            id,
                            active.daq->GetState(),
                            archive_path.native()));

            // Delete controller (marks this slot for removal below)
            active.daq.reset();

            // Remove ID from queue
            m_queue.erase(std::find(m_queue.begin(), m_queue.end(), id));
            m_workspace.StoreQueue(m_queue);
        } catch (...) {
            // @todo: Decide what to do here...
            LOG4CPLUS_ERROR(m_logger,
                            "Failed to archive DAQ workspace:\n"
                                << error::NestedExceptionReporter(std::current_exception()));
        }
    }

    // Erase removed daqs
    auto remove_it = std::remove_if(
        m_active.begin(), m_active.end(), [&](auto const& active) { return !active.daq; });
    if (remove_it != m_active.end()) {
        m_active.erase(remove_it, m_active.end());
        // Since this freed resources we schedule poll() to be run again.
        DeferredPoll();
    }
}
293 
294 void SchedulerImpl::ActivateFromQueue() {
295  // Update active DAQs
296  // See if a new DAQ can be active.
297  if (m_queue.empty()) {
298  LOG4CPLUS_INFO(m_logger, "Merge queue empty - all done!");
299  return;
300  }
301 
302  auto candidates = GetCandidates();
303  if (candidates.empty()) {
304  LOG4CPLUS_INFO(m_logger, "All DAQ merge candidates are active/in-progress");
305  return;
306  }
307  // Schedule as many candidates as allowed by resource limits
308  try {
309  for (auto const& id : candidates) {
310  LOG4CPLUS_TRACE(m_logger, fmt::format("{}: Attempting to schedule DAQ", id));
311  auto maybe_token = m_resources.daqs.Acquire();
312  if (!maybe_token) {
313  LOG4CPLUS_INFO(m_logger,
314  fmt::format("Limit reached: Cannot schedule '{}' "
315  "Currently active DAQs at/exceed limit. "
316  "current: {}, limit: {}, queue size: {}",
317  id,
318  m_resources.daqs.GetUsed(),
319  m_resources.daqs.GetLimit(),
320  m_queue.size()));
321  return;
322  }
323 
324  try {
325  // @todo: Store token with DAQ?
326  auto ws = m_workspace.LoadDaq(id);
327  auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
328 
329  // Aggregate signal
330  // both DaqController and slot are owned by Scheduler so we don't need
331  // to manage connection lifetime.
332  daq_controller->GetStatus().ConnectStatus([&](ObservableStatus const& s) {
333  // Completed DAQs should be archived so we schedule a poll in that case.
334  if (IsFinalState(s.GetState())) {
335  DeferredPoll();
336  }
337  // Just forward status
338  m_status_signal(s.GetStatus());
339  });
340  // Tell DaqController to start
341  daq_controller->Start();
342 
343  m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
344 
345  LOG4CPLUS_DEBUG(m_logger, fmt::format("{}: DAQ scheduled for merging", id));
346  // Schedule new Poll() as queue has been modified.
347  DeferredPoll();
348  } catch (...) {
349  std::throw_with_nested(
350  std::runtime_error(fmt::format("{}: Failed to activate DAQ for merging.", id)));
351  }
352  }
353  } catch (...) {
354  std::throw_with_nested(std::runtime_error("Failed to load "));
355  }
356 }
357 
358 void SchedulerImpl::DeferredPoll() {
359  if (m_stopped || *m_liveness) {
360  // Stopped or already scheduled
361  return;
362  }
363  boost::asio::post(m_executor.get_io_context().get_executor(),
364  [liveness = std::weak_ptr<bool>(m_liveness), this]() {
365  if (auto ptr = liveness.lock(); ptr) {
366  // Scheduler still alive
367  // Clear flag before invoking Poll so that another
368  // poll can be scheduled.
369  *ptr = false;
370  Poll();
371  }
372  });
373 }
374 
375 std::vector<std::string> SchedulerImpl::GetCandidates() const {
376  // Return all DAQs in m_queue that is not in m_active.
377  std::vector<std::string> candidates;
378  for (auto const& id : m_queue) {
379  auto it = std::find_if(m_active.begin(), m_active.end(), [&](auto const& active) {
380  assert(active.daq);
381  return active.daq->GetId() == id;
382  });
383  if (it == m_active.end()) {
384  candidates.push_back(id);
385  }
386  }
387  return candidates;
388 }
389 
// Construct a controller for a single DAQ from its persisted workspace.
//
// Loads the stored specification and status, derives the result-file path
// (unless one was already recorded in the status), restores the state-machine
// position from the persisted State, and subscribes a slot that writes every
// observable status change back to the workspace. The controller starts in
// stopped mode; call Start() to begin polling.
//
// @param executor               Executor used for deferred polling and
//                               process completion handlers.
// @param workspace              Per-DAQ persistent workspace (owned).
// @param resources              Shared scheduler resource pools.
// @param rsync_factory          Factory for rsync transfer processes.
// @param proc_factory           Factory for the merger process.
// @param options                Controller options (binary names etc.).
// @throws std::invalid_argument if merge/rsync binary names are empty.
// @throws std::runtime_error    if the persisted state is not restorable.
DaqControllerImpl::DaqControllerImpl(rad::IoExecutor& executor,
                                     std::unique_ptr<DaqWorkspace> workspace,
                                     Resources& resources,
                                     RsyncFactory rsync_factory,
                                     ProcFactory proc_factory,
                                     DaqControllerOptions options)
    : m_executor(executor)
    , m_workspace(std::move(workspace))
    , m_resources(resources)
    , m_rsync_factory(std::move(rsync_factory))
    , m_proc_factory(std::move(proc_factory))
    , m_options(std::move(options))
    , m_dpspec(m_workspace->LoadSpecification())
    , m_result()
    , m_state_ctx(Scheduled{}) // m_state_ctx is updated in body with actual state.
    , m_status(m_workspace->LoadStatus())
    , m_status_connection()
    , m_liveness(std::make_shared<bool>(false))
    , m_stopped(true)
    , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME_CONTROLLER)) {
    if (m_options.merge_bin.empty()) {
        throw std::invalid_argument("Specified merge application name is empty");
    }
    if (m_options.rsync_bin.empty()) {
        throw std::invalid_argument("Specified rsync application name is empty");
    }

    // If result has already been created it is contained in the stored status.
    // Otherwise we set to-be-used absolute path
    if (m_status.GetStatus().result.empty()) {
        m_result = m_workspace->GetResultPath() /
                   (m_dpspec.target.file_prefix + m_dpspec.target.file_id + ".fits");
    } else {
        m_result = m_status.GetStatus().result;
    }

    // Update m_state from loaded status.
    auto error = m_status.GetError();
    switch (m_status.GetState()) {
    case State::Scheduled:
        SetState(Scheduled{}, error);
        break;
    case State::Transferring:
        // Restore the source->local-file mapping persisted by Poll(Scheduled).
        SetState(Transferring{SourceResolver(m_workspace->LoadSourceLookup())}, error);
        break;
    case State::Merging:
        SetState(Merging(), error);
        break;
    case State::Completed:
        SetState(Completed{}, error);
        break;
    default:
        throw std::runtime_error("Not implemented");
    };

    // Persist every observable status change back to the workspace; `prev`
    // suppresses redundant writes when nothing actually changed.
    m_status_connection = m_status.ConnectStatus(
        [prev = m_status.GetStatus(), this](ObservableStatus const& status) mutable {
            if (prev == status.GetStatus()) {
                // No changes -> don't write
                return;
            }
            prev = status.GetStatus();
            LOG4CPLUS_TRACE(m_logger,
                fmt::format("DaqController: Updating workspace status file: {}", prev));
            m_workspace->StoreStatus(prev);
        });
}
457 
459  m_stopped = false;
460  DeferredPoll();
461 }
462 
464  m_stopped = true;
465 }
466 
// @return true if the controller has been stopped (polling does nothing).
auto DaqControllerImpl::IsStopped() const noexcept -> bool {
    return m_stopped;
}
470 
// @return the DAQ identifier held by the observable status.
auto DaqControllerImpl::GetId() const noexcept -> std::string const& {
    return m_status.GetId();
}
474 
// @return the current error flag of the DAQ.
auto DaqControllerImpl::GetErrorFlag() const noexcept -> bool {
    return m_status.GetError();
}
478 
// @return the current State of the DAQ as recorded in the observable status.
auto DaqControllerImpl::GetState() const noexcept -> State {
    return m_status.GetState();
}
482 
484  return m_status;
485 }
486 
// @return read-only access to the observable status.
auto DaqControllerImpl::GetStatus() const noexcept -> ObservableStatus const& {
    return m_status;
}
490 
491 void DaqControllerImpl::DeferredPoll() {
492  if (m_stopped || *m_liveness) {
493  // Stopped or already scheduled
494  return;
495  }
496  boost::asio::post(m_executor.get_io_context().get_executor(),
497  [liveness = std::weak_ptr<bool>(m_liveness), this]() {
498  if (auto ptr = liveness.lock(); ptr) {
499  // Scheduler still alive
500  // Clear flag before invoking Poll so that another
501  // poll can be scheduled.
502  *ptr = false;
503  Poll();
504  }
505  });
506 }
507 
509  LOG4CPLUS_TRACE(m_logger, "Poll()");
510  if (m_stopped) {
511  LOG4CPLUS_DEBUG(
512  m_logger,
513  fmt::format("{}: Poll() - DaqController is stopped so nothing will be done", *this));
514  // Stopped or already scheduled
515  return;
516  }
517  std::visit([this](auto& state) { Poll(state); }, m_state_ctx);
518 }
519 
520 void DaqControllerImpl::Poll(DaqControllerImpl::Scheduled&) {
521  LOG4CPLUS_TRACE(m_logger, "Poll(Scheduled)");
522  try {
523  // Create list of remote sources and build mapping to local files.
524  std::vector<json::FitsFileSource> sources;
525  if (m_dpspec.target.source) {
526  sources.push_back(*m_dpspec.target.source);
527  }
528  for (auto const& s : m_dpspec.sources) {
529  if (!std::holds_alternative<json::FitsFileSource>(s)) {
530  continue;
531  }
532  sources.push_back(std::get<json::FitsFileSource>(s));
533  }
534 
535  auto sources_path = m_workspace->GetSourcesPath();
536 
537  SourceResolver resolver;
538  unsigned index = 0;
539  for (auto const& s : sources) {
540  json::Location location = json::ParseSourceLocation(s.location);
541  // Use <index>_<source>_<filename> as local filename.
542  auto local_path =
543  sources_path /
544  fmt::format("{}_{}_{}", index, s.source_name, location.path.filename().native());
545  LOG4CPLUS_INFO(m_logger,
546  fmt::format("Poll(Scheduled): Source file '{}' from source \"{}\" "
547  "on host \"{}\" will be stored in {}",
548  location.path,
549  s.source_name,
550  !location.host.empty() ? location.host.c_str() : "<n/a>",
551  local_path));
552 
553  resolver.Add({s.source_name, s.location}, local_path);
554  }
555 
556  // Store source resolution mapping.
557  m_workspace->StoreSourceLookup(resolver.GetMapping());
558 
559  SetState(Transferring(std::move(resolver)));
560  } catch (...) {
561  auto msg =
562  fmt::format("{}: Failed to collect and store list of required sources", m_status);
563  LOG4CPLUS_ERROR(m_logger, fmt::format("Poll(Scheduled): Failed to process DAQ: {}", msg));
564  // No point in retrying these kinds of errors.
565  Stop();
566  std::throw_with_nested(std::runtime_error(msg));
567  }
568 }
569 
570 void DaqControllerImpl::Poll(DaqControllerImpl::Transferring& ctx) {
571  LOG4CPLUS_TRACE(m_logger, "Poll(Transferring)");
572  try {
573  // Check if any source is missing and start transfers if possible
574  // If all transfers are done transition to State::Merging.
575 
576  auto const& sources = ctx.resolver.GetMapping();
577  auto missing = sources.size();
578  auto root = m_workspace->GetPath();
579  for (auto const& source : sources) {
580  if (ctx.HasTransfer(source.first)) {
581  // Transfer already in progress
582  continue;
583  }
584  // [recovery]
585  if (m_workspace->Exists(source.second)) {
586  missing--;
587  // File already exist
588  continue;
589  }
590  if (m_soft_stop) {
591  // If there are errors we don't initiate new transfers
592  LOG4CPLUS_TRACE(m_logger,
593  "Poll(Transferring): Soft stop is enabled"
594  "-> won't start new transfer");
595  continue;
596  }
597  // We need to transfer the file, try to start.
598  auto token = m_resources.net_receive.Acquire();
599  if (token) {
600  // We got the token, so we can start transfer
601  json::Location location = json::ParseSourceLocation(source.first.location);
602  using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
603  RsyncOptions opts;
604  opts.rsync = m_options.rsync_bin;
605  Proc proc = m_rsync_factory(m_executor.get_io_context(),
606  location.RsyncPath(),
607  root / source.second,
608  opts,
610 
611  // Connect signals (stdout is ignored for now as it merely contains the progress
612  // which will be handled differently later on).
613  proc->ConnectStderr(
614  [logger = log4cplus::Logger::getInstance(LOGGER_NAME_TRANSFER)](
615  pid_t pid, std::string const& line) {
616  LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
617  });
618  // Start transfer and set up completion handler
619  proc->Initiate().then(
620  m_executor,
621  [liveness = std::weak_ptr<bool>(m_liveness),
622  source = source.first,
623  dest = source.second,
624  proc = proc,
625  this](boost::future<int> f) {
626  if (liveness.expired()) {
627  LOG4CPLUS_ERROR("dpm.daqcontroller",
628  fmt::format("DaqController abandoned -> ignoring "
629  "result from rsync for transfer of {}",
630  source));
631  return;
632  }
633  TransferComplete(source, dest, std::move(f));
634  });
635  ctx.transfers.emplace_back(Transferring::Transfer{
636  source.first, source.second, std::move(proc), std::move(*token)});
637  } else {
638  // Out of tokens -> bail out
639  LOG4CPLUS_TRACE(m_logger,
640  fmt::format("Poll(Transferring): Could not start transfer due to "
641  "resource limit reached: {}",
642  source.first));
643  return;
644  }
645  }
646 
647  if (missing == 0) {
648  // If all transfers are complete we can transition to next state
649  SetState(Merging(), false);
650  return;
651  }
652  } catch (...) {
653  SetError(true);
654  // No point in retrying these kinds of errors.
655  auto msg = fmt::format("{}: Failed to transfer required sources", m_status);
656  LOG4CPLUS_ERROR(m_logger,
657  fmt::format("Poll(Transferring): Failed to process DAQ: {}", msg));
658  Stop();
659  std::throw_with_nested(std::runtime_error(msg));
660  }
661 }
662 
663 void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile const& source,
664  std::filesystem::path const& local_path,
665  boost::future<int> result) noexcept {
666  LOG4CPLUS_TRACE(m_logger,
667  fmt::format("{}: TransferComplete: {} -> {}", *this, source, local_path));
668  auto* ctx = std::get_if<Transferring>(&m_state_ctx);
669  // Multiple changes can be made here so we defer signalling until we've done them all.
670  auto defer = ObservableStatus::DeferSignal(&m_status);
671  try {
672  if (ctx) {
673  ctx->EraseTransfer(source);
674  }
675 
676  int return_code = result.get();
677  auto alert_id = MakeAlertId(alert::TRANSFERRING_RSYNC, local_path);
678  if (return_code != 0) {
679  m_status.SetAlert(MakeAlert(
680  alert_id, fmt::format("rsync file transfer failed for remote file {}", source)));
681  LOG4CPLUS_ERROR(m_logger, fmt::format("rync file transfer failed: {}", source));
682  throw std::runtime_error("rsync failed to transfer file");
683  }
684  // Opportunistically clear alert in case it as set from previous attempt
685  m_status.ClearAlert(alert_id);
686  LOG4CPLUS_INFO(m_logger,
687  fmt::format("{}: Poll(Transferring): Successfully transfer file: {} -> {}",
688  *this,
689  source,
690  local_path));
691 
692  // Transfer ok
693  DeferredPoll();
694  return;
695  } catch (...) {
696  SetError(true);
697  LOG4CPLUS_ERROR(m_logger,
698  fmt::format("{}: Poll(Transferring): Failed to transfer file: {} -> {}",
699  *this,
700  source,
701  local_path));
702  // Stopping DAQ can be done when there's no more pending file transfers (we let them
703  // complete).
704  if (ctx && ctx->transfers.empty()) {
705  LOG4CPLUS_ERROR(
706  m_logger,
707  fmt::format("{}: Poll(Transferring): All pending transfers have completed "
708  "(with or without error) so we stop DAQ",
709  *this));
710  Stop();
711  }
712  }
713 }
714 
715 bool DaqControllerImpl::Transferring::HasTransfer(
716  SourceResolver::SourceFile const& source) const noexcept {
717  auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
718  return t.source == source;
719  });
720  return it != transfers.cend();
721 }
722 
723 DaqControllerImpl::Transferring::Transfer*
724 DaqControllerImpl::Transferring::GetTransfer(SourceResolver::SourceFile const& source) noexcept {
725  auto it = std::find_if(
726  transfers.begin(), transfers.end(), [&](Transfer const& t) { return t.source == source; });
727 
728  if (it != transfers.end()) {
729  return &(*it);
730  }
731  return nullptr;
732 }
733 
734 void DaqControllerImpl::Transferring::EraseTransfer(SourceResolver::SourceFile const& source) {
735  auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer const& t) {
736  return t.source == source;
737  });
738  if (it != transfers.end()) {
739  transfers.erase(it);
740  }
741 }
742 
// Abort any transfer processes still running when the Transferring state is
// torn down (state transition, abort or controller destruction).
DaqControllerImpl::Transferring::~Transferring() noexcept {
    for (auto& transfer : transfers) {
        if (transfer.proc && transfer.proc->IsRunning()) {
            LOG4CPLUS_DEBUG(
                "dpm",
                "DaqController::Transferring::~Transferring: Aborting running transfer process");
            transfer.proc->Abort();
        }
    }
}
753 
// Abort a merger process still running when the Merging state is torn down
// (state transition, abort or controller destruction).
DaqControllerImpl::Merging::~Merging() noexcept {
    if (merger && merger->IsRunning()) {
        LOG4CPLUS_DEBUG("dpm", "DaqController::Merging::~Merging: Aborting running merge process");
        merger->Abort();
    }
}
760 
// Merging-state handler: start the external merge application (daqDpmMerge)
// once, subject to the merge resource limit.
//
// - If a merge is already running this is a no-op.
// - If the result file already exists (e.g. produced manually or by a
//   previous run) the state jumps straight to Completed.
// - Otherwise a merge resource token is acquired, the merger command line is
//   assembled and the process launched; its stdout is parsed for protocol
//   messages (see HandleMergeMessage) and completion is handled by
//   MergeComplete().
//
// @throws std::runtime_error (nested) if launching the merger fails; the
//         controller is stopped in that case.
void DaqControllerImpl::Poll(DaqControllerImpl::Merging& ctx) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Merging)");
    auto defer = ObservableStatus::DeferSignal(&m_status);
    try {
        if (ctx.merger) {
            // Merge is already in progress
            return;
        }
        // [recovery]
        if (m_workspace->Exists(m_result)) {
            SetState(Completed{});
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("{}: Poll(Merging): "
                                        "Recovered from error automatically (manual merge)",
                                        *this));
            return;
        }
        auto token = m_resources.merge.Acquire();
        if (!token) {
            LOG4CPLUS_TRACE(m_logger,
                            fmt::format("{}: Poll(Merging): Could not start merging due to "
                                        "resource limit reached: {}",
                                        *this,
                                        m_resources.merge.GetLimit()));

            return;
        }

        // Launch merge
        // note: daqDpmMerge will atomically move result into output so we don't have
        // to do that ourselves.
        std::vector<std::string> args{m_options.merge_bin};
        args.emplace_back("--json");
        args.emplace_back("--root");
        args.emplace_back(m_workspace->GetPath());
        args.emplace_back("--resolver");
        args.emplace_back(m_workspace->GetSourceLookupPath());
        args.emplace_back("-o");
        args.emplace_back(m_result.native());
        // Positional argument for the specification which is passed as absolute path.
        args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());

        PrintArgs(m_logger, args);

        ctx.merger = m_proc_factory(m_executor.get_io_context(), args);

        // stdout carries the JSON message protocol; stderr is plain log output.
        ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance(LOGGER_NAME_MERGER),
                                   this](pid_t pid, std::string const& line) {
            LOG4CPLUS_DEBUG(logger, pid << ": " << Trim(line));
            HandleMergeMessage(line);
        });
        ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance(LOGGER_NAME_MERGER)](
                                      pid_t pid, std::string const& line) {
            LOG4CPLUS_INFO(logger, pid << ": " << Trim(line));
        });

        // Completion handler; the weak liveness pointer guards against the
        // controller being destroyed before the process finishes.
        ctx.merger->Initiate().then(
            m_executor,
            [liveness = std::weak_ptr<bool>(m_liveness),
             id = m_status.GetId(),
             proc = ctx.merger,
             this](boost::future<int> f) {
                if (liveness.expired()) {
                    LOG4CPLUS_ERROR(LOGGER_NAME_SCHEDULER,
                                    fmt::format("{}: DaqController abandoned -> ignoring "
                                                "result from merger",
                                                id));
                    return;
                }
                MergeComplete(std::move(f));
            });
        ctx.token = std::move(*token);
    } catch (...) {
        Stop();
        auto msg = fmt::format("{}: Failed to initiate merging", *this);
        LOG4CPLUS_ERROR(m_logger, msg);
        std::throw_with_nested(std::runtime_error(msg));
    }
}
840 
// Completion handler for the merger process.
//
// Releases the Merging context (process handle and resource token), then
// validates the outcome: a non-zero exit code or a missing result file raises
// a merge alert, sets the error flag and stops the DAQ. On success the alert
// is cleared, a result symlink is created, the result path is recorded in the
// status and the state transitions to Completed.
//
// @param result Future holding the merger process exit code.
void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
    LOG4CPLUS_TRACE(m_logger, "MergeComplete()");
    // Defer signal until all changes have been made
    auto defer = ObservableStatus::DeferSignal(&m_status);
    try {
        auto alert_id = MakeAlertId(alert::MERGING_MERGE, "");
        auto* ctx = std::get_if<Merging>(&m_state_ctx);
        if (ctx) {
            ctx->Reset();
        }
        auto exit_code = result.get();
        if (exit_code != 0) {
            auto msg = fmt::format("Merging failed with code {}", exit_code);
            m_status.SetAlert(MakeAlert(alert_id, msg));
            throw std::runtime_error(fmt::format("Merging failed with code {}", exit_code));
        }
        if (!m_workspace->Exists(m_result)) {
            auto abs_path = m_workspace->GetPath() / m_result;
            auto msg =
                fmt::format("Merging reported with success but file is not found: {}", abs_path);
            m_status.SetAlert(MakeAlert(alert_id, msg));
            LOG4CPLUS_ERROR(m_logger, msg);
            throw std::runtime_error(msg);
        }
        m_status.ClearAlert(alert_id);

        // Create symlink and set result
        m_workspace->MakeResultSymlink(m_result);
        // @todo: Include host
        m_status.SetResult(m_result.native());

        // All good -> transition to State::Completed
        // @todo: Transition to Releasing rather than completed
        SetState(Completed{});
        LOG4CPLUS_INFO(m_logger, fmt::format("{}: Completed successfully!", *this));
    } catch (...) {
        SetError(true);
        Stop();
        LOG4CPLUS_ERROR(
            m_logger,
            fmt::format(
                "{}: PollMergeComplete: Failed to create data product: {}", *this, m_result));
    }
}
885 
// Terminal-state handler: nothing left to do, stop polling. The scheduler
// observes the final state via the status signal and archives the DAQ.
void DaqControllerImpl::Poll(Completed&) {
    LOG4CPLUS_TRACE(m_logger, "Poll(Completed)");
    // We're done
    Stop();
}
891 
// Install a new state context and propagate the derived State enumerator and
// error flag to the observable status, scheduling a poll when anything
// actually changed.
void DaqControllerImpl::SetState(StateVariant s, bool error) {
    m_state_ctx = std::move(s);
    auto new_state = MakeState(m_state_ctx);
    if (new_state == m_status.GetState() && error == m_status.GetError()) {
        // No change
        return;
    }

    m_status.SetState(new_state, error);
    DeferredPoll();
}
903 
// Update the DAQ error flag. A change also enables soft-stop (no new
// transfers are initiated, see Poll(Transferring)) and schedules a poll.
// NOTE(review): m_soft_stop is set to true even when the flag is being
// cleared (error == false) — confirm this is intended.
void DaqControllerImpl::SetError(bool error) {
    if (error == m_status.GetError()) {
        // No change
        return;
    }

    m_status.SetError(error);
    m_soft_stop = true;
    DeferredPoll();
}
914 
915 State DaqControllerImpl::MakeState(StateVariant const& s) {
916  if (std::holds_alternative<Scheduled>(s)) {
917  return State::Scheduled;
918  }
919  if (std::holds_alternative<Transferring>(s)) {
920  return State::Transferring;
921  }
922  if (std::holds_alternative<Merging>(s)) {
923  return State::Merging;
924  }
925  if (std::holds_alternative<Completed>(s)) {
926  return State::Completed;
927  }
928 
929  assert(false); // NOLINT Not implemented yet
930 }
931 
// Construct the Transferring state from the source -> local-file mapping.
DaqControllerImpl::Transferring::Transferring(SourceResolver resolver_arg)
    : resolver(std::move(resolver_arg)) {
}
935 
// Release the merger process handle and the merge resource token so that
// another merge can be started (or the resource reused elsewhere).
void DaqControllerImpl::Merging::Reset() {
    merger.reset();
    token.reset();
}
940 
// Parse one stdout line from the merger process and apply any alert it
// carries to the observable status. Lines that are not valid protocol
// messages are ignored (logged at DEBUG only).
void DaqControllerImpl::HandleMergeMessage(std::string const& line) noexcept try {
    // A valid JSON message on stdout has the basic form:
    // {"type": "<content-type>",
    //  "timestamp": <int64 nanoseconds since epoch>,
    //  "content": <content>}
    auto json = nlohmann::json::parse(line);
    auto const& type = json.at("type").get<std::string>();
    auto const& content = json.at("content");
    if (type == "alert") {
        // Content schema:
        // {"id", "ID",
        //  "message", "MESSAGE"}
        std::string id;
        std::string message;
        content.at("id").get_to(id);
        content.at("message").get_to(message);
        auto alert_id = MakeAlertId(alert::MERGING_MERGE, id);
        m_status.SetAlert(MakeAlert(alert_id, message));
    }
} catch (...) {
    LOG4CPLUS_DEBUG(m_logger, "Failed to parse JSON message from merger");
}
963 
964 } // namespace daq::dpm
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
State GetState() const noexcept
Definition: status.cpp:215
void SetError(bool error) noexcept
Set error flag for data acquisition.
Definition: status.cpp:236
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:274
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:223
bool GetError() const noexcept
Definition: status.cpp:219
friend class DeferSignal
Definition: status.hpp:324
boost::signals2::connection ConnectStatus(Signal::slot_type const &slot)
Connect observer that is invoked when state is modified.
Definition: status.hpp:314
std::string const & GetId() const noexcept
Definition: status.cpp:203
auto IsStopped() const noexcept -> bool override
Definition: scheduler.cpp:467
auto GetState() const noexcept -> State override
Definition: scheduler.cpp:479
auto GetId() const noexcept -> std::string const &override
Definition: scheduler.cpp:471
void Start() override
Start/stop operations.
Definition: scheduler.cpp:458
auto GetErrorFlag() const noexcept -> bool override
Definition: scheduler.cpp:475
auto GetStatus() noexcept -> ObservableStatus &override
Definition: scheduler.cpp:483
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:490
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:487
Controller for specific DAQ.
Definition: scheduler.hpp:59
~ResourceToken() noexcept
Definition: scheduler.cpp:46
ResourceToken(Resource *) noexcept
Definition: scheduler.cpp:43
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
Definition: scheduler.hpp:214
std::optional< ResourceToken > Acquire() noexcept
Definition: scheduler.hpp:193
void Release() noexcept
Definition: scheduler.cpp:50
unsigned GetUsed() const noexcept
Definition: scheduler.hpp:207
unsigned GetLimit() const noexcept
Definition: scheduler.hpp:204
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:349
std::string QueueDaq(std::string const &dp_spec) override
Queues DAQ for processing.
Definition: scheduler.cpp:96
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
Definition: scheduler.cpp:243
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
Definition: scheduler.cpp:64
void Stop() override
Definition: scheduler.cpp:92
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
Definition: scheduler.cpp:185
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
Definition: scheduler.cpp:181
void Start() override
Start/stop operations.
Definition: scheduler.cpp:87
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
Definition: scheduler.cpp:170
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Definition: scheduler.cpp:166
Provides location of fits source file.
Interface to interact with DPM workspace.
Definition: workspace.hpp:98
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location in the workspace.
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
virtual auto GetPath() const -> std::filesystem::path=0
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
virtual auto LoadQueue() const -> std::vector< std::string >=0
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
DPM server config.
daq::dpm::Workspace interface and implementation declaration
Declaration of log4cplus helpers.
constexpr std::string_view TRANSFERRING_RSYNC
Failure during rsync source copy.
Definition: status.hpp:37
constexpr std::string_view MERGING_MERGE
Merging failed.
Definition: status.hpp:46
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
Definition: scheduler.cpp:59
const std::string LOGGER_NAME_CONTROLLER
Definition: config.hpp:24
const std::string LOGGER_NAME_SCHEDULER
Definition: config.hpp:23
const std::string LOGGER_NAME_MERGER
Definition: config.hpp:26
const std::string LOGGER_NAME_TRANSFER
Definition: config.hpp:25
Options for DaqController.
Definition: scheduler.hpp:162
Limited resources.
Definition: scheduler.hpp:231
Options controlling scheduler operations.
Definition: scheduler.hpp:132
std::string id
Definition: dpSpec.hpp:42
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Definition: dpSpec.cpp:90
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Definition: dpSpec.cpp:47
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
std::vector< SourceTypes > sources
Definition: dpSpec.hpp:44
std::string file_prefix
Optional user-chosen file prefix to make it easier to identify the produced file.
Definition: dpSpec.hpp:36
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Aborted
Data acquisition has been aborted by user.
@ Merging
DAQ is being merged.
@ AbortingMerging
Transitional state for aborting during merging.
@ Transferring
Input files are being transferred.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Definition: main.cpp:23
daq::dpm::Scheduler and related class declarations.
Non-observable status object that stores the status of data acquisition.
Definition: status.hpp:124
State state
Definition: status.hpp:142
std::string id
Definition: status.hpp:140
std::string result
Path to resulting data product.
Definition: status.hpp:152
std::string file_id
Definition: status.hpp:141
bool error
Definition: status.hpp:143
TimePoint timestamp
Definition: status.hpp:153