ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
testAsyncOpAwaitPrim.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Unit test for op::AwaitPrimAsync
7  */
8 #include <daq/config.hpp>
9 
10 #include <gmock/gmock.h>
11 #include <gtest/gtest.h>
12 #include <log4cplus/logger.h>
13 
14 #include <daq/error.hpp>
15 #include <daq/op/initiate.hpp>
16 #include <daq/op/awaitPrim.hpp>
17 
18 #include "testAsyncOpBase.hpp"
19 
20 using namespace daq;
21 using namespace ::testing;
22 
23 namespace {
24 
25 class RecStatusFake : public recif::RecStatus {
26 public:
27  RecStatusFake(std::string id, recif::RecStatusNames status, std::vector<std::string> files)
28  : m_id(std::move(id)), m_status(status), m_files(std::move(files)) {
29  }
30  virtual std::vector<std::string> getDpFiles() const override {
31  return m_files;
32  }
33  virtual void setDpFiles(const std::vector<std::string>& dp_files) override {
34  m_files = dp_files;
35  }
36 
37  virtual double getEndTime() const override {
38  return 0.0;
39  }
40  virtual void setEndTime(double end_time) override {
41  }
42 
43  virtual int32_t getFilesGenerated() const override {
44  return m_files.size();
45  }
46  virtual void setFilesGenerated(int32_t files_generated) override {
47  }
48 
49  virtual int32_t getFramesProcessed() const override {
50  return 1;
51  }
52  virtual void setFramesProcessed(int32_t frames_processed) override {
53  }
54 
55  virtual int32_t getFramesRemaining() const override {
56  return 0;
57  }
58  virtual void setFramesRemaining(int32_t frames_remaining) override {
59  }
60 
61  virtual std::string getId() const override {
62  return m_id;
63  }
64  virtual void setId(const std::string& id) override {
65  m_id = id;
66  }
67 
68  virtual std::string getInfo() const override {
69  return {};
70  }
71  virtual void setInfo(const std::string& info) override {
72  }
73 
74  virtual double getRemainingTime() const override {
75  return 1.0;
76  }
77  virtual void setRemainingTime(double remaining_time) override {
78  }
79 
80  virtual int32_t getSizeRecorded() const override {
81  return 1;
82  }
83  virtual void setSizeRecorded(int32_t size_recorded) override {
84  }
85 
86  virtual double getStartTime() const override {
87  return 1.0;
88  }
89  virtual void setStartTime(double start_time) override {
90  }
91 
92  virtual ::recif::RecStatusNames getStatus() const override {
93  return m_status;
94  }
95  virtual void setStatus(::recif::RecStatusNames status) override {
96  m_status = status;
97  }
98 
99  virtual double getTimeElapsed() const override {
100  return 1.0;
101  }
102  virtual void setTimeElapsed(double time_elapsed) override {
103  }
104  bool hasKey() const override {
105  return false;
106  }
107  std::unique_ptr<recif::RecStatus> cloneKey() const override {
108  throw std::runtime_error("not clonable");
109  }
110  std::unique_ptr<recif::RecStatus> clone() const override {
111  throw std::runtime_error("not clonable");
112  }
113  bool keyEquals(const recif::RecStatus& other) const override {
114  return false;
115  }
116 
117 private:
118  std::string m_id;
119  recif::RecStatusNames m_status;
120  std::vector<std::string> m_files;
121 };
122 
123 class RecWaitStatusFake : public recif::RecWaitStatus {
124 public:
125  RecWaitStatusFake(recif::RecWaitStatusNames wait_status, std::shared_ptr<recif::RecStatus> rec_status)
126  : m_wait_status{wait_status}, m_rec_status{std::move(rec_status)} {
127  }
128  std::shared_ptr<recif::RecStatus> getRecStatus() const override {
129  return m_rec_status;
130  }
131 
132  recif::RecWaitStatusNames getStatus() const override {
133  return m_wait_status;
134  }
135  void setStatus(recif::RecWaitStatusNames wait_status) override {
136  m_wait_status = wait_status;
137  }
138  bool hasKey() const override {
139  return false;
140  }
141  std::unique_ptr<recif::RecWaitStatus> cloneKey() const override {
142  throw std::runtime_error("not clonable");
143  }
144  std::unique_ptr<recif::RecWaitStatus> clone() const override {
145  throw std::runtime_error("not clonable");
146  }
147  bool keyEquals(const recif::RecWaitStatus& other) const override {
148  return false;
149  }
150 
151 private:
152  recif::RecWaitStatusNames m_wait_status;
153  std::shared_ptr<recif::RecStatus> m_rec_status;
154 };
155 }
156 /**
157  * @ingroup daq_ocm_libdaq_test
158  */
160 
161  std::shared_ptr<recif::RecWaitStatus> MakeWaitNotCompletedStatus() {
162  auto status = std::make_shared<RecWaitStatusFake>(
163  recif::Timeout,
164  std::make_shared<RecStatusFake>(
165  m_id, recif::RecStatusNames::Active, std::vector<std::string>()));
166  return status;
167  }
168  std::shared_ptr<recif::RecWaitStatus> MakeWaitCompletedStatus() {
169  auto status = std::make_shared<RecWaitStatusFake>(
170  recif::Success,
171  std::make_shared<RecStatusFake>(
172  m_id, recif::RecStatusNames::Completed, std::vector<std::string>{"/file1.fits", "/file2.fits"}));
173  return status;
174  }
175 
177  return daq::op::AwaitOpParams(MakeParams(), std::chrono::milliseconds(0));
178  }
179 };
180 
181 TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnFirstRequest) {
182  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
183  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2;
184  auto reply_1 = MakeWaitCompletedStatus();
185  auto reply_2 = MakeWaitCompletedStatus();
186  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
187  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
188  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
189  .WillOnce(Return(ByMove(prim_promise_2.get_future())));
190 
191  // Run
192  auto fut = op::InitiateOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
193  // "Send replies"
194  prim_promise_1.set_value(reply_1);
195  prim_promise_2.set_value(reply_2);
196 
197  // Execute handlers
198  MakeTestProgress(m_io_ctx, &fut);
199 
200  ASSERT_TRUE(fut.is_ready());
201  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
202  auto result = fut.get();
203  EXPECT_FALSE(result.error) << "There should have been no errors as each request was successful";
204  EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
205 }
206 
207 TEST_F(TestAsyncOpAwaitPrim, AwaitCompletesOnSecondRequest) {
208  // Only the second source is completed on second attempt
209  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
210  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
211  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
212  auto reply_1 = MakeWaitCompletedStatus();
213  auto reply_2_1 = MakeWaitNotCompletedStatus();
214  auto reply_2_2 = MakeWaitCompletedStatus();
215  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
216  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
217  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
218  .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
219  .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
220 
221  // Run
222  auto fut = op::InitiateOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
223  // "Send replies"
224  prim_promise_1.set_value(reply_1);
225  prim_promise_2_1.set_value(reply_2_1);
226 
227  // Run ready handlers
228  m_io_ctx.poll();
229  ASSERT_FALSE(fut.is_ready());
230 
231  prim_promise_2_2.set_value(reply_2_2);
232 
233  // Execute handlers
234  MakeTestProgress(m_io_ctx, &fut);
235 
236  ASSERT_TRUE(fut.is_ready());
237  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
238  auto result = fut.get();
239  EXPECT_FALSE(result.error) << "There should have been no errors as each request was successful";
240  EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
241 }
242 
243 TEST_F(TestAsyncOpAwaitPrim, SourceThrowsExceptionIsRetriedSuccessfully) {
244  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
245  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
246  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
247  auto reply_1 = MakeWaitCompletedStatus();
248  auto reply_2_2 = MakeWaitCompletedStatus();
249  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
250  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
251  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
252  .WillOnce(Return(ByMove(prim_promise_2_1.get_future())))
253  .WillOnce(Return(ByMove(prim_promise_2_2.get_future())));
254 
255  // Run
256  auto fut = op::InitiateOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
257  // "Send replies"
258  prim_promise_1.set_value(reply_1);
259  prim_promise_2_1.set_exception(recif::ExceptionErr("random error", 1));
260 
261  // Run ready handlers
262  m_io_ctx.poll();
263  ASSERT_FALSE(fut.is_ready());
264 
265  prim_promise_2_2.set_value(reply_2_2);
266 
267  // Execute handlers
268  MakeTestProgress(m_io_ctx, &fut);
269 
270  ASSERT_TRUE(fut.is_ready());
271  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
272  auto result = fut.get();
273  EXPECT_TRUE(result.error) << "A erors as each request was successful";
274  EXPECT_EQ(result.result.size(), 4u) << "Each primary source returned two files by default";
275 }
276 
277 TEST_F(TestAsyncOpAwaitPrim, SourceThrowsExceptionIsAbortedSuccessfully) {
278  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_1;
279  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_1;
280  boost::promise<std::shared_ptr<recif::RecWaitStatus>> prim_promise_2_2;
281  auto reply_1 = MakeWaitCompletedStatus();
282  auto reply_2_2 = MakeWaitCompletedStatus();
283  EXPECT_CALL(*m_prim_rr_client, RecWait(_))
284  .WillOnce(Return(ByMove(prim_promise_1.get_future())));
285  EXPECT_CALL(*m_prim_rr_client2, RecWait(_))
286  .WillOnce(Return(ByMove(prim_promise_2_1.get_future())));
287 
288  // Run
289  auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitPrimAsync>(MakeAwaitOpParams());
290  // "Send replies"
291  prim_promise_1.set_value(reply_1);
292  prim_promise_2_1.set_exception(recif::ExceptionErr("random error", 1));
293 
294  // Run ready handlers
295  m_io_ctx.poll();
296  ASSERT_FALSE(fut.is_ready());
297 
298  // Abort operation. This should set the promise (but might not be ready immedately due to
299  // continuations)
300  EXPECT_TRUE(abort());
301  // Execute handlers
302  MakeTestProgress(m_io_ctx, &fut);
303  EXPECT_TRUE(fut.is_ready()) << "aborting operation should immediately set operation result";
304 
305  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
306  auto result = fut.get();
307  EXPECT_TRUE(result.error) << "Operation was completed with error (exception from source)";
308  EXPECT_EQ(result.result.size(), 2u) << "Only result from first source was received before "
309  "being aborted";
310 
311  // Send reply to trigger cleanup of pending continuations
312  prim_promise_2_2.set_value(reply_1);
313  MakeTestProgress(m_io_ctx);
314 }
initiate.hpp
Contains declarations for the helper functions to initiate operations.
TestAsyncOpAwaitPrim::MakeWaitCompletedStatus
std::shared_ptr< recif::RecWaitStatus > MakeWaitCompletedStatus()
Definition: testAsyncOpAwaitPrim.cpp:168
TestAsyncOpAwaitPrim
Definition: testAsyncOpAwaitPrim.cpp:159
TestAsyncOpAwaitPrim::MakeAwaitOpParams
daq::op::AwaitOpParams MakeAwaitOpParams()
Definition: testAsyncOpAwaitPrim.cpp:176
daq::op::AwaitOpParams
Await-specific parameters that are not provided with AsyncOpParams.
Definition: asyncOpParams.hpp:114
daq
Definition: asyncProcess.cpp:15
config.hpp
testAsyncOpBase.hpp
Contains declaration for async operations shared base class.
daq::TEST_F
TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds)
Definition: testDpmDaqController.cpp:60
TestAsyncOpBase
Base fixture for async operation tests.
Definition: testAsyncOpBase.hpp:38
awaitPrim.hpp
Contains declaration for the AwaitPrimAsync operation.
TestAsyncOpAwaitPrim::MakeWaitNotCompletedStatus
std::shared_ptr< recif::RecWaitStatus > MakeWaitNotCompletedStatus()
Definition: testAsyncOpAwaitPrim.cpp:161
error.hpp
Contains error related declarations for DAQ.
MakeTestProgress
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progresses the test by executing pending jobs and optionally waits for a future to be r...
Definition: utils.hpp:42