ifw-daq  1.0.0
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_serverctl
4  * @copyright ESO - European Southern Observatory
5  *
6  * @brief
7  */
8 #include <memory>
9 #include <thread>
10 #include <unordered_map>
11 #include <vector>
12 
13 #include <boost/format.hpp>
14 #include <boost/program_options.hpp>
15 
16 #include <mal/Cii.hpp>
17 #include <mal/Mal.hpp>
18 #include <mal/rr/qos/ConnectionTime.hpp>
19 #include <mal/rr/qos/ReplyTime.hpp>
20 #include <mal/utility/LoadMal.hpp>
21 #include <rad/logger.hpp>
22 
23 #include <Ocmif.hpp>
24 #include <Stdif.hpp>
25 #include <ocmif/conversion.hpp>
26 #include <ocmif/uri.hpp>
27 
28 #include "requestor.hpp"
29 
30 volatile std::sig_atomic_t g_signal_status;
31 void SignalHandler(int signal) {
32  g_signal_status = signal;
33 }
34 
35 template <class TopicType>
36 struct Subscription {
37  std::unique_ptr<elt::mal::ps::Subscriber<TopicType>> subscriber;
38  std::unique_ptr<elt::mal::ps::Subscription> subscription;
39 };
40 
41 template <class TopicType, class Func>
42 auto MakeSubcription(elt::mal::Uri const& uri, Func&& func) -> Subscription<TopicType> {
43  auto subscriber = elt::mal::CiiFactory::getInstance().getSubscriber<TopicType>(
44  elt::mal::Uri(uri), elt::mal::ps::qos::QoS::DEFAULT, elt::mal::Mal::Properties());
45  auto subscription =
46  subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TopicType>::all(), func);
47  return {std::move(subscriber), std::move(subscription)};
48 }
49 
50 int main(int argc, char** argv) {
51  namespace po = boost::program_options;
52  using elt::mal::Uri;
53 
54  log4cplus::initialize();
55 
56  // Log to stderr
57  log4cplus::BasicConfigurator(log4cplus::Logger::getDefaultHierarchy(), true).configure();
58  RAD_LOG_TO_CONSOLE(true);
59  RAD_LOG_SETLEVEL("TRACE");
60 
61  std::signal(SIGINT, SignalHandler);
62  std::signal(SIGTERM, SignalHandler);
63 
64  const std::string base_synopsis("ocmServerCtl [--status]");
65 
66  std::vector<std::string> subargs;
67  CommonArgs args;
68  args.json = false;
69 
70  std::unordered_map<std::string, std::unique_ptr<Requestor>> requestors;
71  requestors.emplace(
72  std::make_pair("std.getstatus",
73  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::GetStatus)>>(
74  &stdif::StdCmds::GetStatus, args)));
75  requestors.emplace(
76  std::make_pair("std.getstate",
77  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::GetState)>>(
78  &stdif::StdCmds::GetState, args)));
79  requestors.emplace(
80  std::make_pair("std.getversion",
81  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::GetVersion)>>(
82  &stdif::StdCmds::GetVersion, args)));
83  requestors.emplace(
84  std::make_pair("std.setloglevel", std::make_unique<SetLogLevelRequestor>(args)));
85  requestors.emplace(
86  std::make_pair("std.init",
87  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Init)>>(
88  &stdif::StdCmds::Init, args)));
89  requestors.emplace(
90  std::make_pair("std.enable",
91  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Enable)>>(
92  &stdif::StdCmds::Enable, args)));
93  requestors.emplace(
94  std::make_pair("std.disable",
95  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Disable)>>(
96  &stdif::StdCmds::Disable, args)));
97  requestors.emplace(
98  std::make_pair("std.exit",
99  std::make_unique<SimpleRequestor<decltype(&stdif::StdCmds::Exit)>>(
100  &stdif::StdCmds::Exit, args)));
101 
102  requestors.emplace(
103  std::make_pair("daq.stop",
104  std::make_unique<SimpleDaqRequestor<decltype(&ocmif::OcmDaq::StopDaq)>>(
105  &ocmif::OcmDaq::StopDaq, args)));
106  requestors.emplace(
107  std::make_pair("daq.forcestop",
108  std::make_unique<SimpleDaqRequestor<decltype(&ocmif::OcmDaq::ForceStopDaq)>>(
109  &ocmif::OcmDaq::ForceStopDaq, args)));
110  requestors.emplace(
111  std::make_pair("daq.abort",
112  std::make_unique<SimpleDaqRequestor<decltype(&ocmif::OcmDaq::AbortDaq)>>(
113  &ocmif::OcmDaq::AbortDaq, args)));
114  requestors.emplace(std::make_pair(
115  "daq.forceabort",
116  std::make_unique<SimpleDaqRequestor<decltype(&ocmif::OcmDaq::ForceAbortDaq)>>(
117  &ocmif::OcmDaq::ForceAbortDaq, args)));
118 
119  requestors.emplace(
120  std::make_pair("daq.getstatus",
121  std::make_unique<SimpleDaqRequestor<decltype(&ocmif::OcmDaq::GetStatus)>>(
122  &ocmif::OcmDaq::GetStatus, args)));
123  requestors.emplace(
124  std::make_pair("daq.getactivelist",
125  std::make_unique<NoArgRequestor<decltype(&ocmif::OcmDaq::GetActiveList)>>(
126  &ocmif::OcmDaq::GetActiveList, args)));
127 
128  requestors.emplace(std::make_pair("daq.start", std::make_unique<StartDaqRequestor>(args)));
129  requestors.emplace(
130  std::make_pair("daq.updatekeywords", std::make_unique<UpdateKeywordsRequestor>(args)));
131  requestors.emplace(
132  std::make_pair("daq.awaitstate", std::make_unique<AwaitStateRequestor>(args)));
133 
134  po::options_description generic("Options");
135  // clang-format off
136  generic.add_options()
137  ("help,h",
138  "produce help message (also use this option for each command to get relevant help)")(
139  "version", "print version string")
140  ("rep",
141  po::value<std::string>(&args.req_addr),
142  "request endpoint without service stem. e.g. zpb://127.0.0.1:4020, defaults to env. "
143  "$OCM_REQUEST_EP")
144  ("pep",
145  po::value<std::string>(&args.pub_addr),
146  "publish endpoint, defaults to env $OCM_PUBLISH_EP")
147  ("json", "output in JSON format")
148  ("status", "subscribe to server topics (requires --pep or $OCM_PUBLISH_EP to be set)")
149  ("timeout,t", po::value<unsigned>(&args.timeout)->default_value(10), "timeout in seconds")
150  ;
151  // clang-format on
152 
153  po::positional_options_description commands;
154  commands.add("command", 1).add("subargs", -1);
155 
156  po::options_description hidden("");
157  hidden.add_options()("command", "")(
158  "subargs", po::value<std::vector<std::string>>(&subargs), "");
159 
160  po::options_description all("");
161  all.add(generic).add(hidden);
162 
163  po::options_description visible(
164  "Synopsis:\n"
165  "ocmServerCtl --help\n"
166  " show main help\n\n"
167  "ocmServerCtl [args] <command> [<command args>]\n"
168  " send command with arguments\n\n"
169  "ocmServerCtl --status [<command> [<command args>]]\n"
170  " subscribe to topics during execution of command\n"
171  " if command is omitted topics are subscribed to indefinitely \n"
172  " (use ctrl-c to cancel)\n\n"
173  "List of supported commands and their arguments");
174  visible.add(generic);
175 
176  po::variables_map vm;
177  try {
178  auto parsed = po::command_line_parser(argc, argv)
179  .options(all)
180  .positional(commands)
181  .allow_unregistered()
182  .run();
183  po::store(parsed, vm);
184  po::notify(vm);
185 
186  if (vm.count("version")) {
187  std::cerr << "ocmServerCtl " << VERSION
188 #ifdef DEBUG
189  << " (debug build)"
190 #endif
191  << std::endl;
192  return 0;
193  }
194 
195  // Only handle help if no command has been specified (since commands will add more args to
196  // be parsed and provide help).
197  if (vm.count("help") && vm.count("command") == 0) {
198  // Add all requestors since no command was specified
199  for (auto const& req : requestors) {
200  // our boost is too old (no support for shared_ptr), let's leak some memory instead.
201  // auto opt = boost::make_shared<po::options_description>(req.first);
202  std::string descr;
203  if (req.second->Synopsis().empty()) {
204  descr = (boost::format("%s %s") % base_synopsis % req.first).str();
205  } else {
206  descr = (boost::format("%s %s %s") % base_synopsis % req.first %
207  req.second->Synopsis())
208  .str();
209  }
210  po::options_description* opt = new po::options_description(descr);
211  req.second->AddOptions(*opt);
212  visible.add(*opt);
213  }
214  std::cerr << visible << std::endl;
215  return 0;
216  }
217  // Provide default values for endpoints
218  if (!vm.count("rep") && vm.count("command")) {
219  auto ep = getenv("OCM_REQUEST_EP");
220  if (!ep) {
221  std::cerr << "Request endpoint (--rep or $OCM_REQUEST_EP) not provided\n";
222  return -1;
223  }
224  args.req_addr = ep;
225  }
226  if (!vm.count("pep") && vm.count("status")) {
227  auto ep = getenv("OCM_PUBLISH_EP");
228  if (!ep) {
229  std::cerr << "Request endpoint (--pep or $OCM_PUBLISH_EP) not provided\n";
230  return -1;
231  }
232  args.pub_addr = ep;
233 
234  return 0;
235  }
236 
237  // @todo: Add support for --status subscription without command
238  if (vm.count("command") != 1 && vm.count("status") == 0) {
239  std::cerr << "No command supplied\n";
240  return 1;
241  }
242  if (vm.count("json")) {
243  args.json = true;
244  }
245 
246  // Load ZPB and register MAL
247  auto mal = elt::mal::loadMal("zpb", {});
248  args.mal = mal.get();
249  elt::mal::CiiFactory::getInstance().registerMal("zpb", mal);
250 
251  std::optional<Subscription<stdif::StatusTopic>> std_status;
252  std::optional<Subscription<ocmif::DaqStatus>> daq_status;
253 
254  if (vm.count("status")) {
255  if (args.pub_addr.back() != '/') {
256  args.pub_addr.push_back('/');
257  }
258  std_status.emplace(MakeSubcription<stdif::StatusTopic>(
259  ocmif::MakeServiceUri(args.pub_addr, "std/status"),
260  [](auto& sub, auto const& event) {
261  auto const& sample = event.getData();
262  std::cerr << "status: " << sample->getStatus() << std::endl;
263  }));
264  daq_status.emplace(MakeSubcription<ocmif::DaqStatus>(
265  Uri(args.pub_addr + "daq/status"), [](auto& sub, auto const& event) {
266  auto const& sample = event.getData();
267  std::cerr << "daq: id=" << sample->getId()
268  << ", state=" << ocmif::ToString(sample->getState())
269  << ", substate=" << ocmif::ToString(sample->getSubState())
270  << ", error=" << (sample->getError() ? "true" : "false")
271  << std::endl;
272  }));
273  }
274 
275  // If command is specified, add the options for the command and parse again
276  std::vector<std::string> cmd_args =
277  po::collect_unrecognized(parsed.options, po::include_positional);
278  if (cmd_args.empty()) {
279  if (vm.count("status")) {
280  std::cerr << "no command provided -> will subscribe indefinitely\n";
281  while (!g_signal_status) {
282  std::this_thread::sleep_for(std::chrono::milliseconds(100));
283  }
284  return 0;
285  }
286  return 1;
287  }
288  // Erase command argument since that's implied from here on.
289  // cmd_args.erase(cmd_args.begin());
290 
291  auto command = vm["command"].as<std::string>();
292 
293  auto std_cmds = elt::mal::CiiFactory::getInstance().getClient<stdif::StdCmdsSync>(
294  ocmif::MakeServiceUri(args.req_addr, "std"),
295  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(args.timeout))},
296  {});
297  auto daq_cmds = elt::mal::CiiFactory::getInstance().getClient<::ocmif::OcmDaqSync>(
298  ocmif::MakeServiceUri(args.req_addr, "daq"),
299  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(args.timeout))},
300  {});
301 
302  // Look up the requestor in the known commands map
303  auto req_it = requestors.find(command);
304  if (req_it != requestors.end()) {
305  auto descr = (boost::format("%s %s %s\n"
306  "Supported arguments <args>") %
307  base_synopsis % req_it->first % req_it->second->Synopsis())
308  .str();
309  po::options_description opt(descr);
310  opt.add_options()("command", "");
311  req_it->second->AddOptions(opt);
312  opt.add(visible);
313  if (vm.count("help")) {
314  std::cerr << opt << std::endl;
315  return 0;
316  }
317  // Parse remaining options using description from requestor
318  auto parsed_subargs = po::command_line_parser(cmd_args).options(opt).run();
319  po::store(parsed_subargs, vm);
320  po::notify(vm);
321 
322  assert(req_it->second);
323  std::vector<std::string> remaining_args =
324  po::collect_unrecognized(parsed_subargs.options, po::include_positional);
325 
326  req_it->second->Handle(*std_cmds, *daq_cmds, vm, remaining_args);
327  std::cerr << "Done\n";
328  return 0;
329  } else {
330  std::cerr << "error: Unknown command: '" << command << "'\n";
331  return 1;
332  }
333  } catch (ocmif::DaqException const& e) {
334  std::cerr << "DaqException: id: '" << e.getId() << "', message: " << e.getMessage()
335  << std::endl;
336  return 1;
337  } catch (po::error const& e) {
338  std::cerr << "error: " << e.what() << std::endl;
339  return 1;
340  } catch (std::exception const& e) {
341  std::cerr << "error: " << e.what() << std::endl;
342  return 1;
343  } catch (...) {
344  std::cerr << "unknown error" << std::endl;
345  return 1;
346  }
347 }
CommonArgs
Definition: requestor.hpp:40
ocmif::MakeServiceUri
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
CommonArgs::req_addr
std::string req_addr
Definition: requestor.hpp:43
SimpleRequestor
Simple requestor for commands without argument.
Definition: requestor.hpp:90
SignalHandler
void SignalHandler(int signal)
Definition: main.cpp:31
CommonArgs::json
bool json
Definition: requestor.hpp:41
Subscription::subscription
std::unique_ptr< elt::mal::ps::Subscription > subscription
Definition: main.cpp:38
NoArgRequestor
Definition: requestor.hpp:151
conversion.hpp
Contains support functions for ocmif.
SimpleDaqRequestor
Simple Daq commands that accepts a single argument id and returns a shared_ptr type that can be forma...
Definition: requestor.hpp:123
CommonArgs::mal
elt::mal::Mal * mal
Definition: requestor.hpp:45
CommonArgs::timeout
unsigned timeout
Definition: requestor.hpp:42
requestor.hpp
Subscription
Definition: main.cpp:36
MakeSubcription
auto MakeSubcription(elt::mal::Uri const &uri, Func &&func) -> Subscription< TopicType >
Definition: main.cpp:42
CommonArgs::pub_addr
std::string pub_addr
Definition: requestor.hpp:44
g_signal_status
volatile std::sig_atomic_t g_signal_status
Definition: main.cpp:30
Subscription::subscriber
std::unique_ptr< elt::mal::ps::Subscriber< TopicType > > subscriber
Definition: main.cpp:37
uri.hpp
Contains URI support functions for ocmif.
main
int main(int argc, char *argv[])
Application main.
Definition: main.cpp:138