14 #include <boost/asio/post.hpp>
15 #include <fmt/format.h>
16 #include <fmt/ostream.h>
17 #include <log4cplus/loggingmacros.h>
27 void PrintArgs(log4cplus::Logger& logger, std::vector<std::string>
const& args) {
31 for (
auto const& token : args) {
39 LOG4CPLUS_DEBUG(logger,
"Executing merger with args: " << ss.str());
54 if (m_limit > 0 && m_used < m_limit) {
60 os <<
"DAQ{" <<
daq.GetStatus() <<
"}";
68 : m_executor(executor)
69 , m_workspace(workspace)
70 , m_daq_controller_factory(std::move(daq_controller_factory))
73 , m_liveness(std::make_shared<bool>(false)) {
75 auto slot = [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
76 if (
auto ptr = liveness.lock(); ptr) {
97 LOG4CPLUS_TRACE(m_logger,
"QueueDaq()");
99 auto json = nlohmann::json::parse(dp_spec_serialized);
101 std::string
const&
id = dp_spec.
id;
104 LOG4CPLUS_ERROR(m_logger,
"QueueDaq(): DAQ conflict detected -> aborting");
105 throw std::invalid_argument(
106 fmt::format(
"Scheduler: Could not queue DAQ for merging as "
107 "a Data Acquisition with same id has been queued before: '{}'",
112 LOG4CPLUS_INFO(m_logger,
113 fmt::format(
"QueueDaq(): Initializing new workspace for DAQ {}",
id));
119 initial_status.
id = id;
120 initial_status.
file_id = file_id;
123 initial_status.
error =
false;
124 initial_status.
timestamp = Status::TimePoint::clock::now();
126 daq_ws->StoreStatus(initial_status);
127 daq_ws->StoreSpecification(dp_spec_serialized);
130 m_queue.push_back(
id);
139 std::throw_with_nested(std::runtime_error(
"Failed to store DAQ queue"));
142 std::throw_with_nested(
143 std::runtime_error(
"Failed to write status to DAQ workspace"));
148 std::throw_with_nested(std::runtime_error(fmt::format(
149 "Failed to initialize DAQ workspace in {}", m_workspace.
GetPath().native())));
152 LOG4CPLUS_DEBUG(m_logger,
153 "Invalid Data Product Specification provided: \n"
154 << dp_spec_serialized);
156 std::throw_with_nested(
157 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because "
158 "Data Product Specification is invalid"));
159 }
catch (nlohmann::json::parse_error
const&) {
160 std::throw_with_nested(
161 std::invalid_argument(
"Scheduler: Could not queue DAQ for merging because provided "
162 "data product specification is invalid JSON"));
167 return std::find(m_queue.begin(), m_queue.end(),
id) != m_queue.end();
171 LOG4CPLUS_TRACE(m_logger, fmt::format(
"GetDaqStatus({})",
id));
173 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatus({}): No such ID",
id));
174 throw std::invalid_argument(fmt::format(
"DAQ with id='{}' not found",
id));
176 return m_workspace.LoadDaq(
id)->LoadStatus();
178 std::throw_with_nested(std::runtime_error(
"Scheduler: GetDaqStatus() failed"));
186 LOG4CPLUS_TRACE(m_logger, fmt::format(
"AbortDaq({})",
id));
192 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): No such ID",
id));
193 throw std::invalid_argument(fmt::format(
"DAQ with id='{}' not found",
id));
197 auto it = std::find_if(m_active.begin(), m_active.end(), [&
id](Active
const& active) {
199 return id == active.daq->GetId();
203 auto pub = [&, status = m_workspace.
LoadDaq(
id)->LoadStatus()](
State new_state)
mutable {
205 m_status_signal(status);
208 if (it != m_active.end()) {
209 auto state = it->daq->GetState();
211 LOG4CPLUS_DEBUG(m_logger,
212 fmt::format(
"AbortDaq({}): Cannot abort in state {}",
id, state));
217 fmt::format(
"AbortDaq({}): Erasing active DAQ currently in state {}",
id, state));
225 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"AbortDaq({}): Removing DAQ from merge queue",
id));
226 m_queue.erase(std::find(m_queue.begin(), m_queue.end(),
id));
230 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"AbortDaq({}): Removing DAQ workspace",
id));
238 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): Aborted and removed DAQ",
id));
240 std::throw_with_nested(std::runtime_error(
"Scheduler: AbortDaq() failed"));
244 return m_status_signal.connect(slot);
247 void SchedulerImpl::Poll() {
248 LOG4CPLUS_TRACE(m_logger,
"Poll()");
253 void SchedulerImpl::ArchiveCompleted() {
255 for (
auto& active : m_active) {
260 auto id = active.daq->GetId();
262 auto archive_path = m_workspace.
ArchiveDaq(
id);
265 fmt::format(
"DAQ {} is in final state {} -> moved workspace to archive: {}",
267 active.daq->GetState(),
268 archive_path.native()));
274 m_queue.erase(std::find(m_queue.begin(), m_queue.end(),
id));
278 LOG4CPLUS_ERROR(m_logger,
279 "Failed to archive DAQ workspace:\n"
280 << error::NestedExceptionReporter(std::current_exception()));
285 auto remove_it = std::remove_if(
286 m_active.begin(), m_active.end(), [&](
auto const& active) { return !active.daq; });
287 if (remove_it != m_active.end()) {
288 m_active.erase(remove_it, m_active.end());
294 void SchedulerImpl::ActivateFromQueue() {
297 if (m_queue.empty()) {
298 LOG4CPLUS_INFO(m_logger,
"Merge queue empty - all done!");
302 auto candidates = GetCandidates();
303 if (candidates.empty()) {
304 LOG4CPLUS_INFO(m_logger,
"All DAQ merge candidates are active/in-progress");
309 for (
auto const&
id : candidates) {
310 LOG4CPLUS_TRACE(m_logger, fmt::format(
"{}: Attempting to schedule DAQ",
id));
313 LOG4CPLUS_INFO(m_logger,
314 fmt::format(
"Limit reached: Cannot schedule '{}' "
315 "Currently active DAQs at/exceed limit. "
316 "current: {}, limit: {}, queue size: {}",
326 auto ws = m_workspace.
LoadDaq(
id);
327 auto daq_controller = m_daq_controller_factory(std::move(ws), m_resources);
332 daq_controller->GetStatus().ConnectStatus([&](ObservableStatus
const& s) {
338 m_status_signal(s.GetStatus());
341 daq_controller->Start();
343 m_active.emplace_back(std::move(daq_controller), std::move(*maybe_token));
345 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"{}: DAQ scheduled for merging",
id));
349 std::throw_with_nested(
350 std::runtime_error(fmt::format(
"{}: Failed to activate DAQ for merging.",
id)));
354 std::throw_with_nested(std::runtime_error(
"Failed to load "));
358 void SchedulerImpl::DeferredPoll() {
359 if (m_stopped || *m_liveness) {
364 [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
365 if (auto ptr = liveness.lock(); ptr) {
375 std::vector<std::string> SchedulerImpl::GetCandidates()
const {
377 std::vector<std::string> candidates;
378 for (
auto const&
id : m_queue) {
379 auto it = std::find_if(m_active.begin(), m_active.end(), [&](
auto const& active) {
381 return active.daq->GetId() == id;
383 if (it == m_active.end()) {
384 candidates.push_back(
id);
391 std::unique_ptr<DaqWorkspace> workspace,
396 : m_executor(executor)
397 , m_workspace(std::move(workspace))
398 , m_resources(resources)
399 , m_rsync_factory(std::move(rsync_factory))
400 , m_proc_factory(std::move(proc_factory))
401 , m_options(std::move(options))
402 , m_dpspec(m_workspace->LoadSpecification())
405 , m_status(m_workspace->LoadStatus())
406 , m_status_connection()
407 , m_liveness(std::make_shared<bool>(false))
411 throw std::invalid_argument(
"Specified merge application name is empty");
414 throw std::invalid_argument(
"Specified rsync application name is empty");
420 m_result = m_workspace->GetResultPath() /
442 throw std::runtime_error(
"Not implemented");
447 if (prev == status.GetStatus()) {
452 LOG4CPLUS_TRACE(m_logger,
453 fmt::format(
"DaqController: Updating workspace status file: {}", prev));
454 m_workspace->StoreStatus(prev);
472 return m_status.
GetId();
491 void DaqControllerImpl::DeferredPoll() {
492 if (m_stopped || *m_liveness) {
497 [liveness = std::weak_ptr<bool>(m_liveness),
this]() {
498 if (auto ptr = liveness.lock(); ptr) {
509 LOG4CPLUS_TRACE(m_logger,
"Poll()");
513 fmt::format(
"{}: Poll() - DaqController is stopped so nothing will be done", *
this));
517 std::visit([
this](
auto& state) {
Poll(state); }, m_state_ctx);
521 LOG4CPLUS_TRACE(m_logger,
"Poll(Scheduled)");
524 std::vector<json::FitsFileSource> sources;
528 for (
auto const& s : m_dpspec.
sources) {
529 if (!std::holds_alternative<json::FitsFileSource>(s)) {
532 sources.push_back(std::get<json::FitsFileSource>(s));
535 auto sources_path = m_workspace->GetSourcesPath();
537 SourceResolver resolver;
539 for (
auto const& s : sources) {
544 fmt::format(
"{}_{}_{}", index, s.source_name, location.path.filename().native());
545 LOG4CPLUS_INFO(m_logger,
546 fmt::format(
"Poll(Scheduled): Source file '{}' from source \"{}\" "
547 "on host \"{}\" will be stored in {}",
550 !location.host.empty() ? location.host.c_str() :
"<n/a>",
553 resolver.Add({s.source_name, s.location}, local_path);
557 m_workspace->StoreSourceLookup(resolver.GetMapping());
562 fmt::format(
"{}: Failed to collect and store list of required sources", m_status);
563 LOG4CPLUS_ERROR(m_logger, fmt::format(
"Poll(Scheduled): Failed to process DAQ: {}", msg));
566 std::throw_with_nested(std::runtime_error(msg));
571 LOG4CPLUS_TRACE(m_logger,
"Poll(Transferring)");
576 auto const& sources = ctx.resolver.GetMapping();
577 auto missing = sources.size();
578 auto root = m_workspace->GetPath();
579 for (
auto const& source : sources) {
580 if (ctx.HasTransfer(source.first)) {
585 if (m_workspace->Exists(source.second)) {
592 LOG4CPLUS_TRACE(m_logger,
593 "Poll(Transferring): Soft stop is enabled"
594 "-> won't start new transfer");
602 using Proc = std::shared_ptr<RsyncAsyncProcessIf>;
606 location.RsyncPath(),
607 root / source.second,
615 pid_t pid, std::string
const& line) {
616 LOG4CPLUS_INFO(logger, pid <<
": " << Trim(line));
619 proc->Initiate().then(
621 [liveness = std::weak_ptr<bool>(m_liveness),
622 source = source.first,
623 dest = source.second,
625 this](boost::future<int> f) {
626 if (liveness.expired()) {
627 LOG4CPLUS_ERROR(
"dpm.daqcontroller",
628 fmt::format(
"DaqController abandoned -> ignoring "
629 "result from rsync for transfer of {}",
633 TransferComplete(source, dest, std::move(f));
635 ctx.transfers.emplace_back(Transferring::Transfer{
636 source.first, source.second, std::move(proc), std::move(*token)});
639 LOG4CPLUS_TRACE(m_logger,
640 fmt::format(
"Poll(Transferring): Could not start transfer due to "
641 "resource limit reached: {}",
655 auto msg = fmt::format(
"{}: Failed to transfer required sources", m_status);
656 LOG4CPLUS_ERROR(m_logger,
657 fmt::format(
"Poll(Transferring): Failed to process DAQ: {}", msg));
659 std::throw_with_nested(std::runtime_error(msg));
663 void DaqControllerImpl::TransferComplete(SourceResolver::SourceFile
const& source,
664 std::filesystem::path
const& local_path,
665 boost::future<int> result) noexcept {
666 LOG4CPLUS_TRACE(m_logger,
667 fmt::format(
"{}: TransferComplete: {} -> {}", *
this, source, local_path));
668 auto* ctx = std::get_if<Transferring>(&m_state_ctx);
673 ctx->EraseTransfer(source);
676 int return_code = result.get();
678 if (return_code != 0) {
680 alert_id, fmt::format(
"rsync file transfer failed for remote file {}", source)));
681 LOG4CPLUS_ERROR(m_logger, fmt::format(
"rync file transfer failed: {}", source));
682 throw std::runtime_error(
"rsync failed to transfer file");
685 m_status.ClearAlert(alert_id);
686 LOG4CPLUS_INFO(m_logger,
687 fmt::format(
"{}: Poll(Transferring): Successfully transfer file: {} -> {}",
697 LOG4CPLUS_ERROR(m_logger,
698 fmt::format(
"{}: Poll(Transferring): Failed to transfer file: {} -> {}",
704 if (ctx && ctx->transfers.empty()) {
707 fmt::format(
"{}: Poll(Transferring): All pending transfers have completed "
708 "(with or without error) so we stop DAQ",
715 bool DaqControllerImpl::Transferring::HasTransfer(
716 SourceResolver::SourceFile
const& source)
const noexcept {
717 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer
const& t) {
718 return t.source == source;
720 return it != transfers.cend();
723 DaqControllerImpl::Transferring::Transfer*
724 DaqControllerImpl::Transferring::GetTransfer(SourceResolver::SourceFile
const& source) noexcept {
725 auto it = std::find_if(
726 transfers.begin(), transfers.end(), [&](Transfer
const& t) { return t.source == source; });
728 if (it != transfers.end()) {
734 void DaqControllerImpl::Transferring::EraseTransfer(SourceResolver::SourceFile
const& source) {
735 auto it = std::find_if(transfers.cbegin(), transfers.cend(), [&](Transfer
const& t) {
736 return t.source == source;
738 if (it != transfers.end()) {
743 DaqControllerImpl::Transferring::~Transferring() noexcept {
744 for (
auto& transfer : transfers) {
745 if (transfer.proc && transfer.proc->IsRunning()) {
748 "DaqController::Transferring::~Transferring: Aborting running transfer process");
749 transfer.proc->Abort();
754 DaqControllerImpl::Merging::~Merging() noexcept {
755 if (merger && merger->IsRunning()) {
756 LOG4CPLUS_DEBUG(
"dpm",
"DaqController::Merging::~Merging: Aborting running merge process");
762 LOG4CPLUS_TRACE(m_logger,
"Poll(Merging)");
770 if (m_workspace->Exists(m_result)) {
771 SetState(Completed{});
772 LOG4CPLUS_TRACE(m_logger,
773 fmt::format(
"{}: Poll(Merging): "
774 "Recovered from error automatically (manual merge)",
780 LOG4CPLUS_TRACE(m_logger,
781 fmt::format(
"{}: Poll(Merging): Could not start merging due to "
782 "resource limit reached: {}",
792 std::vector<std::string> args{m_options.
merge_bin};
793 args.emplace_back(
"--json");
794 args.emplace_back(
"--root");
795 args.emplace_back(m_workspace->GetPath());
796 args.emplace_back(
"--resolver");
797 args.emplace_back(m_workspace->GetSourceLookupPath());
798 args.emplace_back(
"-o");
799 args.emplace_back(m_result.native());
801 args.emplace_back(m_workspace->GetPath() / m_workspace->GetSpecificationPath());
803 PrintArgs(m_logger, args);
807 ctx.merger->ConnectStdout([logger = log4cplus::Logger::getInstance(
LOGGER_NAME_MERGER),
808 this](pid_t pid, std::string
const& line) {
809 LOG4CPLUS_DEBUG(logger, pid <<
": " << Trim(line));
810 HandleMergeMessage(line);
812 ctx.merger->ConnectStderr([logger = log4cplus::Logger::getInstance(
LOGGER_NAME_MERGER)](
813 pid_t pid, std::string
const& line) {
814 LOG4CPLUS_INFO(logger, pid <<
": " << Trim(line));
817 ctx.merger->Initiate().then(
819 [liveness = std::weak_ptr<bool>(m_liveness),
820 id = m_status.
GetId(),
822 this](boost::future<int> f) {
823 if (liveness.expired()) {
825 fmt::format(
"{}: DaqController abandoned -> ignoring "
826 "result from merger",
830 MergeComplete(std::move(f));
832 ctx.token = std::move(*token);
835 auto msg = fmt::format(
"{}: Failed to initiate merging", *
this);
836 LOG4CPLUS_ERROR(m_logger, msg);
837 std::throw_with_nested(std::runtime_error(msg));
841 void DaqControllerImpl::MergeComplete(boost::future<int> result) noexcept {
842 LOG4CPLUS_TRACE(m_logger,
"MergeComplete()");
847 auto* ctx = std::get_if<Merging>(&m_state_ctx);
851 auto exit_code = result.get();
852 if (exit_code != 0) {
853 auto msg = fmt::format(
"Merging failed with code {}", exit_code);
854 m_status.SetAlert(
MakeAlert(alert_id, msg));
855 throw std::runtime_error(fmt::format(
"Merging failed with code {}", exit_code));
857 if (!m_workspace->Exists(m_result)) {
858 auto abs_path = m_workspace->GetPath() / m_result;
860 fmt::format(
"Merging reported with success but file is not found: {}", abs_path);
861 m_status.SetAlert(
MakeAlert(alert_id, msg));
862 LOG4CPLUS_ERROR(m_logger, msg);
863 throw std::runtime_error(msg);
865 m_status.ClearAlert(alert_id);
868 m_workspace->MakeResultSymlink(m_result);
870 m_status.SetResult(m_result.native());
874 SetState(Completed{});
875 LOG4CPLUS_INFO(m_logger, fmt::format(
"{}: Completed successfully!", *
this));
882 "{}: PollMergeComplete: Failed to create data product: {}", *
this, m_result));
887 LOG4CPLUS_TRACE(m_logger,
"Poll(Completed)");
892 void DaqControllerImpl::SetState(StateVariant s,
bool error) {
893 m_state_ctx = std::move(s);
894 auto new_state = MakeState(m_state_ctx);
904 void DaqControllerImpl::SetError(
bool error) {
915 State DaqControllerImpl::MakeState(StateVariant
const& s) {
916 if (std::holds_alternative<Scheduled>(s)) {
919 if (std::holds_alternative<Transferring>(s)) {
922 if (std::holds_alternative<Merging>(s)) {
925 if (std::holds_alternative<Completed>(s)) {
932 DaqControllerImpl::Transferring::Transferring(SourceResolver resolver_arg)
933 : resolver(std::move(resolver_arg)) {
936 void DaqControllerImpl::Merging::Reset() {
941 void DaqControllerImpl::HandleMergeMessage(std::string
const& line) noexcept
try {
946 auto json = nlohmann::json::parse(line);
947 auto const& type = json.at(
"type").get<std::string>();
948 auto const& content = json.at(
"content");
949 if (type ==
"alert") {
955 content.at(
"id").get_to(
id);
956 content.at(
"message").get_to(message);
958 m_status.SetAlert(
MakeAlert(alert_id, message));
961 LOG4CPLUS_DEBUG(m_logger,
"Failed to parse JSON message from merger");
Stores data acquisition status and allows subscription to status changes.
State GetState() const noexcept
void SetError(bool error) noexcept
Set error flag for data acquisition.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
bool GetError() const noexcept
boost::signals2::connection ConnectStatus(Signal::slot_type const &slot)
Connect observer that is invoked when state is modified.
std::string const & GetId() const noexcept
auto IsStopped() const noexcept -> bool override
auto GetState() const noexcept -> State override
auto GetId() const noexcept -> std::string const &override
void Start() override
Start/stop operations.
auto GetErrorFlag() const noexcept -> bool override
auto GetStatus() noexcept -> ObservableStatus &override
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Controller for specific DAQ.
~ResourceToken() noexcept
ResourceToken(Resource *) noexcept
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
std::optional< ResourceToken > Acquire() noexcept
unsigned GetUsed() const noexcept
unsigned GetLimit() const noexcept
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
std::string QueueDaq(std::string const &dp_spec) override
Queues DAQ for processing.
boost::signals2::connection ConnectStatus(StatusSignal::slot_type const &slot) override
Signals.
SchedulerImpl(rad::IoExecutor &executor, Workspace &workspace, DaqControllerFactory daq_controller_factory, SchedulerOptions const &options)
Constructs a scheduler loading information from workspace ws.
void AbortDaq(std::string const &) override
Abort merging DAQ identified by id.
std::vector< std::string > GetQueue() const noexcept override
Queries current DAQ queue.
void Start() override
Start/stop operations.
Status GetDaqStatus(std::string const &id) const override
Queries current DAQ status, possibly from last recorded status in workspace.
bool IsQueued(std::string const &id) const noexcept override
Queries if DAQ with ID has been queued before in the current workspace.
Provides location of fits source file.
Interface to interact with DPM workspace.
virtual void RemoveDaq(std::string const &daq_id)=0
Removes workspace and all containing files for DAQ without archiving it.
virtual auto ArchiveDaq(std::string const &daq_id) -> std::filesystem::path=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location in th...
virtual auto LoadDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Loads a previously initialized DAQ workspace.
virtual auto InitializeDaq(std::string const &daq_id) -> std::unique_ptr< DaqWorkspace >=0
Initializes new DAQ Workspace.
virtual auto GetPath() const -> std::filesystem::path=0
virtual void StoreQueue(std::vector< std::string > const &queue) const =0
virtual auto LoadQueue() const -> std::vector< std::string >=0
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
daq::dpm::Workspace interface and implementation declaration
Declaration of log4cplus helpers.
constexpr std::string_view TRANSFERRING_RSYNC
Failure during rsync source copy.
constexpr std::string_view MERGING_MERGE
Merging failed.
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
const std::string LOGGER_NAME_CONTROLLER
const std::string LOGGER_NAME_SCHEDULER
const std::string LOGGER_NAME_MERGER
const std::string LOGGER_NAME_TRANSFER
Options for DaqController.
Options controlling scheduler operations.
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
std::optional< FitsFileSource > source
std::vector< SourceTypes > sources
std::string file_prefix
Optional user-chosen file prefix to make it easier to identify the produced file.
Close representation of the JSON structure but with stronger types.
AlertId MakeAlertId(std::string_view category, std::string key)
State
Observable states of the data acquisition process.
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Releasing
Releasing Data Product to receivers.
@ Aborted
Data acquisition has been aborted by user.
@ Merging
DAQ is being merged.
@ AbortingMerging
Transitional state for aborting during merging.
@ Transferring
Input files are being transferred.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
daq::dpm::Scheduler and related class declarations.
Non-observable status object that stores the status of data acquisition.
std::string result
Path to resulting data product.