ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm_server
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daqDpmServer entrypoint.
9  */
10 #include <daq/config.hpp>
11 #include <iostream>
12 
13 #include <boost/asio.hpp>
14 #include <boost/program_options.hpp>
15 #include <ciiLogManager.hpp>
16 #include <log4cplus/configurator.h>
17 #include <log4cplus/logger.h>
18 #include <log4cplus/loggingmacros.h>
19 #include <rad/ioExecutor.hpp>
20 #include <rad/logger.hpp>
21 #include <rad/mal/publisher.hpp>
22 #include <rad/mal/replier.hpp>
23 #include <rad/oldbAsyncWriter.hpp>
24 
25 #include <daq/conversion.hpp>
26 #include <daq/dpm/dpmService.hpp>
27 #include <daq/dpm/scheduler.hpp>
28 #include <daq/error/report.hpp>
29 #include <daq/status.hpp>
30 
31 #include <daqif/uri.hpp>
32 
33 #include "configManager.hpp"
34 
35 namespace error {
36 /**
37  * Error codes
38  */
39 enum {
40  /** Invalid program arguments */
42  Unknown = 255,
43 };
44 } // namespace error
45 
46 namespace po = boost::program_options;
47 namespace daq::dpm {
48 
49 int Entrypoint(log4cplus::Logger const& logger, ConfigManager& cfg_mgr) {
50  boost::asio::io_context io_ctx;
51  rad::IoExecutor exec(io_ctx);
52  Configuration const& config = cfg_mgr.GetConfig();
53 
54  auto ws_path = config.workspace;
55  if (ws_path.is_relative()) {
56  ws_path = config.dataroot / ws_path;
57  }
58 
59  // Construction also initializes/verifies workspace and may throw.
60  WorkspaceImpl workspace(ws_path);
61 
62  // Scheduler
63  SchedulerOptions scheduler_options;
64  scheduler_options.concurrency_limits.daq = config.limit_daq;
65  scheduler_options.concurrency_limits.merge = config.limit_merge;
66  scheduler_options.concurrency_limits.net_send = config.limit_net_send;
67  scheduler_options.concurrency_limits.net_receive = config.limit_net_receive;
68 
69  DaqControllerOptions daq_controller_options;
70  daq_controller_options.merge_bin = config.merge_bin;
71  daq_controller_options.rsync_bin = config.rsync_bin;
72 
73  DaqControllerImpl::RsyncFactory rsync_factory =
74  [](boost::asio::io_context& io_ctx,
75  std::string source, // source
76  std::string dest, // dest
77  RsyncOptions const& opts,
78  RsyncAsyncProcess::DryRun dry_run) -> std::unique_ptr<RsyncAsyncProcessIf> {
79  return std::make_unique<RsyncAsyncProcess>(
80  io_ctx, std::move(source), std::move(dest), opts, dry_run);
81  };
82 
83  DaqControllerImpl::ProcFactory proc_factory = [](boost::asio::io_context& io_ctx,
84  std::vector<std::string> args) {
85  return std::make_unique<AsyncProcess>(io_ctx, std::move(args));
86  };
87 
88  SchedulerImpl scheduler(
89  exec,
90  workspace,
91  [&](std::unique_ptr<DaqWorkspace> workspace,
92  Resources& resources) -> std::unique_ptr<DaqController> {
93  return std::make_unique<DaqControllerImpl>(exec,
94  std::move(workspace),
95  resources,
96  rsync_factory,
97  proc_factory,
98  daq_controller_options);
99  },
100  scheduler_options);
101 
102  // MAL variables that have an empty value if --no-ipc is used.
103  std::optional<rad::cii::Replier> replier;
104  std::shared_ptr<elt::mal::Mal> mal;
105  std::unique_ptr<rad::cii::Publisher<daqif::DaqStatus>> daq_status_publisher;
106  std::optional<rad::OldbAsyncWriter> oldb_writer;
107  if (!config.no_ipc) {
108  mal = elt::mal::loadMal("zpb", {});
109  auto dpm_service = std::make_shared<DpmService>(exec, *mal, workspace, scheduler);
110  /*
111  * Load CII/MAL middleware here because it resets
112  * the log4cplus configuration!
113  */
114  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
115  factory.registerMal("zpb", mal);
116 
117  /*
118  * Register CII/MAL replier
119  */
120  replier.emplace(elt::mal::Uri(config.rr_uri));
121  replier->RegisterService<daqif::AsyncDpmControl>("dpm", dpm_service);
122  replier->RegisterService<daqif::AsyncDpmDaqControl>("daq", dpm_service);
123  daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
124  daqif::MakeServiceUri(config.ps_uri, "daq/status"));
125  {
126  auto sample = daq_status_publisher->CreateTopic();
127  scheduler.ConnectStatus([&, sample = std::move(sample)](Status const& status) {
128  try {
129  *sample << status;
130  daq_status_publisher->Publish(*sample);
131  } catch (std::exception const& e) {
132  LOG4CPLUS_ERROR(logger, fmt::format("Failed to publish status: {}", e.what()));
133  }
134  });
135  }
136  // Write configuration
137  {
138  oldb_writer.emplace(config.db_timeout, std::chrono::milliseconds(250));
139  auto prefix = config.db_prefix;
140  if (prefix.empty()) {
141  throw std::invalid_argument("OLDB prefix is invalid (empty)");
142  }
143  if (prefix.back() != '/') {
144  prefix.push_back('/');
145  }
146 
147  cfg_mgr.Visit([&](daq::config::Manager::CiiValue const& param) {
148  oldb_writer->Set(prefix + param.metadata.canonical_name, param.value);
149  });
150  oldb_writer->StartWriter();
151  }
152  } else {
153  LOG4CPLUS_INFO(logger,
154  "Note: Running in standalone mode without interprocess communication."
155  " Stop process using CTRL-C/SIGINT");
156  }
157 
158  // Run until requested to exit
159  LOG4CPLUS_INFO(logger, fmt::format("Application daqDpmServer started as '{}'", config.name));
160  scheduler.Start();
161  if (config.poll_once) {
162  // First poll may initiate async activities, after which we Stop scheduler and let it run to
163  // completion.
164  LOG4CPLUS_INFO(logger,
165  "Note: Polling once due to (--poll-once) to start operations, wait for "
166  "those to complete and then then exit");
167  io_ctx.poll();
168  scheduler.Stop();
169  io_ctx.restart();
170  } else {
171  io_ctx.run();
172  }
173  return 0;
174 }
175 } // namespace daq::dpm
176 
177 int main(int argc, char* argv[]) {
178  try {
179  auto log_initializer = rad::LogInitializer();
180  log4cplus::Logger logger = log4cplus::Logger::getInstance(daq::dpm::LOGGER_NAME);
181  try {
183  // @todo Uses default configuration at the moment. Load from command line and
184  // configuration file and environment variables as well.
185  daq::dpm::ConfigManager cfg_mgr(logger);
186  if (!cfg_mgr.ParseArguments(argc, argv)) {
187  // --help was invoked
188  return 0;
189  }
190 
191  if (!cfg_mgr.GetConfig().config_file.empty()) {
192  cfg_mgr.LoadConfig();
193  }
194  auto level = log4cplus::getLogLevelManager().toString(cfg_mgr.GetConfig().log_level);
195  elt::log::CiiLogManager::Configure(
196  rad::GetDefaultLogProperties(cfg_mgr.GetConfig().name + ".log", level));
197 
198  if (auto log_cfg = cfg_mgr.GetConfig().log_config; !log_cfg.empty()) {
199  auto resolved = rad::Helper::FindFile(log_cfg);
200  if (resolved.empty()) {
201  LOG4CPLUS_ERROR(logger,
202  "Configured log property file not found: '"
203  << log_cfg
204  << "', $CFGPATH=" << rad::Helper::GetEnvVar("CFGPATH"));
205  return EXIT_FAILURE;
206  }
207  } else {
208  log4cplus::Logger::getRoot().setLogLevel(cfg_mgr.GetConfig().log_level);
209  }
210 
211  int rc = daq::dpm::Entrypoint(logger, cfg_mgr);
212  LOG4CPLUS_INFO(logger, "Application terminating with code " << rc);
213  return rc;
214  } catch (po::error const& e) {
217  } catch (std::exception const& ex) {
218  LOG4CPLUS_ERROR(logger, daq::error::NestedExceptionReporter(ex));
219  return error::Unknown;
220  }
221  } catch (...) {
222  std::cerr << "Unknown error during log initialization\n";
223  return error::Unknown;
224  }
225 }
rad::cii::OldbType value
Definition: manager.hpp:197
Metadata const & metadata
Definition: manager.hpp:199
CII representation of real value.
Definition: manager.hpp:196
DPM Server specific configuration manager.
Configuration const & GetConfig() const
void LoadConfig()
Load configuration file and update configuration.
bool ParseArguments(int argc, char *argv[])
Parse configuration from command line arguments.
void Visit(Func &&func)
Visit all configuration parameters.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:490
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:487
Implementation of daq::dpm::Workspace.
Definition: workspace.hpp:175
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
DPM server config.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
int main(int argc, char **argv)
Definition: main.cpp:68
std::string canonical_name
Definition: manager.hpp:71
unsigned short daq
Limits how many DAQs overall can be scheduled concurrently.
Definition: scheduler.hpp:140
std::string name
Process instance name.
Definition: config.hpp:37
log4cplus::LogLevel log_level
Definition: config.hpp:68
std::string rsync_bin
rsync application name.
Definition: config.hpp:82
unsigned short net_receive
Maximum number of concurrent input transfers = 0.
Definition: scheduler.hpp:150
struct daq::dpm::SchedulerOptions::ConcurrencyLimits concurrency_limits
std::filesystem::path dataroot
Dataroot, normally this should be configured from environment variable $DATAROOT.
Definition: config.hpp:42
std::string merge_bin
Merge application name.
Definition: config.hpp:77
std::filesystem::path workspace
Workspace.
Definition: config.hpp:49
int Entrypoint(log4cplus::Logger const &logger, ConfigManager &cfg_mgr)
Definition: main.cpp:49
unsigned short net_send
Maximum number of concurrent output transfers.
Definition: scheduler.hpp:145
std::chrono::seconds db_timeout
Definition: config.hpp:67
bool no_ipc
If true (set by command line option only) it disables MAL service registration.
Definition: config.hpp:64
std::string rr_uri
Request/reply URI.
Definition: config.hpp:54
std::string ps_uri
Pub/sub URI.
Definition: config.hpp:59
unsigned short merge
Maximum number of concurrent transfers.
Definition: scheduler.hpp:155
const std::string LOGGER_NAME
Definition: config.hpp:22
std::string db_prefix
Definition: config.hpp:66
std::string log_config
Log file, defaults to nothing.
Definition: config.hpp:72
bool poll_once
Run scheduler once.
Definition: config.hpp:94
uint16_t limit_net_send
Definition: config.hpp:98
std::filesystem::path config_file
Configuration file.
Definition: config.hpp:89
uint16_t limit_net_receive
Definition: config.hpp:99
Represents active configuration.
Definition: config.hpp:33
Options for DaqController.
Definition: scheduler.hpp:162
Limited resources.
Definition: scheduler.hpp:231
Options controlling scheduler operations.
Definition: scheduler.hpp:132
void ReportNestedExceptions(std::ostream &os) noexcept
Definition: report.cpp:50
Options controlling rsync invocation.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
Definition: main.cpp:23
@ Unknown
Definition: main.cpp:46
@ InvalidProgramArgument
Invalid program arguments.
Definition: main.cpp:29
daq::dpm::Scheduler and related class declarations.
Contains declaration for Status and ObservableStatus.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124
Contains URI support functions for daqif.