ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
rsyncAsyncProcess.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::RsyncAsyncProcess and related class declarations.
9  */
10 #ifndef DAQ_PROCESS_RSYNC_ASYNC_PROCESS_HPP
11 #define DAQ_PROCESS_RSYNC_ASYNC_PROCESS_HPP
12 #include <daq/config.hpp>
13 
14 #include <chrono>
15 #include <optional>
16 #include <string>
17 #include <vector>
18 
19 #include <boost/signals2/signal.hpp>
20 
22 
23 namespace daq {
24 
25 /**
26  * Options controlling rsync invocation. Every member is a std::optional and may be left unset.
27  */
28 struct RsyncOptions {
29  /**
30  * rsync binary to use.
31  */
32  std::optional<std::string> rsync;
33  /**
34  * Enables rate-limiting in kb/s.
35  */
36  std::optional<unsigned> bw_limit;
37  /**
38  * Transfer with/without the delta-transfer algorithm.
39  */
40  std::optional<bool> whole_file;
41 
42  /**
43  * --inplace/--no-inplace (presumably true selects --inplace; confirm in the implementation).
44  */
45  std::optional<bool> inplace;
46  /**
47  * I/O timeout, expressed in whole seconds.
48  */
49  std::optional<std::chrono::seconds> timeout;
50 };
51 
52 /**
53  * Describes file transfer progress.
54  */
55 struct RsyncProgress {
56  /**
57  * Number of transferred bytes.
58  */
59  uint64_t transferred;
60  /**
61  * Progress as a fraction of 1 (complete == 1.0).
62  */
63  float progress;
64  /**
65  * Transfer speed in bytes/sec.
66  */
67  float speed;
68  /**
69  * Estimated remaining time, in whole seconds.
70  */
71  std::chrono::seconds remaining;
72 };
73 
74 /**
75  * Specialization of AsyncProcessIf for `rsync` which additionally exposes transfer progress updates.
76  */
77 class RsyncAsyncProcessIf : public virtual AsyncProcessIf {
78 public:
79  enum class DryRun { Disabled = 0, Enabled };
81  }
82 
83  /**
84  * Progress update signal type; slots receive a pid_t and an RsyncProgress.
85  */
86  using SigProgress = boost::signals2::signal<void(pid_t, RsyncProgress const&)>;
87 
88  /**
89  * Connect a slot to the progress signal; returns the resulting boost::signals2 connection.
90  */
91  virtual boost::signals2::connection ConnectProgress(SigProgress::slot_type const& slot) = 0;
92 };
93 
94 /**
95  * Parse a progress update from rsync.
96  *
97  * @param line a single line in the format rsync outputs with option `--info=progress2`.
98  * @returns optional with value if parsing succeeds, empty optional otherwise.
99  */
100 std::optional<RsyncProgress> ParseRsyncProgress(std::string const& line) noexcept;
101 
102 /**
103  * Represents an rsync process as an asynchronous operation.
104  *
105  * Once constructed, the operation is initiated (only once) with `Initiate()`, which starts the
106  * process and returns a boost::future object that will receive the exit code when the process
107  * terminates *and* all output has been read.
108  *
109  * @note No signals will be emitted after the future has received the value or exception.
110  *
111  * Operation can be aborted with `Abort()`, which will terminate the process and set the future
112  * with an exceptional result.
113  *
114  * @ingroup daq_common_libdaq
115  */
116 class RsyncAsyncProcess : public virtual RsyncAsyncProcessIf, public AsyncProcess {
117 public:
118  virtual ~RsyncAsyncProcess();
119 
120  /**
121  * Construct async operation.
122  *
123  * @note Does not start the process or any other asynchronous operations. This is done in
124  * AsyncProcess::Initiate().
125  *
126  * @param ctx io_context instance to use.
127  * @param source rsync file source.
128  * @param dest rsync file destination.
129  * @param opts rsync options.
130  * @param flag Controls whether rsync should do a dry-run (useful to test if a connection is
131  * possible).
132  */
133  RsyncAsyncProcess(boost::asio::io_context& ctx,
134  std::string source,
135  std::string dest,
136  RsyncOptions const& opts = {},
137  DryRun flag = DryRun::Disabled);
138 
139  /**
140  * @name Signals
141  */
142  /// @{
143  /**
144  * Start the rsync process; the returned future receives the exit code (see class description).
145  */
147 
148  [[nodiscard]] boost::future<int> Initiate() override;
149 
150  /**
151  * Connect a slot to the progress signal.
152  */
153  boost::signals2::connection ConnectProgress(SigProgress::slot_type const& slot) override;
154  /// @}
155 private:
156  SigProgress m_progress;
157 };
158 
159 } // namespace daq
160 
161 #endif // #ifndef DAQ_PROCESS_RSYNC_ASYNC_PROCESS_HPP
daq::RsyncAsyncProcessIf::~RsyncAsyncProcessIf
virtual ~RsyncAsyncProcessIf()
Definition: rsyncAsyncProcess.hpp:80
daq::RsyncProgress::speed
float speed
Transfer speed in bytes/sec.
Definition: rsyncAsyncProcess.hpp:67
daq::RsyncAsyncProcess::~RsyncAsyncProcess
virtual ~RsyncAsyncProcess()
Definition: rsyncAsyncProcess.cpp:115
daq::ParseRsyncProgress
std::optional< RsyncProgress > ParseRsyncProgress(std::string const &line) noexcept
Parse progress update from rsync.
Definition: rsyncAsyncProcess.cpp:60
daq::RsyncAsyncProcessIf::ConnectProgress
virtual boost::signals2::connection ConnectProgress(SigProgress::slot_type const &slot)=0
Connect to progress signal.
daq::RsyncOptions::rsync
std::optional< std::string > rsync
Binary to use.
Definition: rsyncAsyncProcess.hpp:32
daq::RsyncAsyncProcess::RsyncAsyncProcess
RsyncAsyncProcess(boost::asio::io_context &ctx, std::string source, std::string dest, RsyncOptions const &opts={}, DryRun flag=DryRun::Disabled)
Construct async operation.
Definition: rsyncAsyncProcess.cpp:107
daq::RsyncAsyncProcessIf::DryRun
DryRun
Definition: rsyncAsyncProcess.hpp:79
daq::RsyncOptions
Options controlling rsync invocation.
Definition: rsyncAsyncProcess.hpp:28
daq::RsyncAsyncProcess::Initiate
boost::future< int > Initiate() override
Starts the rsync process and returns a future that receives the exit code.
Definition: rsyncAsyncProcess.cpp:118
daq::RsyncAsyncProcess
Represents an rsync process as an asynchronous operation.
Definition: rsyncAsyncProcess.hpp:116
daq::RsyncProgress::progress
float progress
Progress as fraction of 1 (complete == 1.0)
Definition: rsyncAsyncProcess.hpp:63
daq::RsyncOptions::whole_file
std::optional< bool > whole_file
Transfer with/without delta xfer algorithm.
Definition: rsyncAsyncProcess.hpp:40
daq
Definition: asyncProcess.cpp:15
config.hpp
daq::RsyncAsyncProcessIf::DryRun::Disabled
@ Disabled
daq::RsyncOptions::timeout
std::optional< std::chrono::seconds > timeout
I/O timeout.
Definition: rsyncAsyncProcess.hpp:49
asyncProcess.hpp
daq::AsyncProcess class definition
daq::RsyncAsyncProcess::ConnectProgress
boost::signals2::connection ConnectProgress(SigProgress::slot_type const &slot) override
Connect to progress signal.
Definition: rsyncAsyncProcess.cpp:129
daq::RsyncAsyncProcessIf::SigProgress
boost::signals2::signal< void(pid_t, RsyncProgress const &)> SigProgress
Progress update signal.
Definition: rsyncAsyncProcess.hpp:86
daq::RsyncProgress::remaining
std::chrono::seconds remaining
Estimated remaining time.
Definition: rsyncAsyncProcess.hpp:71
daq::RsyncAsyncProcessIf
More specialized version for rsync which also monitors transfer progress.
Definition: rsyncAsyncProcess.hpp:77
daq::RsyncProgress::transferred
uint64_t transferred
Number of transferred bytes.
Definition: rsyncAsyncProcess.hpp:59
daq::RsyncOptions::bw_limit
std::optional< unsigned > bw_limit
Enables rate-limiting in kb/s.
Definition: rsyncAsyncProcess.hpp:36
daq::RsyncProgress
Describes file transfer progress.
Definition: rsyncAsyncProcess.hpp:55
daq::AsyncProcessIf
Interface to asynchronous process.
Definition: asyncProcess.hpp:27
daq::RsyncAsyncProcessIf::DryRun::Enabled
@ Enabled
daq::RsyncOptions::inplace
std::optional< bool > inplace
--inplace/--no-inplace
Definition: rsyncAsyncProcess.hpp:45
daq::AsyncProcess
Represents a subprocess as an asynchronous operation.
Definition: asyncProcess.hpp:124