ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
asyncProcess.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::AsyncProcess class definition
9  */
10 #ifndef DAQ_ASYNC_PROCESS_HPP
11 #define DAQ_ASYNC_PROCESS_HPP
12 #include <daq/config.hpp>
13 
14 #include <optional>
15 #include <string>
16 #include <vector>
17 
18 #include <boost/process.hpp>
19 #include <boost/signals2/signal.hpp>
20 #include <boost/thread/future.hpp>
21 
22 namespace daq {
23 
24 /**
25  * Interface to asynchronous process.
26  */
27 class AsyncProcessIf {
28 public:
29  virtual ~AsyncProcessIf() = default;
30 
31  /**
32  * Initiates async operation by executing the specified process.
33  *
34  * This can only be called once.
35  */
36  [[nodiscard]] virtual boost::future<int> Initiate() = 0;
37 
38  /**
39  * Get PID.
40  *
41  * @returns pid of process if process is running, otherwise nullopt.
42  */
43  virtual std::optional<pid_t> GetPid() const noexcept = 0;
44 
45  /**
46  * Aborts the operation by terminating process which completes the operation.
47  * If process is not running this will not do anything.
48  *
49  * @returns Error code of operation. If process is not running, because it has not been started
50  * with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
51  */
52  virtual std::error_code Abort() noexcept = 0;
53 
54  /**
55  * Send signal to process. Unlike Abort() this can be used to gracefully terminate application
56  * by sending e.g. SIGTERM.
57  *
58  * @param sig Signal to send.
59  * @returns Error code of operation. If process is not running, because it has not been started
60  * with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
61  */
62  virtual std::error_code Signal(int sig) noexcept = 0;
63 
64  /**
65  * @return arguments used when launching process.
66  */
67  virtual std::vector<std::string> const& GetArguments() const noexcept = 0;
68 
69  /**
70  * @returns true if process is running.
71  * @returns false otherwise.
72  */
73  virtual bool IsRunning() const noexcept = 0;
74 
75  /**
76  * @name Signals
77  */
78  /// @{
79  /**
80  * Signal type for stdout/stderr signals.
81  */
82  using SigOutStream = boost::signals2::signal<void(pid_t, std::string const&)>;
83 
84  /**
85  * Connect slot to line-buffered stdout signal.
86  *
87  * Signal is invoked for every line read from process.
88  *
89  * @returns connection object.
90  */
91  virtual boost::signals2::connection ConnectStdout(SigOutStream::slot_type const& slot) = 0;
92 
93  /**
94  * Connect slot to line-buffered stderr signal.
95  *
96  * Signal is invoked for every line read from process.
97  *
98  * @returns connection object.
99  */
100  virtual boost::signals2::connection ConnectStderr(SigOutStream::slot_type const& slot) = 0;
101  /// @}
102 };
103 
104 /**
105  * Represents a subprocess as an asynchronous operation.
106  *
107  * Once constructed the operation is initiated (only once) with `Initiate()` which starts the
108  * process and returns a boost::future object that will receive exit code when process terminates
109  * *and* all output has been read.
110  *
111  * @note All completion handlers have been executed and no signals will be emitted after future has
112  * received the value or exception, so it is safe to delete AsyncProcess after this.
113  *
114  * Operation can be aborted with `Abort()` which will terminate process and set future with
115  * exceptional result.
116  *
117  * boost::process is pretty buggy so be very careful making changes to this.
118  * Examples:
119  * - Do not check if process is running with child::is_running() after `on_exit` has been executed.
120  * This causes exit codes to be lost [https://github.com/boostorg/process/issues/187]
121  *
122  * @ingroup daq_common_libdaq
123  */
124 class AsyncProcess : public virtual AsyncProcessIf {
125 public:
126  /**
127  * Constructor.
128  *
129  * @note Does not start the process or any other asynchronous operations. This is done in
130  * AsyncProcess::Initiate().
131  *
132  * @param ctx io_context instance to use.
133  * @param args Command line arguments. First argument specifies the file to be executed.
134  */
135  explicit AsyncProcess(boost::asio::io_context& ctx, std::vector<std::string> args);
136  virtual ~AsyncProcess() noexcept;
137 
138  /**
139  * Starts process and asynchronous operations that read stdout and stderr.
140  *
141  * This can only be called once.
142  *
143  * @note Caller is responsible for keeping AsyncProcess alive until result is set on future.
144  *
145  * @returns Future that will receive process exit code @a after process has terminated and all
146  * output has been read.
147  */
148  [[nodiscard]] boost::future<int> Initiate() override;
149  std::optional<pid_t> GetPid() const noexcept override;
150 
151  /**
152  * Aborts the operation by terminating process which completes the operation.
153  * If process is not running this will not do anything.
154  *
155  * @returns Error code of operation. If process is not running, because it has not been started
156  * with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
157  */
158  std::error_code Abort() noexcept override;
159 
160  /**
161  * Send signal to process. Unlike Abort() this can be used to gracefully terminate application
162  * by sending e.g. SIGTERM.
163  *
164  * @param sig Signal to send.
165  * @returns Error code of operation. If process is not running, because it has not been started
166  * with AsyncProcess::Initiate() for example, it returns std::errc::no_such_process.
167  */
168  std::error_code Signal(int sig) noexcept override;
169 
170  /**
171  * @returns true if process is running.
172  * @returns false otherwise.
173  */
174  bool IsRunning() const noexcept override;
175 
176  std::vector<std::string> const& GetArguments() const noexcept override {
177  return m_args;
178  }
179 
180  /**
181  * @name Signals
182  */
183  /// @{
185  boost::signals2::connection ConnectStdout(SigOutStream::slot_type const& slot) override {
186  return m_stdout.signal.connect(slot);
187  }
188  boost::signals2::connection ConnectStderr(SigOutStream::slot_type const& slot) override {
189  return m_stderr.signal.connect(slot);
190  }
191  /// @}
192 
193 protected:
194 private:
195  /**
196  * Describes process result.
197  */
198  struct Result {
199  /**
200  * If process was started this contains the exit_code.
201  */
202  int exit_code;
203  /**
204  * If process start fails this will be non-zero.
205  */
206  std::error_code ec;
207  };
208  struct Pipe {
209  boost::process::async_pipe pipe;
210  boost::asio::streambuf buffer = boost::asio::streambuf();
211  SigOutStream signal = {};
212  };
213  /**
214  * Checks if operation is completed and sets promise if it is.
215  *
216  * Three async operations may complete in any order and the future should be set only when all
217  * three complete.
218  *
219  * - stdout PIPE reads. Completes when read gets EOF.
220  * - stderr PIPE reads. Completes when read gets EOF.
221  * - on_exit handler, completed automatically when process exits.
222  *
223  * Each of these must invoke CheckCompleted() which will trigger completion of future only after
224  * all operations have completed.
225  */
226  void CheckCompleted();
227  /**
228  * Recursive async read initiator */
229  void AsyncReadStream(Pipe&);
230 
231  boost::asio::io_context& m_io_ctx;
232  std::vector<std::string> m_args;
233  /** @name PIPEs and buffers
234  * @{
235  */
236  Pipe m_stdout;
237  Pipe m_stderr;
238  /** @} */
239 
240  boost::process::child m_proc;
241  pid_t m_pid;
242  std::optional<Result> m_result;
243  boost::promise<int> m_promise;
244 };
245 
246 } // namespace daq
247 
248 #endif // #ifndef DAQ_ASYNC_PROCESS_HPP
daq::AsyncProcessIf::ConnectStdout
virtual boost::signals2::connection ConnectStdout(SigOutStream::slot_type const &slot)=0
Connect slot to line-buffered stdout signal.
daq::AsyncProcess::ConnectStdout
boost::signals2::connection ConnectStdout(SigOutStream::slot_type const &slot) override
Signal type for stdout/stderr signals.
Definition: asyncProcess.hpp:185
daq::AsyncProcessIf::GetArguments
virtual std::vector< std::string > const & GetArguments() const noexcept=0
daq::AsyncProcessIf::Signal
virtual std::error_code Signal(int sig) noexcept=0
Send signal to process.
daq
Definition: asyncProcess.cpp:15
config.hpp
daq::AsyncProcessIf::Abort
virtual std::error_code Abort() noexcept=0
Aborts the operation by terminating process which completes the operation.
daq::AsyncProcess::GetArguments
std::vector< std::string > const & GetArguments() const noexcept override
Definition: asyncProcess.hpp:176
daq::AsyncProcessIf::Initiate
virtual boost::future< int > Initiate()=0
Initiates async operation by executing the specified process.
daq::AsyncProcessIf::ConnectStderr
virtual boost::signals2::connection ConnectStderr(SigOutStream::slot_type const &slot)=0
Connect slot to line-buffered stderr signal.
daq::Result
Utility class that represents a result and an error.
Definition: utility.hpp:17
daq::AsyncProcessIf::IsRunning
virtual bool IsRunning() const noexcept=0
daq::AsyncProcessIf::SigOutStream
boost::signals2::signal< void(pid_t, std::string const &)> SigOutStream
Signal type for stdout/stderr signals.
Definition: asyncProcess.hpp:82
daq::AsyncProcessIf::GetPid
virtual std::optional< pid_t > GetPid() const noexcept=0
Get PID.
daq::AsyncProcessIf::~AsyncProcessIf
virtual ~AsyncProcessIf()
Definition: asyncProcess.hpp:29
daq::AsyncProcessIf
Interface to asynchronous process.
Definition: asyncProcess.hpp:27
daq::AsyncProcess::ConnectStderr
boost::signals2::connection ConnectStderr(SigOutStream::slot_type const &slot) override
Signal type for stdout/stderr signals.
Definition: asyncProcess.hpp:188
daq::AsyncProcess
Represents a subprocess as an asynchronous operation.
Definition: asyncProcess.hpp:124