ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
rsyncAsyncProcess.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_libdaq
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::RsyncAsyncProcess class definition
9  */
11 
12 #include <array>
13 #include <string_view>
14 
15 #include <fmt/format.h>
16 
17 namespace daq {
18 namespace {
19 
/**
 * Speed unit suffixes recognised in rsync progress lines.
 *
 * The entries are ordered from smallest to largest unit; ParseRsyncProgress relies on this order
 * when it scales the parsed rate.
 */
constexpr std::array<std::string_view, 4> RSYNC_UNITS{"B/s", "kB/s", "MB/s", "GB/s"};
24 
25 static std::vector<std::string>
26 MakeAsyncProcessArgs(std::string source,
27  std::string dest,
28  RsyncOptions const& opts,
30  std::vector<std::string> args;
31  args.emplace_back(opts.rsync.value_or("rsync"));
32  if (opts.bw_limit) {
33  args.emplace_back(fmt::format("--bwlimit={}", *opts.bw_limit));
34  }
35  if (opts.whole_file) {
36  args.emplace_back(*opts.whole_file ? "--whole-file" : "--no-whole-file");
37  }
38  if (opts.inplace) {
39  args.emplace_back(*opts.whole_file ? "--inplace" : "--no-inplace");
40  }
41  if (opts.timeout) {
42  args.emplace_back(fmt::format("--timeout={}", opts.timeout->count()));
43  }
45  args.emplace_back("--dry-run");
46  }
47  // preseve modification times
48  args.emplace_back("--times");
49  // Mandatory to have functioning progress updates.
50  args.emplace_back("--info=progress2");
51  // No human readable gets rid of thousand separators in number of transferred bytes.
52  args.emplace_back("--no-human-readable");
53  args.emplace_back(std::move(source));
54  args.emplace_back(std::move(dest));
55  return args;
56 }
57 
58 } // namespace
59 
60 std::optional<RsyncProgress> ParseRsyncProgress(std::string const& line) noexcept {
61  using std::chrono::hours;
62  using std::chrono::minutes;
63  using std::chrono::seconds;
64 
65  // Format in the form:
66  // 0 0% 0.00kB/s 0:00:00
67  // 367656960 34% 350.62MB/s 0:00:01
68  // 735313920 68% 350.80MB/s 0:00:00
69  // 1073741824 100% 351.29MB/s 0:00:02 (xfr#1 to-chk=0/1)
70  RsyncProgress progress;
71  unsigned percent;
72  float speed;
73  char unit[5];
74  unsigned hrs;
75  unsigned mins;
76  unsigned secs;
77 
78  auto ret = std::sscanf(line.c_str(),
79  " %lu %u%% %f%4s %u:%u:%u",
80  &progress.transferred,
81  &percent,
82  &speed,
83  &unit[0],
84  &hrs,
85  &mins,
86  &secs);
87  if (ret != 7) {
88  return {};
89  }
90  progress.progress = percent / 100.0f;
91  bool found = false;
92  for (auto u : RSYNC_UNITS) {
93  if (u == unit) {
94  found = true;
95  break;
96  }
97  speed *= 1024.0f;
98  }
99  if (!found) {
100  return {};
101  }
102  progress.speed = speed;
103  progress.remaining = hours(hrs) + minutes(mins) + seconds(secs);
104  return progress;
105 }
106 
107 RsyncAsyncProcess::RsyncAsyncProcess(boost::asio::io_context& ctx,
108  std::string source,
109  std::string dest,
110  RsyncOptions const& opts,
111  DryRun flag)
112  : AsyncProcess(ctx, MakeAsyncProcessArgs(std::move(source), std::move(dest), opts, flag)) {
113 }
114 
116 }
117 
118 boost::future<int> RsyncAsyncProcess::Initiate() {
119  AsyncProcess::ConnectStdout([this](pid_t pid, std::string const& line) mutable {
120  // Note: parse failures are ignored
121  auto maybe_progress = ParseRsyncProgress(line);
122  if (maybe_progress) {
123  m_progress(pid, *maybe_progress);
124  }
125  });
126  return AsyncProcess::Initiate();;
127 }
128 
129 boost::signals2::connection RsyncAsyncProcess::ConnectProgress(SigProgress::slot_type const& slot) {
130  return m_progress.connect(slot);
131 }
132 
133 } // namespace daq
daq::RsyncProgress::speed
float speed
Transfer speed in bytes/sec.
Definition: rsyncAsyncProcess.hpp:67
daq::RsyncAsyncProcess::~RsyncAsyncProcess
virtual ~RsyncAsyncProcess()
Definition: rsyncAsyncProcess.cpp:115
daq::ParseRsyncProgress
std::optional< RsyncProgress > ParseRsyncProgress(std::string const &line) noexcept
Parse progress update from rsync.
Definition: rsyncAsyncProcess.cpp:60
daq::AsyncProcess::Initiate
boost::future< int > Initiate() override
Starts process and asynchronous operations that read stdout and stderr.
Definition: asyncProcess.cpp:31
daq::AsyncProcess::ConnectStdout
boost::signals2::connection ConnectStdout(SigOutStream::slot_type const &slot) override
Signal type for stdout/stderr signals.
Definition: asyncProcess.hpp:185
daq::RsyncAsyncProcess::RsyncAsyncProcess
RsyncAsyncProcess(boost::asio::io_context &ctx, std::string source, std::string dest, RsyncOptions const &opts={}, DryRun flag=DryRun::Disabled)
Construct async operation.
Definition: rsyncAsyncProcess.cpp:107
daq::RsyncAsyncProcessIf::DryRun
DryRun
Definition: rsyncAsyncProcess.hpp:79
daq::RsyncOptions
Options controlling rsync invocation.
Definition: rsyncAsyncProcess.hpp:28
daq::RsyncAsyncProcess::Initiate
boost::future< int > Initiate() override
Progress update signal.
Definition: rsyncAsyncProcess.cpp:118
daq::RsyncProgress::progress
float progress
Progress as fraction of 1 (complete == 1.0)
Definition: rsyncAsyncProcess.hpp:63
daq
Definition: asyncProcess.cpp:15
rsyncAsyncProcess.hpp
daq::RsyncAsyncProcess and related class declarations.
daq::RsyncAsyncProcess::ConnectProgress
boost::signals2::connection ConnectProgress(SigProgress::slot_type const &slot) override
Connect to progress signal.
Definition: rsyncAsyncProcess.cpp:129
daq::RsyncProgress::remaining
std::chrono::seconds remaining
Estimated remaining time.
Definition: rsyncAsyncProcess.hpp:71
daq::RsyncProgress::transferred
uint64_t transferred
Number of transferred bytes.
Definition: rsyncAsyncProcess.hpp:59
daq::RsyncProgress
Describes file transfer progress.
Definition: rsyncAsyncProcess.hpp:55
daq::RsyncAsyncProcessIf::DryRun::Enabled
@ Enabled
daq::AsyncProcess
Represents a subprocess as an asynchronous operation.
Definition: asyncProcess.hpp:124