13 #include <boost/asio.hpp>
14 #include <boost/program_options.hpp>
15 #include <ciiLogManager.hpp>
16 #include <log4cplus/configurator.h>
17 #include <log4cplus/logger.h>
18 #include <log4cplus/loggingmacros.h>
20 #include <rad/logger.hpp>
21 #include <rad/mal/publisher.hpp>
22 #include <rad/mal/replier.hpp>
23 #include <rad/oldbAsyncWriter.hpp>
46 namespace po = boost::program_options;
50 boost::asio::io_context io_ctx;
55 if (ws_path.is_relative()) {
74 [](boost::asio::io_context& io_ctx,
79 return std::make_unique<RsyncAsyncProcess>(
80 io_ctx, std::move(source), std::move(dest), opts, dry_run);
84 std::vector<std::string> args) {
85 return std::make_unique<AsyncProcess>(io_ctx, std::move(args));
91 [&](std::unique_ptr<DaqWorkspace> workspace,
92 Resources& resources) -> std::unique_ptr<DaqController> {
93 return std::make_unique<DaqControllerImpl>(exec,
98 daq_controller_options);
103 std::optional<rad::cii::Replier> replier;
104 std::shared_ptr<elt::mal::Mal>
mal;
105 std::unique_ptr<rad::cii::Publisher<daqif::DaqStatus>> daq_status_publisher;
106 std::optional<rad::OldbAsyncWriter> oldb_writer;
108 mal = elt::mal::loadMal(
"zpb", {});
109 auto dpm_service = std::make_shared<DpmService>(exec, *
mal, workspace, scheduler);
114 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
115 factory.registerMal(
"zpb",
mal);
120 replier.emplace(elt::mal::Uri(config.
rr_uri));
121 replier->RegisterService<daqif::AsyncDpmControl>(
"dpm", dpm_service);
122 replier->RegisterService<daqif::AsyncDpmDaqControl>(
"daq", dpm_service);
123 daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
126 auto sample = daq_status_publisher->CreateTopic();
127 scheduler.ConnectStatus([&, sample = std::move(sample)](
Status const& status) {
130 daq_status_publisher->Publish(*sample);
131 }
catch (std::exception
const& e) {
132 LOG4CPLUS_ERROR(logger, fmt::format(
"Failed to publish status: {}", e.what()));
138 oldb_writer.emplace(config.
db_timeout, std::chrono::milliseconds(250));
140 if (prefix.empty()) {
141 throw std::invalid_argument(
"OLDB prefix is invalid (empty)");
143 if (prefix.back() !=
'/') {
144 prefix.push_back(
'/');
150 oldb_writer->StartWriter();
153 LOG4CPLUS_INFO(logger,
154 "Note: Running in standalone mode without interprocess communication."
155 " Stop process using CTRL-C/SIGINT");
159 LOG4CPLUS_INFO(logger, fmt::format(
"Application daqDpmServer started as '{}'", config.
name));
164 LOG4CPLUS_INFO(logger,
165 "Note: Polling once due to (--poll-once) to start operations, wait for "
166 "those to complete and then then exit");
177 int main(
int argc,
char* argv[]) {
179 auto log_initializer = rad::LogInitializer();
195 elt::log::CiiLogManager::Configure(
196 rad::GetDefaultLogProperties(cfg_mgr.
GetConfig().
name +
".log", level));
199 auto resolved = rad::Helper::FindFile(log_cfg);
200 if (resolved.empty()) {
201 LOG4CPLUS_ERROR(logger,
202 "Configured log property file not found: '"
204 <<
"', $CFGPATH=" << rad::Helper::GetEnvVar(
"CFGPATH"));
212 LOG4CPLUS_INFO(logger,
"Application terminating with code " << rc);
214 }
catch (po::error
const& e) {
217 }
catch (std::exception
const& ex) {
222 std::cerr <<
"Unknown error during log initialization\n";
Metadata const & metadata
CII representation of real value.
DPM Server specific configuration manager.
Configuration const & GetConfig() const
void LoadConfig()
Load configuration file and update configuration.
bool ParseArguments(int argc, char *argv[])
Parse configuration from command line arguments.
void Visit(Func &&func)
Visit all configuration parameters.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Implementation of daq::dpm::Workspace.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
int main(int argc, char **argv)
std::string canonical_name
unsigned short daq
Limits how many DAQs overall can be scheduled concurrently.
std::string name
Process instance name.
log4cplus::LogLevel log_level
std::string rsync_bin
rsync application name.
unsigned short net_receive
Maximum number of concurrent input transfers = 0.
struct daq::dpm::SchedulerOptions::ConcurrencyLimits concurrency_limits
std::filesystem::path dataroot
Dataroot, normally this should be configured from environment variable $DATAROOT.
std::string merge_bin
Merge application name.
std::filesystem::path workspace
Workspace.
int Entrypoint(log4cplus::Logger const &logger, ConfigManager &cfg_mgr)
unsigned short net_send
Maximum number of concurrent output transfers.
std::chrono::seconds db_timeout
bool no_ipc
If true (set by command line option only) it disables MAL service registration.
std::string rr_uri
Request/reply URI.
std::string ps_uri
Pub/sub URI.
unsigned short merge
Maximum number of concurrent transfers.
const std::string LOGGER_NAME
std::string log_config
Log file, defaults to nothing.
bool poll_once
Run scheduler once.
std::filesystem::path config_file
Configuration file.
uint16_t limit_net_receive
Represents active configuration.
Options for DaqController.
Options controlling scheduler operations.
void ReportNestedExceptions(std::ostream &os) noexcept
Options controlling rsync invocation.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
@ InvalidProgramArgument
Invalid program arguments.
daq::dpm::Scheduler and related class declarations.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of a data acquisition.
Contains URI support functions for daqif.