ifw-daq  1.0.0
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup server
4  * @copyright ESO - European Southern Observatory
5  * @author
6  *
7  * @brief main source file.
8  */
9 
10 #include "actionMgr.hpp"
11 #include "actionsStd.hpp"
12 #include "dataContext.hpp"
13 #include "dbInterface.hpp"
14 #include "logger.hpp"
15 #include "ocmDaqService.hpp"
16 #include "stdCmdsImpl.hpp"
17 
18 #include <Stdif.hpp>
19 #include <events.rad.hpp>
20 
21 #include <rad/actionCallback.hpp>
22 #include <rad/dbAdapterRedis.hpp>
23 #include <rad/exceptions.hpp>
24 #include <rad/mal/replier.hpp>
25 #include <rad/mal/utils.hpp>
26 #include <rad/mal/publisher.hpp>
27 #include <rad/smAdapter.hpp>
28 
29 #include <scxml4cpp/Context.h>
30 #include <scxml4cpp/EventQueue.h>
31 
32 #include <boost/asio.hpp>
33 #include <boost/exception/diagnostic_information.hpp>
34 #include <boost/filesystem.hpp>
35 
36 #include <fmt/format.h>
37 
38 #include <functional>
39 #include <memory>
40 
41 #include <daq/manager.hpp>
42 #include <daq/daqController.hpp>
43 #include <ocmif/conversion.hpp>
44 #include <ocmif/uri.hpp>
45 
46 
47 class DaqService {
48 public:
49  DaqService(std::string name,
50  std::string instrument,
51  std::string const& output_path,
52  rad::IoExecutor& executor,
53  std::shared_ptr<mal::Mal> mal,
54  rad::cii::Replier& replier,
55  rad::cii::Publisher<ocmif::DaqStatus>& publisher,
56  rad::SMAdapter& state_machine)
57  : m_name(std::move(name))
58  , m_manager(executor, std::move(instrument))
59  , m_mal(std::move(mal))
60  , m_service(std::make_shared<OcmDaqService>(
61  executor.get_io_context(), *m_mal, m_manager, output_path))
62  , m_replier(replier)
63  , m_publisher(publisher)
64  , m_state_machine(state_machine)
65  , m_is_active(false) {
66  m_sample = m_publisher.CreateTopic();
67  m_connection = m_manager.GetStatusSignal().ConnectObserver(
68  [this](auto const& status) { this->DaqStatusUpdate(status); });
69  }
70  void Register() {
71  m_replier.RegisterService(m_name, std::static_pointer_cast<ocmif::AsyncOcmDaq>(m_service));
72  }
73  void Unregister() {
74  m_replier.UnregisterService(m_name);
75  }
76 
77  /**
78  * Is notified of any DAQ status change and will post events to SM on DAQ activity flank
79  * changes.
80  *
81  * @note Current implementation assumes all DAQs begin in state NotStarted. When ocm supports
82  * loading old status from persistent storage this will no longer work as number of active daqs
83  * is not starting at zero and state is not starting at NotStarted.
84  */
85  void DaqStatusUpdate(daq::ObservableStatus const& status) {
86  try {
87  LOG4CPLUS_DEBUG(server::GetLogger(),
88  fmt::format("Publishing DAQ status for DAQ {}", status.GetId()));
89  auto state = status.GetState();
90  if (state == daq::State::NotStarted && !m_is_active) {
91  // New data acquisition
92  m_is_active = true;
93  m_state_machine.ProcessEvent(Events::AnyDaqActive{});
94  } else if (m_is_active && daq::IsFinalState(status.GetState())) {
95  // At least one DAQ is completed. Check if all DAQs are completed.
96  auto daqs = m_manager.GetDaqControllers();
97  auto all_final =
98  std::all_of(daqs.begin(), daqs.end(), [](auto const shrd_daq) -> bool {
99  assert(shrd_daq);
100  return daq::IsFinalState(shrd_daq->GetState());
101  });
102  if (all_final) {
103  m_is_active = false;
104  // Active -> Idle
105  m_state_machine.ProcessEvent(Events::AllDaqInactive{});
106  }
107  }
108  // publish state
109  *m_sample << status.GetStatus();
110  m_publisher.Publish(*m_sample);
111  } catch (std::exception const& e) {
112  LOG4CPLUS_ERROR(server::GetLogger(),
113  fmt::format("Failed to publish status: {}", e.what()));
114  }
115  }
116 
117 private:
118  std::string m_name;
119  daq::ManagerImpl m_manager;
120  std::shared_ptr<mal::Mal> m_mal;
121  std::shared_ptr<OcmDaqService> m_service;
122  rad::cii::Replier& m_replier;
123  rad::cii::Publisher<ocmif::DaqStatus>& m_publisher;
124  rad::SMAdapter& m_state_machine;
125  bool m_is_active;
126 
127  boost::signals2::connection m_connection;
128  std::shared_ptr<ocmif::DaqStatus> m_sample;
129 };
130 
131 
132 /**
133  * Application main.
134  *
135  * @param[in] argc Number of command line options.
136  * @param[in] argv Command line options.
137  */
138 int main(int argc, char* argv[]) {
139  namespace fs = boost::filesystem;
140 
141  auto log_initializer = rad::LogInitializer();
142  LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server started.");
143 
144  try {
145  /*
146  * Load CII/MAL middleware here because it resets
147  * the log4cplus configuration!
148  */
149  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
150  auto mal = elt::mal::loadMal("zpb", {});
151  factory.registerMal("zpb", mal);
152 
153  /* Read only configuration */
154  server::Config config;
155  if (config.ParseOptions(argc, argv) == false) {
156  // request for help
157  return EXIT_SUCCESS;
158  }
159  config.LoadConfig();
160  log_initializer.Configure(rad::Helper::FindFile(config.GetLogProperties()));
161 
162  if (config.m_out_path.empty()) {
163  auto const msg = "Output path not configured. Define environment "
164  "variable $DATAROOT or configuration cfg.dataroot";
165  LOG4CPLUS_ERROR(server::GetLogger(), msg);
166  std::cerr << msg << std::endl;
167  return -1;
168  }
169  // Create out path if necessary with permissive permissions
170  if (!fs::is_directory(config.m_out_path)) {
171  LOG4CPLUS_INFO(server::GetLogger(),
172  fmt::format("Output dir '{}' is not a directory. Creating it now",
173  config.m_out_path));
174  fs::create_directories(config.m_out_path);
175  fs::permissions(config.m_out_path, fs::others_read | fs::owner_all | fs::group_all);
176  }
177 
178 
179  /*
180  * LAN 2020-07-09 EICSSW-717
181  * Create CII/MAL replier as soon as possible to avoid problems when
182  * an exceptions is thrown from and Action/Guard.
183  */
184  rad::cii::Replier mal_replier(ocmif::MakeServerUri(config.GetMsgReplierEndpoint()));
185  auto std_status_publisher =
186  std::make_unique<rad::cii::Publisher<stdif::StatusTopic>>(
187  ocmif::MakeServiceUri(config.GetPubEndpoint(), "std/status"));
188  auto daq_status_publisher =
189  std::make_unique<rad::cii::Publisher<ocmif::DaqStatus>>(
190  ocmif::MakeServiceUri(config.GetPubEndpoint(), "daq/status"));
191 
192  /* Runtime DB */
193  rad::DbAdapterRedis redis_db;
194 
195  /* Runtime data context */
196  server::DataContext data_ctx(config, redis_db);
197 
198  /*
199  * Create event loop
200  */
201 
202  // event loop
203  boost::asio::io_context io_ctx;
204  rad::IoExecutor executor(io_ctx);
205 
206  /*
207  * State Machine related objects
208  */
209  // SM event queue and context
210  scxml4cpp::EventQueue external_events;
211  scxml4cpp::Context state_machine_ctx;
212 
213  // State Machine facade
214  rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
215 
216  DaqService daq_service("daq",
217  config.m_instrument_id,
218  config.m_out_path,
219  executor,
220  mal,
221  mal_replier,
222  *daq_status_publisher,
223  state_machine);
224 
225 
226  // actions and activities
227  server::ActionMgr action_mgr;
228  action_mgr.CreateActions(io_ctx, state_machine, data_ctx);
229  action_mgr.CreateActivities(state_machine, data_ctx);
230  action_mgr.AddAction(new rad::ActionCallback("OcmDaq.RegisterService",
231  [&](auto) { daq_service.Register(); }));
232  action_mgr.AddAction(new rad::ActionCallback("OcmDaq.UnregisterService",
233  [&](auto) { daq_service.Unregister(); }));
234 
235  // Load SM model
236  state_machine.Load(
237  config.GetSmScxmlFilename(), &action_mgr.GetActions(), &action_mgr.GetActivities());
238 
239  // Register handlers to reject events
240  state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
241  state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
242  state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
243 
244  // Register publisher to export state information
245  state_machine.SetStatusPublisher([&](std::string const& status) {
246  auto ics_status = server::MakeStatusString(state_machine.GetActiveStates())
247  .value_or("NotOperational;Undefined");
248 
249  auto sample = std_status_publisher->CreateTopic();
250  sample->setStatus(ics_status);
251  std_status_publisher->Publish(*sample);
252  data_ctx.GetDbInterface().SetControlState(status);
253  });
254  /*
255  * Register CII/MAL replier
256  */
257  mal_replier.RegisterService<stdif::AsyncStdCmds>(
258  "std", std::make_shared<server::StdCmdsImpl>(state_machine));
259 
260  /*
261  * Start event loop
262  */
263  state_machine.Start();
264  io_ctx.run();
265  state_machine.Stop();
266  } catch (rad::Exception& e) {
267  LOG4CPLUS_ERROR(server::GetLogger(), e.what());
268  return EXIT_FAILURE;
269  } catch (...) {
270  LOG4CPLUS_ERROR(server::GetLogger(), boost::current_exception_diagnostic_information());
271  return EXIT_FAILURE;
272  }
273 
274  // to avoid valgrind warnings on potential memory loss
275  //google::protobuf::ShutdownProtobufLibrary();
276 
277  LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server terminated.");
278  return EXIT_SUCCESS;
279 }
ocmif::MakeServerUri
network::uri MakeServerUri(std::string uri)
Creates a server URI.
Definition: uri.cpp:13
server::ActionMgr::CreateActions
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
Definition: actionMgr.cpp:31
dataContext.hpp
DataContext class header file.
daq::IsFinalState
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:14
stdCmdsImpl.hpp
StdCmds Interface implementation header file.
server::Config::m_instrument_id
std::string m_instrument_id
Definition: config.hpp:130
ocmif::MakeServiceUri
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
server::Config::LoadConfig
void LoadConfig(const std::string &filename="")
This method loads the application configuration from a configuration file, overriding the initializatio...
Definition: config.cpp:157
daq::ManagerImpl
Implements daq::Manager.
Definition: manager.hpp:211
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
manager.hpp
Declaration of daq::Manager
daq::ObservableStatus::GetState
State GetState() const noexcept
Definition: status.cpp:67
server::MakeStatusString
std::optional< std::string > MakeStatusString(std::set< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
Definition: actionsStd.cpp:28
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:68
conversion.hpp
Contains support functions for ocmif.
server::Config::m_out_path
std::string m_out_path
Definition: config.hpp:139
DaqService
Definition: main.cpp:47
daq::StatusSignal::ConnectObserver
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:51
daq::ObservableStatus::GetStatus
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:125
DaqService::DaqStatusUpdate
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
Definition: main.cpp:85
server::Config::ParseOptions
bool ParseOptions(int argc, char *argv[])
This method parses the command line parameters overriding the initialization done in the constructor.
Definition: config.cpp:71
OcmDaqService
Implements the MAL interface ocmif::OcmDaq (async version).
Definition: ocmDaqService.hpp:88
server::Config::GetPubEndpoint
const std::string & GetPubEndpoint() const
Definition: config.cpp:260
server::Config::GetLogProperties
const std::string & GetLogProperties() const
Definition: config.cpp:296
DaqService::DaqService
DaqService(std::string name, std::string instrument, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< ocmif::DaqStatus > &publisher, rad::SMAdapter &state_machine)
Definition: main.cpp:49
daq::ObservableStatus::GetId
std::string const & GetId() const noexcept
Definition: status.cpp:63
server::ActionMgr::CreateActivities
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
Definition: actionMgr.cpp:107
daq::ManagerImpl::GetStatusSignal
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:223
server::DbInterface::SetControlState
void SetControlState(const std::string &value)
Definition: dbInterface.cpp:44
server::Config::GetSmScxmlFilename
const std::string & GetSmScxmlFilename() const
Definition: config.cpp:276
daqController.hpp
Contains declaration for DaqController.
dbInterface.hpp
DbInterface class header file.
ocmDaqService.hpp
Declaration of OcmDaqService.
server::DataContext::GetDbInterface
DbInterface & GetDbInterface()
Definition: dataContext.cpp:55
server::ActionMgr
This class is responsible for the life-cycle management of actions and activities.
Definition: actionMgr.hpp:28
server::DataContext
This class provides access to the application run-time data including the in-memory DB.
Definition: dataContext.hpp:21
server::GetLogger
log4cplus::Logger & GetLogger()
Definition: logger.cpp:14
logger.hpp
Default logger name.
DaqService::Unregister
void Unregister()
Definition: main.cpp:73
daq::ManagerImpl::GetDaqControllers
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:227
server::Config::GetMsgReplierEndpoint
const std::string & GetMsgReplierEndpoint() const
Definition: config.cpp:255
actionsStd.hpp
ActionsStd class header file.
uri.hpp
Contains URI support functions for ocmif.
main
int main(int argc, char *argv[])
Application main.
Definition: main.cpp:138
server::Config
This class provides access to the command line options and the configuration parameters stored in the ...
Definition: config.hpp:41
actionMgr.hpp
ActionMgr class header file.
DaqService::Register
void Register()
Definition: main.cpp:70