ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
asyncProcess.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::AsyncProcess class declaration
9  */
11 
12 #include <boost/asio/read_until.hpp>
13 #include <boost/process/args.hpp>
14 
15 namespace daq {
16 
17 namespace bp = boost::process;
18 
19 AsyncProcess::AsyncProcess(boost::asio::io_context& ctx, std::vector<std::string> args)
20  : m_io_ctx(ctx), m_args(std::move(args)), m_stdout{ctx}, m_stderr{ctx} {
21  if (m_args.size() < 1) {
22  throw std::invalid_argument("No arguments provided");
23  }
24  m_args[0] = bp::search_path(m_args[0]).native();
25 }
26 
28 }
29 
30 boost::future<int> AsyncProcess::Initiate() {
31  assert(!m_proc.valid());
32 
33  // on_exit handler has the critical role of triggering the clean shutdown of in-progress
34  // async operations.
35  //
36  // There may be scheduled, but not yet executed, completion handlers for PIPE reads so we cannot
37  // simply cancel or close PIPES without risk loosing data.
38  //
39  // To ensure all output is read we close PIPE write end so that when read is exhausted EOF will
40  // be emitted.
41  //
42  // Observed: on_exit handler not invoked.
43  m_proc = bp::child(
44  bp::args = m_args,
45  bp::std_out > m_stdout.pipe,
46  bp::std_err > m_stderr.pipe,
47  m_io_ctx, // io_context is required for on_exit handler to function.
48  bp::on_exit = [this](int exit, const std::error_code& ec_in) {
49  // After process is terminated PIPEs will eventually reach EOF which triggers
50  // the completion AsyncProcess future. For now we signal the completion of the process
51  // by setting value.
52  m_result = {exit, ec_in};
53  CheckCompleted();
54  });
55  m_pid = m_proc.id();
56  AsyncReadStream(m_stdout);
57  AsyncReadStream(m_stderr);
58  return m_promise.get_future();
59 }
60 
61 std::error_code AsyncProcess::Abort() noexcept {
62  if (!IsRunning()) {
63  return std::make_error_code(std::errc::no_such_process);
64  }
65  // Terminate process hard, this triggers even handlers and normal unwind.
66  std::error_code ec;
67  m_proc.terminate(ec);
68  return ec;
69 }
70 
71 std::optional<pid_t> AsyncProcess::GetPid() const noexcept {
72  if (m_proc.valid()) {
73  return m_pid;
74  }
75  return std::nullopt;
76 }
77 
78 std::error_code AsyncProcess::Signal(int sig) noexcept {
79  if (!IsRunning()) {
80  return std::make_error_code(std::errc::no_such_process);
81  }
82 
83  pid_t id = m_proc.id();
84  int err = kill(id, sig);
85  return std::make_error_code(static_cast<std::errc>(err));
86 }
87 
88 bool AsyncProcess::IsRunning() const noexcept {
89  // warning: It is not safe to invoke child::running() due to bug in boost so we infer the
90  // running state based on whether exit_handler has been invoked or not. This is not 100%
91  // accurate but will work for most use-cases.
92  return m_proc.valid() && !m_result.has_value();
93 }
94 
95 void AsyncProcess::AsyncReadStream(AsyncProcess::Pipe& pipe) {
96  // Process pipe content 'line-buffered'
97  boost::asio::async_read_until(pipe.pipe,
98  pipe.buffer,
99  '\n',
100  [this, &pipe](const boost::system::error_code& ec, std::size_t) {
101  if (ec) {
102  // Expected error is 'EOF', but we don't make a
103  // distinction at this point.
104  //
105  // There might still be data in buffer from previous reads
106  // that do not end in newline, so we read all of it and
107  // send it out to connected slots.
108  std::istream is(&pipe.buffer);
109  std::string remaining(pipe.buffer.size(), '\0');
110  is.read(remaining.data(), remaining.size());
111  pipe.signal(m_pid, remaining);
112 
113  // Close pipe to signal we're done.
114  pipe.pipe.close();
115  CheckCompleted();
116 
117  // We're done, do not schedule another read.
118  return;
119  }
120 
121  // Signal any slots.
122  std::istream is(&pipe.buffer);
123  std::string line;
124  std::getline(is, line);
125  // note: getline consumes '\n' which we don't want since we
126  // don't want to modify source at all.
127  line.push_back('\n');
128  pipe.signal(m_pid, line);
129 
130  AsyncReadStream(pipe);
131  });
132 }
133 
134 void AsyncProcess::CheckCompleted() {
135  // Note that we cannot check if process is alive with m_proc.is_running() as this
136  if (!m_result.has_value() || m_stdout.pipe.is_open() || m_stderr.pipe.is_open()) {
137  // Not all read operations have completed.
138  return;
139  }
140  if (m_result->ec) {
141  m_promise.set_exception(std::system_error(m_result->ec));
142  } else {
143  m_promise.set_value(m_result->exit_code);
144  }
145 }
146 
147 } // namespace daq
daq::AsyncProcess class definition
virtual ~AsyncProcess() noexcept
boost::future< int > Initiate() override
Starts process and asynchronous operations that read stdout and stderr.
AsyncProcess(boost::asio::io_context &ctx, std::vector< std::string > args)
Constructor.