ifw-daq  1.0.0
IFW Data Acquisition modules
testDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Unit test for `daq::DaqControllerImpl`
7  */
8 // NOLINT
9 #include <memory>
10 #include <stdexcept>
11 #include <thread>
12 
13 #include <gtest/gtest.h>
14 
15 #include <daq/daqController.hpp>
16 #include <daq/error.hpp>
17 #include <daq/op/initiate.hpp>
18 #include <daq/op/start.hpp>
19 #include <daq/op/abort.hpp>
20 #include <daq/op/stop.hpp>
21 #include <daq/op/awaitPrim.hpp>
22 
24 #include "mock/fitsController.hpp"
25 #include "mock/metadaqifMock.hpp"
26 #include "mock/recifMock.hpp"
27 #include "utils.hpp"
28 #include "statusObserver.hpp"
29 
30 
31 using namespace daq;
32 using namespace ::testing;
33 using namespace std::chrono;
34 
35 /**
36  * Fixture for daq::DaqController life cycle tests
37  *
38  * @ingroup daq_ocm_libdaq_test
39  */
40 class TestLifeCycle : public ::testing::Test {
41 public:
42  TestLifeCycle() : m_io_ctx(), m_status(std::make_shared<ObservableStatus>("id")),
43  m_event_log(std::make_shared<ObservableEventLog>()) {
44  m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
45  m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
46  m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>, op::AsyncOpParams>;
48  boost::future<Result<DpParts>>,
52  boost::future<Result<void>>,
56  boost::future<Result<DpParts>>,
58  m_props.id = "id";
59  }
60  boost::asio::io_context m_io_ctx; // NOLINT
61  std::shared_ptr<ObservableStatus> m_status; // NOLINT
62  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
63  std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
64  std::shared_ptr<MetaSource::RrClient> m_meta_rr_client; // NOLINT
67 };
68 
69 template<class Iterator>
70 void SetSourceState(Iterator begin, Iterator end, State state) {
71  auto it = begin;
72  for (; it != end; ++it) {
73  it->SetState(state);
74  }
75 }
76 
78  SetSourceState(params.meta_sources.begin(), params.meta_sources.end(), state);
79  SetSourceState(params.prim_sources.begin(), params.prim_sources.end(), state);
80 }
81 
82 /**
83  *
84  * Developer notes:
85  * DaqControllerImpl use boost::when_all to compose futures. This does not support executors and will
86  * spawn a thread to perform the work. This means that the tests will either have to block
87  * indefinitely with future::get() or use a timeout.
88  *
89  * @ingroup daq_ocm_libdaq_test
90  */
91 struct TestState : ::testing::Test {
92  std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
93  std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client; // NOLINT
94  std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client2; // NOLINT
95  std::unique_ptr<MockAsyncOperations> m_mock_ops; // NOLINT
96  // Not owner
98 
99  boost::asio::io_context m_io_ctx; // NOLINT
101  std::shared_ptr<ObservableStatus> m_status; // NOLINT
102  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
103  std::shared_ptr<DaqControllerImpl> m_daq; // NOLINT
105  std::vector<DpPart> m_files; // NOLINT
107 
109  : m_mock_fits_ctl(nullptr)
110  , m_io_ctx()
111  , m_status(std::make_shared<ObservableStatus>("id"))
112  , m_event_log(std::make_shared<ObservableEventLog>()) {
113  }
114 
115  void SetUp() override {
116  m_files.emplace_back("foo", "bar");
117  m_keywords.emplace_back(fits::EsoKeyword("FOO", "BAR"));
118  m_keywords.emplace_back(fits::ValueKeyword("FOO", "BAR"));
119 
120  m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
121  m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
122  m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
123  auto tup = CreateMockAsyncOperations();
124  m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
125  auto fits_ctl = std::make_unique<FitsControllerMock>();
126  m_mock_fits_ctl = fits_ctl.get();
127 
128  // Connect listener
129  // @todo: Add expectations for observer
130  // m_status->ConnectObserver(std::reference_wrapper(m_observer));
131 
132  MetaSource s1("meta-source-1", m_meta_rr_client);
133  MetaSource s2("meta-source-2", m_meta_rr_client2);
134  m_props.id = "id";
135  m_props.prim_sources = std::vector<daq::PrimSource>{},
136  m_props.meta_sources = std::vector<daq::MetaSource>{s1, s2},
137 
138  PreDaqControllerHook();
139  m_daq = std::make_shared<daq::DaqControllerImpl>(
140  m_io_ctx, m_props, std::move(fits_ctl), m_status, m_event_log, std::get<AsyncOperations>(tup));
141 
142  ASSERT_TRUE(m_daq);
143  ASSERT_EQ(m_status->GetState(), m_daq->GetState());
144  }
145 
146  void TearDown() override {
147  m_mock_fits_ctl = nullptr;
148  m_daq.reset();
149  m_meta_rr_client.reset();
150  m_meta_rr_client2.reset();
151  m_prim_rr_client.reset();
152  }
153 
154  virtual void PreDaqControllerHook() {}
155  virtual void PreStartAsyncHook() {}
156 
157  /**
158  * Executes a successful StartAsync() call
159  */
160  void StartDaq() {
161  // Setup
162  // Set up mock so that op::StartAsync invocation returns the future from our promise.
163  boost::promise<void> reply_promise;
164  std::optional<op::AsyncOpParams> params;
165  EXPECT_CALL(*m_mock_fits_ctl, Start());
166  EXPECT_CALL(*m_mock_ops, Start(_))
167  .WillOnce(DoAll(Invoke([&](auto p) { params.emplace(p); }),
168  Return(ByMove(reply_promise.get_future()))));
169  PreStartAsyncHook();
170 
171  // Run
172  auto fut = m_daq->StartAsync();
173  EXPECT_EQ(State::Starting, m_daq->GetState());
174  EXPECT_FALSE(fut.is_ready());
175 
176  // "Send reply"
177  reply_promise.set_value();
178  ASSERT_TRUE(params);
179  SetAllSourceState(*params, State::Acquiring);
180 
181  // Execute scheduled handlers
182  MakeTestProgress(m_io_ctx, &fut);
183 
184  ASSERT_TRUE(fut.is_ready());
185  EXPECT_EQ(State::Acquiring, fut.get());
186  }
187 
188  void AbortDaq() {
189  // Setup
190  // Set up mock so that op::StartAsync invocation returns the future from our promise.
191  std::optional<op::AsyncOpParams> params;
192  boost::promise<Result<void>> reply_promise;
193  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
194  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
195  Return(ByMove(reply_promise.get_future()))));
196  EXPECT_CALL(*m_mock_fits_ctl, Abort(ErrorPolicy::Strict));
197 
198  // Run
199  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
200 
201  EXPECT_EQ(State::Aborting, m_daq->GetState())
202  << "Expected state to be in Stopping after requesting to abort";
203 
204  // "Send reply"
205  reply_promise.set_value({false});
206  ASSERT_TRUE(params);
208 
209  // Execute handlers
210  MakeTestProgress(m_io_ctx, &fut);
211 
212  ASSERT_TRUE(fut.is_ready());
213  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
214  auto result = fut.get();
215  EXPECT_EQ(State::Aborted, result.state);
216  EXPECT_FALSE(result.error);
217  EXPECT_EQ(State::Aborted, m_daq->GetState());
218  }
219 
220  void StopDaq() {
221  std::optional<op::AsyncOpParams> params;
222  boost::promise<Result<DpParts>> reply_promise;
223 
224  EXPECT_CALL(*m_mock_fits_ctl, UpdateKeywords(_));
225  EXPECT_CALL(*m_mock_fits_ctl, Stop(ErrorPolicy::Strict)).WillOnce(Return(DpPart("OCM",
226  "baz")));
227  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
228  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
229  Return(ByMove(reply_promise.get_future()))));
230 
231  // Run
232  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
233 
234  EXPECT_EQ(State::Stopping, m_daq->GetState())
235  << "Expected state to be in Stopping after requesting to stop";
236 
237  // "Send reply"
238  Result<DpParts> reply {false, m_files};
239  reply_promise.set_value(reply);
240  ASSERT_TRUE(params);
241  SetAllSourceState(*params, State::Stopped);
242 
243  // Execute handlers
244  MakeTestProgress(m_io_ctx, &fut);
245 
246  ASSERT_TRUE(fut.is_ready());
247  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
248  auto status = fut.get();
249  EXPECT_EQ(State::Stopped, status.state)
250  << "Expected state to be Stopped since there were no errors";
251  EXPECT_FALSE(status.error);
252  EXPECT_EQ(2u, status.files.size()) << "One from m_files and 1 from FitsController";
253  EXPECT_EQ(State::Stopped, m_daq->GetState());
254  }
255 
256 };
257 
258 struct TestAwait : TestState {};
259 
261 
264  m_status->SetState(State::Acquiring);
265  }
266 };
267 
270  m_status->SetState(State::Stopped);
271  }
272 };
275  // Add a primary source, which was not needed for other tests
276  PrimSource s1("prim-source-1", m_prim_rr_client);
277  m_props.prim_sources = std::vector<daq::PrimSource>{s1};
278  }
280  EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
281  .WillOnce(Return(ByMove(m_await_promise.get_future())));
282  }
283 
284  boost::promise<Result<DpParts>> m_await_promise; // NOLINT
285 };
286 
287 // Simple test to understand boost::future::unwrap()
288 TEST(TestBoost, Unwrap) {
289  EXPECT_THROW(boost::make_ready_future()
290  .then([](auto f) -> boost::future<void> {
291  try {
292  throw std::runtime_error("Meow");
293  } catch (...) {
294  return boost::make_exceptional_future<void>();
295  }
296  })
297  .unwrap()
298  .get(),
299  std::runtime_error);
300 }
301 
302 
303 TEST_F(TestLifeCycle, ConstructorFailsIfNoSourcesAreProvided) {
304 
305  ASSERT_THROW(DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, m_ops),
306  std::invalid_argument);
307 }
308 
309 
310 TEST_F(TestLifeCycle, ConstructorFailsObservableStatusIdDoesNotMatchDaqPropertiesId) {
311  MetaSource s("source-id", m_meta_rr_client);
312  m_props.meta_sources = {s};
313  m_props.id = "not-id";
314  ASSERT_THROW(DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, m_ops),
315  std::invalid_argument);
316 }
317 
318 
319 
320 TEST_F(TestLifeCycle, ConstructorFailsIfAsyncOperationIsInvalid) {
321  MetaSource s("source-id", m_meta_rr_client);
322  m_props.meta_sources = {s};
323  {
324  auto ops = m_ops;
325  ops.start = {};
326  ASSERT_THROW(DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, ops),
327  std::invalid_argument);
328  }
329  {
330  auto ops = m_ops;
331  ops.stop = {};
332  ASSERT_THROW(DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, ops),
333  std::invalid_argument);
334  }
335  {
336  auto ops = m_ops;
337  ops.abort = {};
338  ASSERT_THROW(DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, ops),
339  std::invalid_argument);
340  }
341 }
342 
343 
344 TEST_F(TestLifeCycle, ConstructorSucceedsIfSingleMetadataSourceIsUsed) {
345  MetaSource s("source-id", m_meta_rr_client);
346  m_props.meta_sources = {s};
347  DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, m_ops);
348 }
349 
350 
351 
352 TEST_F(TestLifeCycle, DestructionAbortsAsyncWait) {
353  MetaSource s("source-id", m_meta_rr_client);
354  m_props.meta_sources = {s};
355  boost::future<State> fut;
356  {
357  auto daq = DaqControllerImpl::Create(m_io_ctx, m_props, m_status, m_event_log, m_ops);
358  fut = daq->AwaitAsync({"source-id"}, 100ms);
359  ASSERT_FALSE(fut.is_ready());
360  }
361 
362  MakeTestProgress(m_io_ctx, &fut);
363  ASSERT_TRUE(fut.is_ready()) << "Future should have been cancelled since daq should have been deleted.";
364  EXPECT_TRUE(fut.has_exception());
365  EXPECT_THROW(fut.get(), DaqOperationAborted);
366 }
367 
368 
369 TEST_F(TestState, NotStarted) {
370  ASSERT_EQ(State::NotStarted, m_daq->GetState()) << "The initial state should be NotStarted";
371 }
372 
373 
374 TEST_F(TestState, GetStatusReturnsSameStatusObject) {
375  auto status_ptr = m_daq->GetStatus();
376  EXPECT_EQ(status_ptr.get(), m_status.get());
377 }
378 
379 
380 TEST_F(TestState, CannotStopStoppedDaqControllerImpl) {
381  SCOPED_TRACE("CannotStopStoppedDaqControllerImpl");
382  StartDaq();
383  StopDaq();
384 
385  ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
386 
387  // Try to stop again
388  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
389  EXPECT_TRUE(fut.has_exception());
390  EXPECT_THROW(fut.get(), std::exception);
391 }
392 
393 
394 TEST_F(TestState, CannotAbortStoppedDaqControllerImpl) {
395  SCOPED_TRACE("CannotAbortStoppedDaqControllerImpl");
396  StartDaq();
397  StopDaq();
398 
399  ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
400 
401  // Try to stop again
402  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
403  EXPECT_TRUE(fut.has_exception());
404  EXPECT_THROW(fut.get(), std::exception);
405 }
406 
407 TEST_F(TestState, StartAsyncFailsIfFitsControllerFails) {
408  // Setup
409  EXPECT_CALL(*m_mock_fits_ctl, Start()).WillOnce(Throw(std::runtime_error("error")));
410 
411  // Run
412  auto fut = m_daq->StartAsync();
413  EXPECT_EQ(State::Starting, m_daq->GetState());
414 
415  // Run async handlers
416  MakeTestProgress(m_io_ctx, &fut);
417 
418  ASSERT_TRUE(fut.is_ready());
419  EXPECT_TRUE(fut.has_exception()) << "Expected future to contain exception";
420  EXPECT_THROW(fut.get(), std::exception) << "Expected exception to derive from std::exception";
421  EXPECT_EQ(true, m_daq->GetErrorFlag());
422 }
423 
424 TEST_F(TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
425  // Setup
426  EXPECT_CALL(*m_mock_fits_ctl, Start());
427  boost::promise<void> reply_promise;
428  EXPECT_CALL(*m_mock_ops, Start(_))
429  .WillOnce(Return(ByMove(reply_promise.get_future())));
430 
431  // Run
432  auto fut = m_daq->StartAsync();
433  EXPECT_EQ(State::Starting, m_daq->GetState());
434 
435  // Set up mock future so that it results in an exception exception.
436  reply_promise.set_exception(std::runtime_error("Fake test failure"));
437 
438  // Run async handlers
439  MakeTestProgress(m_io_ctx, &fut);
440 
441  ASSERT_TRUE(fut.is_ready());
442  EXPECT_TRUE(fut.has_exception()) << "Expected future to contain exception";
443  EXPECT_THROW(fut.get(), std::exception) << "Expected exception to derive from std::exception";
444  EXPECT_EQ(true, m_daq->GetErrorFlag());
445 }
446 
447 
448 TEST_F(TestState, StartAsyncReturnsExceptionalFutureInStateStarting) {
449  // Setup
450  EXPECT_CALL(*m_mock_fits_ctl, Start());
451  boost::promise<void> reply_promise;
452  EXPECT_CALL(*m_mock_ops, Start(_))
453  .WillOnce(Return(ByMove(reply_promise.get_future())));
454 
455  auto fut = m_daq->StartAsync();
456  ASSERT_EQ(State::Starting, m_daq->GetState());
457  EXPECT_FALSE(fut.is_ready());
458 
459  // Run
460  // @todo: Shouldn't this be communicated through the future?
461  auto fut2 = m_daq->StartAsync();
462  ASSERT_TRUE(fut2.has_exception());
463  EXPECT_THROW(fut2.get(), std::exception)
464  << "Multiple simultaneous start operations are not supported and an exception "
465  "was exected";
466 
467  // Complete pending operations to avoid "leaking" mock objects
468  // "Send reply"
469  reply_promise.set_value();
470 
471  // Make progress
472  MakeTestProgress(m_io_ctx, &fut);
473 }
474 
475 
476 /**
477  * It's possible to abort but not stop (and keep)
478  */
479 TEST_F(TestState, StopAsyncThrowsIfNotStarted) {
480  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
481  EXPECT_THROW(fut.get(), std::exception)
482  << "It should not be possible to stop a data acquisition that has not started";
483 }
484 
485 TEST_F(TestState, StopAsyncDoesNotThrowWithTolerantPolicy) {
486  // Setup
487  StartDaq();
488 
489  std::optional<op::AsyncOpParams> params;
490  boost::promise<Result<DpParts>> reply_promise;
491  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Tolerant, _))
492  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
493  Return(ByMove(reply_promise.get_future()))));
494  EXPECT_CALL(*m_mock_fits_ctl, UpdateKeywords(_));
495  EXPECT_CALL(*m_mock_fits_ctl,
496  Stop(ErrorPolicy::Tolerant)).WillOnce(Return(std::nullopt));
497 
498  // Run
499  auto fut = m_daq->StopAsync(ErrorPolicy::Tolerant);
500 
501  EXPECT_EQ(State::Stopping, m_daq->GetState())
502  << "Expected state to be in Stopping after requesting to stop";
503 
504  // "Send reply"
505  Result<DpParts> reply{true, {}};
506  reply_promise.set_value(reply);
507  ASSERT_TRUE(params);
508  // Since we are forcing stop it should be acceptable that sources are not stopped.
509  SetAllSourceState(*params, State::Stopping);
510 
511  // Execute handlers
512  MakeTestProgress(m_io_ctx, &fut);
513 
514  ASSERT_TRUE(fut.is_ready());
515  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
516  auto status = fut.get();
517  EXPECT_EQ(State::Stopped, status.state)
518  << "Expected state to be Stopped since there were no errors";
519  EXPECT_TRUE(status.error) << "Error flag should be set since the reply_promise had an error";
520  EXPECT_EQ(State::Stopped, m_daq->GetState());
521 }
522 
523 /**
524  * It should be possible to abort a data acquisition even if it's not started.
525  */
526 TEST_F(TestState, AbortAsyncIsOkIfNotStarted) {
527  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
528  ASSERT_TRUE(fut.is_ready())
529  << "Aborting a NotStarted data acquisition should be ready immediately";
530  EXPECT_FALSE(fut.has_exception()) << "Future should not have failed";
531 
532  auto result = fut.get();
533  EXPECT_EQ(State::Aborted, result.state) << "Unexpected state";
534  EXPECT_FALSE(result.error);
535  EXPECT_EQ(State::Aborted, m_daq->GetState());
536 }
537 
538 
539 /**
540  * It's possible to abort but not stop (and keep) if data acquisition is starting
541  */
542 TEST_F(TestState, StopAsyncThrowsIfStarting) {
543  // Setup
544  EXPECT_CALL(*m_mock_fits_ctl, Start());
545  boost::promise<void> reply_promise;
546  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
547 
548  auto fut = m_daq->StartAsync();
549  ASSERT_EQ(State::Starting, m_daq->GetState())
550  << "Setup failed, unexpected state, aborting test";
551  ASSERT_FALSE(fut.is_ready());
552 
553  // Run
554  {
555  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
556  EXPECT_TRUE(fut.has_exception())
557  << "Cannot stop unless DAQ is in State::Acquiring, current state: "
558  << m_daq->GetState();
559 
560  EXPECT_THROW(fut.get(), std::exception)
561  << "Cannot stop if data acquisition is `Starting`. An exeption was expected";
562  }
563  // Complete pending operations to avoid "leaking" mock objects
564  // "Send reply"
565  reply_promise.set_value();
566 
567  // Make progress
568  MakeTestProgress(m_io_ctx, &fut);
569 }
570 
571 
572 /**
573  * Test sequence:
574  *
575  * 1. Send StartDaq
576  * 2. Send AbortDaq
577  * 3. StartDaq still succeeds in this case (simulates serial handling of client requests at source).
578  * 4. AbortDaq suceeds.
579  */
580 TEST_F(TestState, AbortingIsOkWhenStarting) {
581  SCOPED_TRACE("AbortingIsOkWhenStarting");
582  // Setup
583  EXPECT_CALL(*m_mock_fits_ctl, Start());
584  // Set up mock so that StartDaq invocation returns the future from our promise.
585  boost::promise<void> start_promise;
586  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
587 
588  // Run
589  //
590  // Start data acquisition
591  auto start_fut = m_daq->StartAsync();
592  ASSERT_EQ(State::Starting, m_daq->GetState());
593  EXPECT_FALSE(start_fut.is_ready());
594 
595  // Setup
596  // And ditto for Abort
597  boost::promise<Result<void>> abort_promise;
598  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
599  .WillOnce(Return(ByMove(abort_promise.get_future())));
600  EXPECT_CALL(*m_mock_fits_ctl, Abort(ErrorPolicy::Strict));
601 
602  // Run
603  //
604  // Abort data acquisition
605  auto abort_fut = m_daq->AbortAsync(ErrorPolicy::Strict);
606 
607  // Complete pending operations to avoid "leaking" mock objects
608  // "Send reply"
609  start_promise.set_value();
610 
611  // Make progress
612  MakeTestProgress(m_io_ctx, &start_fut);
613 
614  ASSERT_TRUE(start_fut.is_ready()) << "Cannot proceed with test since future is not ready";
615  EXPECT_FALSE(start_fut.has_exception())
616  << "Mock did not simulate failure so future should be ok";
617  // @todo: What state do we expect to be in?
618 
619  // Complete pending operations to avoid "leaking" mock objects
620  // "Send reply"
621  abort_promise.set_value({ false });
622 
623  // Make progress
624  MakeTestProgress(m_io_ctx, &abort_fut);
625 
626  ASSERT_TRUE(abort_fut.is_ready()) << "Cannot proceed with test since future is not ready";
627  EXPECT_FALSE(abort_fut.has_exception())
628  << "Mock didn't simulate failure so future should be OK";
629  auto result = abort_fut.get();
630  EXPECT_EQ(State::Aborted, result.state);
631  EXPECT_FALSE(result.error);
632  EXPECT_EQ(State::Aborted, m_daq->GetState());
633 }
634 
635 
636 TEST_F(TestState, StartAsyncCompletesSuccessfully) {
637  SCOPED_TRACE("Acquiring");
638 
639  StartDaq();
640 }
641 
642 
643 TEST_F(TestState, AbortDaqControllerImplInStateAborting) {
644  // Setup
645  SCOPED_TRACE("AbortDaqControllerImplInStateAborting");
646  StartDaq();
647 
648  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
649 
650  AbortDaq();
651  ASSERT_EQ(State::Aborted, m_daq->GetState()) << "Test setup failed";
652 
653  // Test that abort fails if daq is already aborted
654  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
655  ASSERT_TRUE(fut.is_ready());
656  EXPECT_THROW(fut.get(), std::runtime_error);
657 }
658 
659 
660 TEST_F(TestState, AbortDaqControllerImplInStateStarting) {
661  // Setup
662  SCOPED_TRACE("AbortDaqControllerImplInStateStarting");
663  StartDaq();
664  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
665 
666  // Test
667  AbortDaq();
668  EXPECT_EQ(State::Aborted, m_daq->GetState());
669 }
670 
671 TEST_F(TestState, AbortAsyncReturnsWithErrorInsteadOfExceptionForTolerantPolicy) {
672  // Setup
673  SCOPED_TRACE("NewAbortSupersedesFailedAbort");
674  StartDaq();
675 
676  boost::promise<Result<void>> abort_promise_1;
677 
678  // Expect two calls to abort since the first one will fail
679  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Tolerant, _))
680  .WillOnce(Return(ByMove(abort_promise_1.get_future())));
681 
682  EXPECT_CALL(*m_mock_fits_ctl,
683  Abort(ErrorPolicy::Tolerant)).WillOnce(Throw(std::runtime_error("error")));
684 
685  // Run
686  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Tolerant);
687 
688  EXPECT_EQ(State::Aborting, m_daq->GetState())
689  << "Expected state to be in Stopping after requesting to abort";
690 
691  // "Send reply1" where first source fails and second is ok.
692  abort_promise_1.set_value({ true });
693  MakeTestProgress(m_io_ctx, &fut1);
694 
695  ASSERT_TRUE(fut1.has_value());
696 
697  auto result1 = fut1.get();
698  EXPECT_EQ(State::Aborted, result1.state);
699  EXPECT_TRUE(result1.error);
700  EXPECT_EQ(State::Aborted, m_daq->GetState());
701 }
702 
703 
704 /**
705  * It is possible to abort even though an abort operation has already been started.
706  * Nothing special happens in this case though.
707  * @todo The command bein superseeded should probably fail.
708  */
709 TEST_F(TestState, NewAbortSupersedesSuccessfulAbort) {
710  // Setup
711  SCOPED_TRACE("NewAbortSupersedesSuccessfulAbort");
712  StartDaq();
713 
714 
715  // First abort
716  boost::promise<Result<void>> abort_promise_1;
717  // Second abort
718  boost::promise<Result<void>> abort_promise_2;
719 
720  // Expect two calls to abort
721  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
722  .Times(2)
723  .WillOnce(Return(ByMove(abort_promise_1.get_future())))
724  .WillOnce(Return(ByMove(abort_promise_2.get_future())));
725  EXPECT_CALL(*m_mock_fits_ctl, Abort(ErrorPolicy::Strict));
726 
727  // Run
728  // Launch async operations concurrently
729  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
730  auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
731 
732  EXPECT_EQ(State::Aborting, m_daq->GetState())
733  << "Expected state to be in Stopping after requesting to abort";
734 
735  // "Send reply1" to cause interleaving
736  abort_promise_1.set_value({ false });
737  MakeTestProgress(m_io_ctx, &fut1);
738 
739  ASSERT_TRUE(fut1.is_ready());
740  ASSERT_FALSE(fut1.has_exception()) << "Future has unexpected exception!";
741  auto result1 = fut1.get();
742  EXPECT_EQ(State::Aborted, result1.state);
743  EXPECT_FALSE(result1.error);
744  EXPECT_EQ(State::Aborted, m_daq->GetState());
745 
746  // "Send reply2"
747  abort_promise_2.set_value({ false });
748  MakeTestProgress(m_io_ctx, &fut2);
749  auto result2 = fut2.get();
750  EXPECT_EQ(State::Aborted, result2.state);
751  EXPECT_FALSE(result2.error);
752  EXPECT_EQ(State::Aborted, m_daq->GetState());
753 }
754 
755 
756 TEST_F(TestState, NewAbortSupersedesFailedAbortWithStrictPolicy) {
757  // Setup
758  SCOPED_TRACE("NewAbortSupersedesFailedAbort");
759  StartDaq();
760 
761  boost::promise<Result<void>> abort_promise_1;
762  // Second abort
763  boost::promise<Result<void>> abort_promise_2;
764 
765  // Expect two calls to abort since the first one will fail
766  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
767  .Times(2)
768  .WillOnce(Return(ByMove(abort_promise_1.get_future())))
769  .WillOnce(Return(ByMove(abort_promise_2.get_future())));
770 
771  // Aborting local FITS production is expected _once_ as it is not executed in parallel
772  // with all other metadata sources
773  EXPECT_CALL(*m_mock_fits_ctl, Abort(ErrorPolicy::Strict)).Times(1);
774 
775  // Run
776  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
777 
778  EXPECT_EQ(State::Aborting, m_daq->GetState())
779  << "Expected state to be in Stopping after requesting to abort";
780 
781  // "Send reply1" where first source fails and second is ok.
782  abort_promise_1.set_exception(DaqSourceErrors(std::vector<std::exception_ptr>()));
783  MakeTestProgress(m_io_ctx, &fut1);
784 
785  ASSERT_TRUE(fut1.is_ready());
786  ASSERT_TRUE(fut1.has_exception()) << "Future has unexpected exception!";
787  EXPECT_THROW(fut1.get(), DaqSourceErrors);
788  EXPECT_EQ(State::Aborting, m_daq->GetState());
789 
790  // Abort again, this time it works.
791  auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
792 
793  // "Send reply2"
794  abort_promise_2.set_value({ false });
795 
796  MakeTestProgress(m_io_ctx, &fut2);
797  ASSERT_TRUE(fut2.has_value());
798 
799  auto result2 = fut2.get();
800  EXPECT_EQ(State::Aborted, result2.state);
801  EXPECT_FALSE(result2.error);
802  EXPECT_EQ(State::Aborted, m_daq->GetState());
803 }
804 
805 
806 TEST_F(TestState, StopDaqControllerImplSuccessfully) {
807  // Setup
808  SCOPED_TRACE("StopDaqControllerImplSuccessfully");
809  StartDaq();
810 
811  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
812 
813  StopDaq();
814 }
815 
816 
817 TEST_F(TestAwait, AwaitNonExistantSourceFails) {
818  // Run
819  auto fut = m_daq->AwaitAsync({"non-existant"}, 0ms);
820  ASSERT_TRUE(fut.has_exception());
821  EXPECT_THROW(fut.get(), std::invalid_argument);
822 }
823 
824 
825 TEST_F(TestAwait, AwaitTimeout) {
826  // Run
827  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 1ms);
828  MakeTestProgress(m_io_ctx, &fut);
829 
830  ASSERT_TRUE(fut.has_exception());
831  EXPECT_THROW(fut.get(), DaqOperationTimeout);
832 }
833 
834 
835 TEST_F(TestAwait, AwaitStopSingleSourceIsOk) {
836  SCOPED_TRACE("AwaitSingleSourceIsOk");
837  // Setup
838  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 150ms);
839  EXPECT_FALSE(fut.is_ready())
840  << "The future shouldn't be ready yet as we haven't started the data acquisition!";
841 
842  // Run
843  StartDaq();
844  EXPECT_FALSE(fut.is_ready()) << "Wait condition not fulfilled, so future should not be ready yet";
845  StopDaq();
846 
847  ASSERT_TRUE(fut.is_ready());
848  ASSERT_FALSE(fut.has_exception());
849  auto state = fut.get();
850  EXPECT_TRUE(state == State::Stopping || state == State::Stopped)
851  << "Await condition should have been ready once source is stopped (meaning that DAQ is "
852  "Stopping or Stopped, depending on the order of the source)";
853 }
854 
855 
856 TEST_F(TestAwait, AwaitAbortAllMetadataSources) {
857  SCOPED_TRACE("AwaitSingleSourceIsOk");
858  // Setup
859  auto fut = m_daq->AwaitAsync({"meta-source-1", "meta-source-2"}, 150ms);
860  EXPECT_FALSE(fut.is_ready())
861  << "The future shouldn't be ready yet as we haven't started the data acquisition!";
862 
863  // Run
864  StartDaq();
865  EXPECT_FALSE(fut.is_ready());
866 
867  AbortDaq();
868 
869  EXPECT_TRUE(fut.is_ready());
870  ASSERT_FALSE(fut.has_exception());
871  auto state = fut.get();
872  EXPECT_TRUE(state == State::Aborting || state == State::Aborted)
873  << "Await condition should have been ready once source is stopped (meaning that DAQ is "
874  "Aborting or Aborted, depending on the order of the source)";
875 }
876 
877 
878 TEST_F(TestAwait, AwaitStopSingleSourceWhenConditionIsFulfilled) {
879  SCOPED_TRACE("AwaitSingleSourceIsOk");
880  // Setup
881  // Run
882  StartDaq();
883  StopDaq();
884 
885  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 150ms);
886 
887  EXPECT_TRUE(fut.is_ready()) << "Condition already fulfilled so future should be ready";
888  ASSERT_FALSE(fut.has_exception());
889  EXPECT_EQ(State::Stopped, fut.get());
890 }
891 
892 TEST_F(TestNotStarted, CanUpdateKeywords) {
893  // Setup
894 
895  // Run
896  m_daq->UpdateKeywords(m_keywords);
897 
898  EXPECT_EQ(m_keywords, m_daq->GetStatus()->GetKeywords());
899 }
900 
901 
902 TEST_F(TestStopped, CannotUpdateKeywordsInStopped) {
903  // Setup
904 
905  // Run
906  EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
907 }
908 
909 TEST_F(TestDaqControllerAwait, StartWillAwait) {
910  SCOPED_TRACE("StartWillAwait");
911  // Setup
912  // Run
913  StartDaq();
914 }
915 
916 /**
917  * Tests that DaqController automatically stops DAQ when the await-op completes.
918  */
920  SCOPED_TRACE("AutomaticStop");
921  // Setup
922  StartDaq();
923 
924  // Run
925  // DaqController is monitoring the completion of all primary data sources.
926  // By setting the value we simulate that completion. This should then trigger StopDaq.
927  DpPart prim_part{"s1", "/tmp/file.fits"};
928  m_await_promise.set_value({false, {prim_part}});
929 
930  // Setup expectations for stopping
931  Result<DpParts> stop_op_reply {false, m_files};
932  EXPECT_CALL(*m_mock_fits_ctl, UpdateKeywords(_));
933  EXPECT_CALL(*m_mock_fits_ctl, Stop(ErrorPolicy::Strict)).WillOnce(Return(DpPart("OCM",
934  "baz")));
935  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
936  .WillOnce(Return(ByMove(boost::make_ready_future<Result<DpParts>>(stop_op_reply))));
937  ASSERT_EQ(State::Acquiring, this->m_status->GetState());
938  EXPECT_EQ(0u, m_status->GetFiles().size());
939 
940  // There's no future to await-on, so we run until the observed state changes instead.
941  MakeTestProgressUntil(m_io_ctx,
942  [this]() -> bool {
943  return this->m_status->GetState() == State::Stopped;
944  });
945  EXPECT_FALSE(m_status->GetError());
946  EXPECT_EQ(3u, m_status->GetFiles().size()) << "One from m_files (metadata), "
947  "1 from FitsController and 1 from primary";
948  EXPECT_THAT(m_status->GetFiles(), Contains(prim_part));
949 }
TestLifeCycle::m_meta_rr_client
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
Definition: testDaqController.cpp:64
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
daq::op::AsyncOpParams::prim_sources
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Definition: asyncOpParams.hpp:54
abort.hpp
Contains declaration for the AbortAsync operation.
SetAllSourceState
void SetAllSourceState(op::AsyncOpParams &params, State state)
Definition: testDaqController.cpp:77
TestLifeCycle::TestLifeCycle
TestLifeCycle()
Definition: testDaqController.cpp:42
initiate.hpp
Contains declarations for the helper functions to initiate operations.
TestState::m_mock_fits_ctl
FitsControllerMock * m_mock_fits_ctl
Definition: testDaqController.cpp:97
utils.hpp
Defines shared test utilities.
TestState::AbortDaq
void AbortDaq()
Definition: testDaqController.cpp:188
metadaqifMock.hpp
Mockup of metadaqif classes.
daq::op::AsyncOpParams::meta_sources
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!a.
Definition: asyncOpParams.hpp:55
TestStopped::PreDaqControllerHook
void PreDaqControllerHook()
Definition: testDaqController.cpp:269
daq::MetaSource
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:139
TestNotStarted
Definition: testDaqController.cpp:260
TestState::SetUp
void SetUp() override
Definition: testDaqController.cpp:115
TestState::m_meta_rr_client
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
Definition: testDaqController.cpp:93
TestState::m_props
DaqProperties m_props
Definition: testDaqController.cpp:104
start.hpp
Contains declaration for the StartAsync operation.
stop.hpp
Contains declaration for the StopAsync operation.
daq::DaqProperties
Structure carrying properties needed to start a DataAcquisition.
Definition: daqProperties.hpp:28
daq::DaqSourceErrors
Exception thrown to carry reply errors.
Definition: error.hpp:84
daq::DaqControllerImpl::Create
static std::shared_ptr< DaqControllerImpl > Create(boost::asio::io_context &io_context, DaqProperties properties, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, AsyncOperations operations)
Construct object.
Definition: daqController.cpp:86
daq::op::AbortAsync
A composite async operation that aborts a DAQ.
Definition: abort.hpp:26
daq::TEST_F
TEST_F(TestSource, Constructors)
Definition: testSource.cpp:34
recifMock.hpp
Mockup of metadaqif classes.
daq::DaqProperties::meta_sources
std::vector< MetaSource > meta_sources
Definition: daqProperties.hpp:50
daq::DaqOperationTimeout
Started operation timed out.
Definition: error.hpp:57
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:68
TestState::m_mock_ops
std::unique_ptr< MockAsyncOperations > m_mock_ops
Definition: testDaqController.cpp:95
daq::op::InitiateOperation
R InitiateOperation(Params &&... params)
Constructs and initiates Op and return the future result.
Definition: initiate.hpp:66
daq::fits::UpdateKeywords
void UpdateKeywords(KeywordVector &to, KeywordVector const &from)
Updates a with keywords from b.
Definition: keyword.cpp:120
TestDaqControllerAwait::PreDaqControllerHook
void PreDaqControllerHook()
Definition: testDaqController.cpp:274
daq::op::AwaitOpParams
Await specific parameters that is not provided with AsyncOpParams.
Definition: asyncOpParams.hpp:64
SetSourceState
void SetSourceState(Iterator begin, Iterator end, State state)
Definition: testDaqController.cpp:70
TestState::m_prim_rr_client
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Definition: testDaqController.cpp:92
daq
Definition: daqController.cpp:18
daq::op::AwaitPrimAsync
A composite async operation that awaits primary data sources.
Definition: awaitPrim.hpp:56
TestDaqControllerAwait::PreStartAsyncHook
void PreStartAsyncHook()
Definition: testDaqController.cpp:279
TestState::m_keywords
fits::KeywordVector m_keywords
Definition: testDaqController.cpp:106
TestStopped
Definition: testDaqController.cpp:268
TestLifeCycle::m_prim_rr_client
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Definition: testDaqController.cpp:63
TestLifeCycle::m_ops
AsyncOperations m_ops
Definition: testDaqController.cpp:65
daq::fits::BasicKeyword
A type safe version of FormattedKeyword that consist of the three basic components of a FITS keyword ...
Definition: keyword.hpp:51
TestLifeCycle::m_status
std::shared_ptr< ObservableStatus > m_status
Definition: testDaqController.cpp:61
TestState::TestState
TestState()
Definition: testDaqController.cpp:108
TestState::PreStartAsyncHook
virtual void PreStartAsyncHook()
Definition: testDaqController.cpp:155
TestAwait
Definition: testDaqController.cpp:258
TestState::m_event_log
std::shared_ptr< ObservableEventLog > m_event_log
Definition: testDaqController.cpp:102
TestState::m_observer
StatusObserverMock m_observer
Definition: testDaqController.cpp:100
TestAcquiring
Definition: testDaqController.cpp:262
daq::DaqOperationAborted
Started operation was aborted.
Definition: error.hpp:47
daq::DaqProperties::id
std::string id
Definition: daqProperties.hpp:35
TEST
TEST(TestBoost, Unwrap)
Definition: testDaqController.cpp:288
daq::Result
Utility class that represents a result and an error.
Definition: utility.hpp:17
daqController.hpp
Contains declaration for for DaqController.
statusObserver.hpp
TestAcquiring::PreDaqControllerHook
void PreDaqControllerHook()
Definition: testDaqController.cpp:263
TestState::TearDown
void TearDown() override
Definition: testDaqController.cpp:146
daq::DpPart
Provides information of the location and origin of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
StatusObserverMock
Simple observer used for testing.
Definition: statusObserver.hpp:22
TestLifeCycle::m_io_ctx
boost::asio::io_context m_io_ctx
Definition: testDaqController.cpp:60
TestState::m_status
std::shared_ptr< ObservableStatus > m_status
Definition: testDaqController.cpp:101
awaitPrim.hpp
Contains declaration for the AwaitPrimAsync operation.
TestState::PreDaqControllerHook
virtual void PreDaqControllerHook()
Definition: testDaqController.cpp:154
daq::op::InitiateAbortableOperation
std::pair< R, std::function< bool()> > InitiateAbortableOperation(Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
Definition: initiate.hpp:75
TestState
Developer notes: DaqControllerImpl use boost::when_all to compose futures.
Definition: testDaqController.cpp:91
TestState::StopDaq
void StopDaq()
Definition: testDaqController.cpp:220
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:138
CreateMockAsyncOperations
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::AsyncOperations > CreateMockAsyncOperations()
Definition: mockAsyncOperations.hpp:45
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23
mockAsyncOperations.hpp
TestDaqControllerAwait
Definition: testDaqController.cpp:273
TestState::m_meta_rr_client2
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
Definition: testDaqController.cpp:94
TestLifeCycle::m_props
DaqProperties m_props
Definition: testDaqController.cpp:66
daq::ObservableEventLog
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
TestLifeCycle
Fixture for daq::DaqController life cycle tests.
Definition: testDaqController.cpp:40
daq::State::NotStarted
@ NotStarted
Initial state of data acquisition.
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
fitsController.hpp
Mock of FitsController.
TestState::m_files
std::vector< DpPart > m_files
Definition: testDaqController.cpp:105
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
TestLifeCycle::m_event_log
std::shared_ptr< ObservableEventLog > m_event_log
Definition: testDaqController.cpp:62
daq::AsyncOperations
Async operations.
Definition: daqController.hpp:46
MakeTestProgressUntil
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.
Definition: utils.hpp:21
TestState::m_io_ctx
boost::asio::io_context m_io_ctx
Definition: testDaqController.cpp:99
daq::DaqProperties::prim_sources
std::vector< PrimSource > prim_sources
Definition: daqProperties.hpp:49
TestDaqControllerAwait::m_await_promise
boost::promise< Result< DpParts > > m_await_promise
Definition: testDaqController.cpp:284
daq::FitsControllerMock
Mock version of daq::FitsController.
Definition: fitsController.hpp:21
TestState::StartDaq
void StartDaq()
Executes a successful StartAsync() call.
Definition: testDaqController.cpp:160
error.hpp
Contains error related declarations for DAQ.
daq::PrimSource
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96
daq::op::StopAsync
A composite async operation that starts DAQ.
Definition: stop.hpp:27
TestState::m_daq
std::shared_ptr< DaqControllerImpl > m_daq
Definition: testDaqController.cpp:103
MakeTestProgress
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42