ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
asyncProcess.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::AsyncProcess class declaration
9  */
11 
12 #include <boost/asio/read_until.hpp>
13 #include <boost/process/args.hpp>
14 
15 namespace daq {
16 
17 namespace bp = boost::process;
18 
19 AsyncProcess::AsyncProcess(boost::asio::io_context& ctx, std::vector<std::string> args)
20  : m_io_ctx(ctx), m_args(std::move(args)), m_stdout{ctx}, m_stderr{ctx} {
21  if (m_args.size() < 1) {
22  throw std::invalid_argument("No arguments provided");
23  }
24  m_args[0] = bp::search_path(m_args[0]).native();
25 }
26 
28  assert(!IsRunning());
29 }
30 
31 boost::future<int> AsyncProcess::Initiate() {
32  assert(!m_proc.valid());
33 
34  // on_exit handler has the critical role of triggering the clean shutdown of in-progress
35  // async operations.
36  //
37  // There may be scheduled, but not yet executed, completion handlers for PIPE reads so we cannot
38  // simply cancel or close PIPES without risk loosing data.
39  //
40  // To ensure all output is read we close PIPE write end so that when read is exhausted EOF will
41  // be emitted.
42  //
43  // Observed: on_exit handler not invoked.
44  m_proc = bp::child(
45  bp::args = m_args,
46  bp::std_out > m_stdout.pipe,
47  bp::std_err > m_stderr.pipe,
48  m_io_ctx, // io_context is required for on_exit handler to function.
49  bp::on_exit = [this](int exit, const std::error_code& ec_in) {
50  // After process is terminated PIPEs will eventually reach EOF which triggers
51  // the completion AsyncProcess future. For now we signal the completion of the process
52  // by setting value.
53  m_result = {exit, ec_in};
54  CheckCompleted();
55  });
56  m_pid = m_proc.id();
57  AsyncReadStream(m_stdout);
58  AsyncReadStream(m_stderr);
59  return m_promise.get_future();
60 }
61 
62 std::error_code AsyncProcess::Abort() noexcept {
63  if (!IsRunning()) {
64  return std::make_error_code(std::errc::no_such_process);
65  }
66  // Terminate process hard, this triggers even handlers and normal unwind.
67  std::error_code ec;
68  m_proc.terminate(ec);
69  return ec;
70 }
71 
72 std::optional<pid_t> AsyncProcess::GetPid() const noexcept {
73  if (m_proc.valid()) {
74  return m_pid;
75  }
76  return std::nullopt;
77 }
78 
79 std::error_code AsyncProcess::Signal(int sig) noexcept {
80  if (!IsRunning()) {
81  return std::make_error_code(std::errc::no_such_process);
82  }
83 
84  pid_t id = m_proc.id();
85  int err = kill(id, sig);
86  return std::make_error_code(static_cast<std::errc>(err));
87 }
88 
89 bool AsyncProcess::IsRunning() const noexcept {
90  // warning: It is not safe to invoke child::running() due to bug in boost so we infer the
91  // running state based on whether exit_handler has been invoked or not. This is not 100%
92  // accurate but will work for most use-cases.
93  return m_proc.valid() && !m_result.has_value();
94 }
95 
96 void AsyncProcess::AsyncReadStream(AsyncProcess::Pipe& pipe) {
97  // Process pipe content 'line-buffered'
98  boost::asio::async_read_until(pipe.pipe,
99  pipe.buffer,
100  '\n',
101  [this, &pipe](const boost::system::error_code& ec, std::size_t) {
102  if (ec) {
103  // Expected error is 'EOF', but we don't make a
104  // distinction at this point.
105  //
106  // There might still be data in buffer from previous reads
107  // that do not end in newline, so we read all of it and
108  // send it out to connected slots.
109  std::istream is(&pipe.buffer);
110  std::string remaining(pipe.buffer.size(), '\0');
111  is.read(remaining.data(), remaining.size());
112  pipe.signal(m_pid, remaining);
113 
114  // Close pipe to signal we're done.
115  pipe.pipe.close();
116  CheckCompleted();
117 
118  // We're done, do not schedule another read.
119  return;
120  }
121 
122  // Signal any slots.
123  std::istream is(&pipe.buffer);
124  std::string line;
125  std::getline(is, line);
126  // note: getline consumes '\n' which we don't want since we
127  // don't want to modify source at all.
128  line.push_back('\n');
129  pipe.signal(m_pid, line);
130 
131  AsyncReadStream(pipe);
132  });
133 }
134 
135 void AsyncProcess::CheckCompleted() {
136  // Note that we cannot check if process is alive with m_proc.is_running() as this
137  if (!m_result.has_value() || m_stdout.pipe.is_open() || m_stderr.pipe.is_open()) {
138  // Not all read operations have completed.
139  return;
140  }
141  if (m_result->ec) {
142  m_promise.set_exception(std::system_error(m_result->ec));
143  } else {
144  m_promise.set_value(m_result->exit_code);
145  }
146 }
147 
148 } // namespace daq
daq::AsyncProcess::Initiate
boost::future< int > Initiate() override
Starts process and asynchronous operations that read stdout and stderr.
Definition: asyncProcess.cpp:31
daq
Definition: asyncProcess.cpp:15
asyncProcess.hpp
daq::AsyncProcess class definition
daq::AsyncProcess::~AsyncProcess
virtual ~AsyncProcess() noexcept
Definition: asyncProcess.cpp:27
daq::AsyncProcess::AsyncProcess
AsyncProcess(boost::asio::io_context &ctx, std::vector< std::string > args)
Constructor.
Definition: asyncProcess.cpp:19
daq::AsyncProcess::IsRunning
bool IsRunning() const noexcept override
Definition: asyncProcess.cpp:89