19 #include <events.rad.hpp>
21 #include <rad/actionCallback.hpp>
22 #include <rad/dbAdapterRedis.hpp>
23 #include <rad/exceptions.hpp>
24 #include <rad/mal/replier.hpp>
25 #include <rad/mal/utils.hpp>
26 #include <rad/mal/publisher.hpp>
27 #include <rad/smAdapter.hpp>
29 #include <scxml4cpp/Context.h>
30 #include <scxml4cpp/EventQueue.h>
32 #include <boost/asio.hpp>
33 #include <boost/exception/diagnostic_information.hpp>
34 #include <boost/filesystem.hpp>
36 #include <fmt/format.h>
// Constructor (tail of the parameter list, member initializers and first body
// statement). NOTE(review): the class name and the leading parameters -- which
// must include the `name` and `executor` used below -- are not visible here.
50 std::string instrument,
51 std::string
const& output_path,
53 std::shared_ptr<mal::Mal> mal,
54 rad::cii::Replier& replier,
55 rad::cii::Publisher<ocmif::DaqStatus>& publisher,
56 rad::SMAdapter& state_machine)
// By-value arguments (`name`, `instrument`, `mal`) are moved into place.
57 : m_name(std::move(name))
58 , m_manager(executor, std::move(instrument))
59 , m_mal(std::move(mal))
// Closing part of an initializer whose start is not visible (presumably the
// one for m_service -- TODO confirm). It dereferences the member m_mal rather
// than the already moved-from `mal` parameter.
61 executor.get_io_context(), *m_mal, m_manager, output_path))
// Publisher and state machine are stored as references to the caller's objects.
63 , m_publisher(publisher)
64 , m_state_machine(state_machine)
65 , m_is_active(false) {
// Create the DaqStatus sample once; it is the object later handed to
// m_publisher.Publish().
66 m_sample = m_publisher.CreateTopic();
// Registers m_service with the replier under the name m_name, exposing it
// through the ocmif::AsyncOcmDaq interface via a shared_ptr static cast.
// NOTE(review): presumably the body of Register(), which main() invokes from
// the "OcmDaq.RegisterService" action -- the method header is not visible.
71 m_replier.RegisterService(m_name, std::static_pointer_cast<ocmif::AsyncOcmDaq>(m_service));
// Removes the registration stored under m_name from the replier.
// NOTE(review): presumably the counterpart of the RegisterService call above;
// the enclosing method header is not visible.
74 m_replier.UnregisterService(m_name);
// DAQ status handling: logs the update, raises activity events on the state
// machine and publishes the status sample.
// NOTE(review): the function header, the definitions of `status`, `state` and
// `daqs`, and the statements that consume the results below are not visible.
// Message text identifying the DAQ by its id (argument of a call not shown).
88 fmt::format(
"Publishing DAQ status for DAQ {}", status.
GetId()));
// A DAQ in state NotStarted while this object is not yet flagged active.
90 if (state == daq::State::NotStarted && !m_is_active) {
// Tell the state machine that at least one DAQ is now active.
93 m_state_machine.ProcessEvent(Events::AnyDaqActive{});
// True only if every element of `daqs` reports a final state.
// NOTE(review): the lambda takes each element by value; if the elements are
// shared_ptrs (as the name suggests) `auto const&` would avoid a copy.
98 std::all_of(daqs.begin(), daqs.end(), [](
auto const shrd_daq) ->
bool {
100 return daq::IsFinalState(shrd_daq->GetState());
// Tell the state machine that no DAQ is active any more.
105 m_state_machine.ProcessEvent(Events::AllDaqInactive{});
// Publish the member sample; any std::exception thrown inside the try block
// is caught below and turned into a message instead of propagating.
110 m_publisher.Publish(*m_sample);
111 }
catch (std::exception
const& e) {
113 fmt::format(
"Failed to publish status: {}", e.what()));
// MAL instance; initialized by moving the constructor's `mal` argument.
120 std::shared_ptr<mal::Mal> m_mal;
// Service object that is registered with m_replier as ocmif::AsyncOcmDaq.
121 std::shared_ptr<OcmDaqService> m_service;
// Replier (held by reference) used to register/unregister the service by name.
122 rad::cii::Replier& m_replier;
// Publisher (held by reference) for ocmif::DaqStatus samples.
123 rad::cii::Publisher<ocmif::DaqStatus>& m_publisher;
// State machine adapter (held by reference) that receives the AnyDaqActive
// and AllDaqInactive events.
124 rad::SMAdapter& m_state_machine;
// Signal connection handle.
// NOTE(review): presumably the subscription that drives the status updates;
// where it is connected/disconnected is not visible -- confirm.
127 boost::signals2::connection m_connection;
// Sample created with m_publisher.CreateTopic() in the constructor and passed
// to m_publisher.Publish().
128 std::shared_ptr<ocmif::DaqStatus> m_sample;
// Program entry point: sets up logging, MAL, publishers, the state machine
// and its actions, then runs the state machine.
// NOTE(review): several statements of this function (e.g. the definitions of
// `config`, `action_mgr`, `daq_service`, `mal_replier` and `ics_status`) are
// not visible here; comments describe only what the shown lines establish.
138 int main(
int argc,
char* argv[]) {
139 namespace fs = boost::filesystem;
141 auto log_initializer = rad::LogInitializer();
// Load the "zpb" MAL and register it with the CiiFactory singleton under the
// same "zpb" key.
149 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
150 auto mal = elt::mal::loadMal(
"zpb", {});
151 factory.registerMal(
"zpb", mal);
// Configure logging from the log-properties file named by the configuration,
// resolved through rad::Helper::FindFile.
160 log_initializer.Configure(rad::Helper::FindFile(config.
GetLogProperties()));
// Diagnostic for a missing output path; it is also written to stderr.
// NOTE(review): the guarding condition is not visible.
163 auto const msg =
"Output path not configured. Define environment "
164 "variable $DATAROOT or configuration cfg.dataroot";
166 std::cerr << msg << std::endl;
// Message for the case where the output directory does not exist yet.
172 fmt::format(
"Output dir '{}' is not a directory. Creating it now",
// Output directory permissions: full access for owner and group, read-only
// for others.
175 fs::permissions(config.
m_out_path, fs::others_read | fs::owner_all | fs::group_all);
// Publisher for the standard status topic (constructor arguments not visible).
185 auto std_status_publisher =
186 std::make_unique<rad::cii::Publisher<stdif::StatusTopic>>(
// Publisher for DAQ status samples (constructor arguments not visible).
188 auto daq_status_publisher =
189 std::make_unique<rad::cii::Publisher<ocmif::DaqStatus>>(
193 rad::DbAdapterRedis redis_db;
203 boost::asio::io_context io_ctx;
// State machine adapter wired to the io_context, the SCXML context and the
// external event queue declared just above.
210 scxml4cpp::EventQueue external_events;
211 scxml4cpp::Context state_machine_ctx;
214 rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
// The DAQ status publisher is passed on by reference (presumably to the
// daq_service constructor -- the surrounding call is not visible).
222 *daq_status_publisher,
// Action "OcmDaq.RegisterService": invokes daq_service.Register().
// NOTE(review): the callback is allocated with raw `new`; ownership is
// presumably taken over by action_mgr -- confirm against the rad API.
230 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.RegisterService",
231 [&](
auto) { daq_service.
Register(); }));
// Action "OcmDaq.UnregisterService" (its callback is not visible).
232 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.UnregisterService",
// Install the default reject handlers for Init, Enable and Disable requests.
240 state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
241 state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
242 state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
// Status publisher callback: publishes `ics_status` as a StatusTopic sample.
// The lambda captures by reference, so the captured locals must outlive the
// state machine's use of the callback.
245 state_machine.SetStatusPublisher([&](std::string
const& status) {
// Fallback value used when the preceding (not visible) expression is empty.
247 .value_or(
"NotOperational;Undefined");
249 auto sample = std_status_publisher->CreateTopic();
250 sample->setStatus(ics_status);
251 std_status_publisher->Publish(*sample);
// Expose the standard commands implementation under the service name "std".
257 mal_replier.RegisterService<stdif::AsyncStdCmds>(
258 "std", std::make_shared<server::StdCmdsImpl>(state_machine));
// Start and later stop the state machine (what runs in between is not visible).
263 state_machine.Start();
265 state_machine.Stop();
// rad::Exception handler: logs the diagnostic information of the exception
// currently being handled.
// NOTE(review): caught by non-const reference; `const&` is the usual form.
266 }
catch (rad::Exception& e) {
270 LOG4CPLUS_ERROR(
server::GetLogger(), boost::current_exception_diagnostic_information());