12 #ifndef RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
13 #define RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP
15 #include <Metadaqif.hpp>
16 #include <Rtctkif.hpp>
17 #include <rtctk/metadataCollector/acquisitor.rad.hpp>
18 #include <rtctk/componentFramework/events.rad.hpp>
27 template <
typename Super>
30 template <
typename Super>
37 std::string
const& session_id,
38 std::string
const& state_id);
47 std::string getId()
const override;
49 void setId(std::string
const&
id)
override;
51 bool hasKey()
const override;
53 bool keyEquals(metadaqif::DaqReply
const& other)
const override;
55 std::unique_ptr<metadaqif::DaqReply> clone()
const;
57 std::unique_ptr<metadaqif::DaqReply> cloneKey()
const;
68 std::vector<std::string>
const& files,
69 std::string
const& kw);
71 std::string getId()
const override;
73 void setId(std::string
const&
id)
override;
75 std::string getKeywords()
const override;
77 void setKeywords(std::string
const& kw)
override;
79 std::vector<std::string> getFiles()
const override;
81 void setFiles(std::vector<std::string>
const& fs)
override;
83 bool hasKey()
const override;
85 bool keyEquals(metadaqif::DaqStopReply
const& other)
const override;
87 std::unique_ptr<metadaqif::DaqStopReply> clone()
const;
89 std::unique_ptr<metadaqif::DaqStopReply> cloneKey()
const;
93 std::vector<std::string> m_files;
102 std::string getId()
const override;
104 void setId(std::string
const&
id)
override;
106 std::string getKeywords()
const override;
108 void setKeywords(std::string
const& kw)
override;
110 std::string getMessage()
const override;
112 void setMessage(std::string
const& msg)
override;
114 double getTimestamp()
const override;
116 void setTimestamp(
double ts)
override;
118 metadaqif::DaqState getState()
const override;
120 void setState(metadaqif::DaqState st)
override;
122 std::vector<std::string> getFiles()
const override;
124 void setFiles(std::vector<std::string>
const& fs)
override;
126 bool hasKey()
const override;
128 bool keyEquals(metadaqif::DaqStatus
const& other)
const override;
130 std::unique_ptr<metadaqif::DaqStatus> clone()
const;
132 std::unique_ptr<metadaqif::DaqStatus> cloneKey()
const;
138 metadaqif::DaqState m_st;
139 std::vector<std::string> m_files;
152 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>> StartDaq(std::string
const&
id)
override;
154 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStopReply>> StopDaq(std::string
const&
id)
override;
156 ::elt::mal::future<std::shared_ptr<metadaqif::DaqReply>> AbortDaq(std::string
const&
id)
override;
158 ::elt::mal::future<std::shared_ptr<metadaqif::DaqStatus>> GetDaqStatus(std::string
const&
id)
override;
169 template <
typename Super>
171 static_assert(std::is_base_of_v<RtcComponent, Super>,
"'Acquisitor' requires 'RtcComponent'");
172 static_assert(not is_base_of_template_v<Runnable, Super>,
"'Acquisitor' excludes 'Runnable'");
173 static_assert(not is_base_of_template_v<Loopaware, Super>,
"'Acquisitor' excludes 'Loopaware'");
188 using Super::InputStage::InputStage;
191 Super::InputStage::Start();
203 events::StartDaq::ID, [](
const rad::AnyEvent& event, std::string
const&
state) {
204 if (
auto req = rad::GetPayloadNothrow<events::StartDaq>(event); req) {
205 auto id = req->GetRequestPayload();
211 events::StopDaq::ID, [](
const rad::AnyEvent& event, std::string
const&
state) {
212 if (
auto req = rad::GetPayloadNothrow<events::StopDaq>(event); req) {
213 auto id = req->GetRequestPayload();
219 events::AbortDaq::ID, [](
const rad::AnyEvent& event, std::string
const&
state) {
220 if (
auto req = rad::GetPayloadNothrow<events::AbortDaq>(event); req) {
221 auto id = req->GetRequestPayload();
228 this->m_no_disable_in_states.push_back(
"On:Operational:Preparing");
229 this->m_no_disable_in_states.push_back(
"On:Operational:Acquiring");
230 this->m_no_disable_in_states.push_back(
"On:Operational:Finalising");
231 this->m_no_disable_in_states.push_back(
"On:Operational:Aborting");
232 this->m_no_disable_in_states.push_back(
"On:Operational:Recovering");
233 this->m_no_disable_in_states.push_back(
"On:Operational:Error");
237 this->m_no_update_in_states.push_back(
"On:Operational:Preparting");
238 this->m_no_update_in_states.push_back(
"On:Operational:Acquiring");
239 this->m_no_update_in_states.push_back(
"On:Operational:Finalising");
240 this->m_no_update_in_states.push_back(
"On:Operational:Aborting");
241 this->m_no_update_in_states.push_back(
"On:Operational:Recovering");
242 this->m_no_update_in_states.push_back(
"On:Operational:Error");
247 [
this](
auto c) { m_acquisition_history.clear(); });
251 engine.
RegisterGuard(
"GuardPreparingAllowed", [
this](
auto c) {
252 auto req = GetPayloadNothrow<events::StartDaq>(c);
253 auto id = req->GetRequestPayload();
255 #ifdef FEATURE_REQUIRE_UNIQUE_ID_FOR_NEW_SESSION
256 return m_acquisition_history.count(
id) == 0;
263 m_tmp_start_request = GetPayloadNothrow<events::StartDaq>(c);
264 m_active_id = m_tmp_start_request->GetRequestPayload();
265 std::string
id = m_active_id.value();
269 status.
setState(metadaqif::Acquiring);
271 m_acquisition_history[id] = status;
275 if (m_tmp_start_request) {
276 std::string
id = m_active_id.value();
277 m_tmp_start_request->SetReplyValue(std::make_shared<DaqReply>(
id));
278 m_tmp_start_request =
nullptr;
283 if (m_tmp_start_request) {
284 std::string
id = m_active_id.value();
285 auto eptr = GetPayloadNothrow<events::Error>(c);
287 m_acquisition_history.at(
id).setState(metadaqif::Failed);
288 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
289 m_acquisition_history.at(
id).setMessage(msg);
290 m_tmp_start_request->SetException(metadaqif::DaqException(
id, msg));
291 m_tmp_start_request =
nullptr;
297 engine.
RegisterGuard(
"GuardFinalisingAllowed", [
this](
auto c) {
298 auto req = GetPayloadNothrow<events::StopDaq>(c);
299 auto id = req->GetRequestPayload();
300 return m_active_id.value() == id;
304 m_tmp_stop_request = GetPayloadNothrow<events::StopDaq>(c);
308 if (m_tmp_stop_request) {
309 std::string
id = m_active_id.value();
310 m_acquisition_history.at(
id).setState(metadaqif::Succeeded);
311 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
313 m_tmp_stop_request->SetReplyValue(
314 std::make_shared<DaqStopReply>(m_acquisition_history.at(
id).getId(),
315 m_acquisition_history.at(
id).getFiles(),
316 m_acquisition_history.at(
id).getKeywords()));
317 m_tmp_stop_request =
nullptr;
322 if (m_tmp_stop_request) {
323 std::string
id = m_active_id.value();
324 auto eptr = GetPayloadNothrow<events::Error>(c);
326 m_acquisition_history.at(
id).setState(metadaqif::Failed);
327 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
328 m_acquisition_history.at(
id).setMessage(msg);
329 m_tmp_stop_request->SetException(metadaqif::DaqException(
id, msg));
330 m_tmp_stop_request =
nullptr;
336 engine.
RegisterGuard(
"GuardAbortingAllowed", [
this](
auto c) {
337 auto req = GetPayloadNothrow<events::AbortDaq>(c);
338 auto id = req->GetRequestPayload();
339 return m_active_id.value() == id;
343 m_tmp_abort_request = GetPayloadNothrow<events::AbortDaq>(c);
347 if (m_tmp_abort_request) {
348 std::string
id = this->m_active_id.value();
349 m_acquisition_history.at(
id).setState(metadaqif::Aborted);
350 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
351 m_tmp_abort_request->SetReplyValue(std::make_shared<DaqReply>(
id));
352 m_tmp_abort_request =
nullptr;
359 auto request = GetPayloadNothrow<events::GetDaqStatus>(c);
360 auto id = request->GetRequestPayload();
362 if (m_acquisition_history.count(
id)) {
363 request->SetReplyValue(std::move(m_acquisition_history.at(
id).clone()));
365 auto status = std::make_shared<DaqStatus>();
367 status->setState(metadaqif::NotStarted);
368 status->setTimestamp(0);
369 request->SetReplyValue(status);
376 std::string
id = m_active_id.value();
377 auto eptr = GetPayloadNothrow<events::Error>(c);
379 m_acquisition_history.at(
id).setState(metadaqif::Failed);
380 m_acquisition_history.at(
id).setTimestamp(GetCurrentTime());
381 m_acquisition_history.at(
id).setMessage(msg);
389 std::string
id = m_active_id.value();
390 static_cast<BizLogicIf&
>(this->m_logic).ActivityPreparing(stop_token,
id);
392 this->m_success_handler,
393 this->m_error_handler);
396 "ActivityFinalising",
398 std::string
id = m_active_id.value();
400 static_cast<BizLogicIf&
>(this->m_logic).ActivityFinalising(stop_token);
401 m_acquisition_history.at(
id).setId(result.getId());
402 m_acquisition_history.at(
id).setFiles(result.getFiles());
403 m_acquisition_history.at(
id).setKeywords(result.getKeywords());
405 this->m_success_handler,
406 this->m_error_handler);
411 static_cast<BizLogicIf&
>(this->m_logic).ActivityAcquiring(stop_token);
414 this->m_error_handler);
419 static_cast<BizLogicIf&
>(this->m_logic).ActivityAborting(stop_token);
421 this->m_success_handler,
422 this->m_error_handler);
425 "ActivityRecovering",
427 static_cast<BizLogicIf&
>(this->m_logic).ActivityRecovering(stop_token);
429 this->m_success_handler,
430 this->m_error_handler);
/// @brief Returns the current time as a floating point number.
///
/// The value is the number of nanoseconds elapsed since the epoch of
/// std::chrono::system_clock, converted to double.
///
/// std::chrono::system_clock is used rather than high_resolution_clock
/// because the epoch of the latter is implementation-defined (it may alias
/// steady_clock), which would make the stored timestamps meaningless as
/// wall-clock times on such platforms.
///
/// NOTE(review): a nanosecond count since 1970 exceeds the 53-bit mantissa of
/// a double, so the result is only exact to a few hundred nanoseconds. Confirm
/// the unit expected by the consumers of this timestamp.
///
/// @return Nanoseconds since the system clock epoch.
double GetCurrentTime() {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    const auto as_nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch);
    return static_cast<double>(as_nanos.count());
}
440 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
442 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqStopReply>, std::string>>
444 std::shared_ptr<rad::cii::Request<std::shared_ptr<metadaqif::DaqReply>, std::string>>
446 std::map<std::string, DaqStatus> m_acquisition_history;
448 std::optional<std::string> m_active_id;
455 this->RegisterLayer({
"Acquisitor", {}});
457 const std::string parent_region =
"On:Operational:RegionAcquisitor";
463 "On:Operational:Initial",
466 "On:Operational:Idle",
469 "On:Operational:Acquiring",
471 "ActivityAcquiring");
473 "On:Operational:Recovering",
475 "ActivityRecovering",
476 "ActionRecoveringEntry");
478 "On:Operational:Preparing",
481 "ActionPreparingEntry");
483 "On:Operational:Finalising",
485 "ActivityFinalising",
486 "ActionFinalisingEntry");
488 "On:Operational:Aborting",
491 "ActionAbortingEntry");
493 this->mm.AddTrans(
"On:Operational:Initial",
494 "On:Operational:Idle",
497 "ActionClearHistory");
498 this->mm.AddTrans(parent_region,
500 "events.GetDaqStatus",
502 "ActionGetDaqStatus");
503 this->mm.AddTrans(
"On:Operational:Idle",
504 "On:Operational:Preparing",
506 "GuardPreparingAllowed");
507 this->mm.AddTrans(
"On:Operational:Preparing",
508 "On:Operational:Acquiring",
511 "ActionPreparingDone");
512 this->mm.AddTrans(
"On:Operational:Preparing",
513 "On:Operational:Idle",
516 "ActionPreparingFailed");
517 this->mm.AddTrans(
"On:Operational:Acquiring",
518 "On:Operational:Finalising",
520 "GuardFinalisingAllowed");
521 this->mm.AddTrans(
"On:Operational:Finalising",
522 "On:Operational:Idle",
525 "ActionFinalisingDone");
526 this->mm.AddTrans(
"On:Operational:Finalising",
527 "On:Operational:Idle",
530 "ActionFinalisingFailed");
531 this->mm.AddTrans(
"On:Operational:Acquiring",
532 "On:Operational:Aborting",
534 "GuardAbortingAllowed");
535 this->mm.AddTrans(
"On:Operational:Aborting",
536 "On:Operational:Idle",
539 "ActionAbortingDone");
540 this->mm.AddTrans(
"On:Operational:Acquiring",
541 "On:Operational:Recovering",
543 this->mm.AddTrans(
"On:Operational:Recovering",
544 "On:Operational:Idle",
#endif  // RTCTK_METADATACOLLECTOR_ACQUISITOR_HPP