ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
testOcmDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Unit test for `daq::OcmDaqController`
7  */
8 // NOLINT
9 #include <memory>
10 #include <stdexcept>
11 #include <thread>
12 
13 #include <gtest/gtest.h>
14 
15 #include <daq/daqController.hpp>
16 #include <daq/error.hpp>
17 #include <daq/op/abort.hpp>
18 #include <daq/op/awaitPrim.hpp>
19 #include <daq/op/initiate.hpp>
20 #include <daq/op/start.hpp>
21 #include <daq/op/stop.hpp>
22 
23 #include "mock/metadaqifMock.hpp"
25 #include "mock/recifMock.hpp"
26 #include "statusObserver.hpp"
27 #include "utils.hpp"
28 
29 using namespace daq;
30 using namespace ::testing;
31 using namespace std::chrono;
32 
33 /**
34  * Fixture for daq::DaqController life cycle tests
35  *
36  * @ingroup daq_ocm_libdaq_test
37  */
38 class TestOcmDaqControllerLifeCycle : public ::testing::Test {
39 public:
41  : m_io_ctx()
42  , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
43  , m_event_log(std::make_shared<ObservableEventLog>()) {
44  m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
45  m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
46  m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>, op::AsyncOpParams>;
48  boost::future<Result<DpParts>>,
52  boost::future<Result<void>>,
56  boost::future<Result<DpParts>>,
58  m_context.id = "id";
59  }
60  boost::asio::io_context m_io_ctx; // NOLINT
61  std::shared_ptr<ObservableStatus> m_status; // NOLINT
62  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
63  std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
64  std::shared_ptr<MetaSource::RrClient> m_meta_rr_client; // NOLINT
66  DaqContext m_context; // NOLINT
68 };
69 
70 template <class Iterator>
71 void SetSourceState(Iterator begin, Iterator end, State state) {
72  auto it = begin;
73  for (; it != end; ++it) {
74  it->SetState(state);
75  }
76 }
77 
79  SetSourceState(params.meta_sources.begin(), params.meta_sources.end(), state);
80  SetSourceState(params.prim_sources.begin(), params.prim_sources.end(), state);
81 }
82 
83 /**
84  *
85  * Developer notes:
86  * OcmDaqController uses boost::when_all to compose futures. This does not support executors and will
87  * spawn a thread to perform the work. This means that the tests will either have to block
88  * indefinitely with future::get() or use a timeout.
89  *
90  * @ingroup daq_ocm_libdaq_test
91  */
92 struct TestState : ::testing::Test {
93  std::shared_ptr<PrimSource::RrClient> m_prim_rr_client; // NOLINT
94  std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client; // NOLINT
95  std::shared_ptr<MetaDaqAsyncMock> m_meta_rr_client2; // NOLINT
96  std::unique_ptr<MockAsyncOperations> m_mock_ops; // NOLINT
97 
98  boost::asio::io_context m_io_ctx; // NOLINT
100  std::shared_ptr<ObservableStatus> m_status; // NOLINT
101  std::shared_ptr<ObservableEventLog> m_event_log; // NOLINT
102  std::shared_ptr<OcmDaqController> m_daq; // NOLINT
105  std::vector<DpPart> m_files; // NOLINT
107 
109  : m_io_ctx()
110  , m_status(std::make_shared<ObservableStatus>("id", "fileid"))
111  , m_event_log(std::make_shared<ObservableEventLog>()) {
112  }
113 
114  void SetUp() override {
115  m_files.emplace_back("foo", "bar");
116  m_keywords.emplace_back(fits::EsoKeyword("FOO", "BAR"));
117  m_keywords.emplace_back(fits::ValueKeyword("FOO", "BAR"));
118 
119  m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
120  m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
121  m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
122  auto tup = CreateMockAsyncOperations();
123  m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
124 
125  // Connect listener
126  // @todo: Add expectations for observer
127  // m_status->ConnectObserver(std::reference_wrapper(m_observer));
128 
129  MetaSource s1("meta-source-1", m_meta_rr_client);
130  MetaSource s2("meta-source-2", m_meta_rr_client2);
131  m_context.id = "id";
132  m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{},
133  m_sources.GetMetadataSources() = std::vector<daq::MetaSource>{s1, s2},
134 
135  PreDaqControllerHook();
136  m_daq = std::make_shared<daq::OcmDaqController>(m_io_ctx,
137  m_context,
138  m_sources,
139  m_status,
140  m_event_log,
141  std::get<OcmAsyncOperations>(tup));
142 
143  ASSERT_TRUE(m_daq);
144  ASSERT_EQ(m_status->GetState(), m_daq->GetState());
145  }
146 
147  void TearDown() override {
148  m_daq.reset();
149  m_meta_rr_client.reset();
150  m_meta_rr_client2.reset();
151  m_prim_rr_client.reset();
152  }
153 
154  virtual void PreDaqControllerHook() {
155  }
156  virtual void PreStartAsyncHook() {
157  }
158 
159  /**
160  * Executes a successful StartAsync() call
161  */
162  void StartDaq() {
163  // Setup
164  // Set up mock so that op::StartAsync invocation returns the future from our promise.
165  boost::promise<void> reply_promise;
166  std::optional<op::AsyncOpParams> params;
167  EXPECT_CALL(*m_mock_ops, Start(_))
168  .WillOnce(DoAll(Invoke([&](auto p) { params.emplace(p); }),
169  Return(ByMove(reply_promise.get_future()))));
170  PreStartAsyncHook();
171 
172  // Run
173  auto fut = m_daq->StartAsync();
174  EXPECT_EQ(State::Starting, m_daq->GetState());
175  EXPECT_FALSE(fut.is_ready());
176 
177  // "Send reply"
178  reply_promise.set_value();
179  ASSERT_TRUE(params);
181 
182  // Execute scheduled handlers
183  MakeTestProgress(m_io_ctx, &fut);
184 
185  ASSERT_TRUE(fut.is_ready());
186  EXPECT_EQ(State::Acquiring, fut.get());
187  }
188 
189  void AbortDaq() {
190  // Setup
191  // Set up mock so that op::StartAsync invocation returns the future from our promise.
192  std::optional<op::AsyncOpParams> params;
193  boost::promise<Result<void>> reply_promise;
194  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
195  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
196  Return(ByMove(reply_promise.get_future()))));
197 
198  // Run
199  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
200 
201  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
202  << "Expected state to be in Stopping after requesting to abort";
203 
204  // "Send reply"
205  reply_promise.set_value({false});
206  ASSERT_TRUE(params);
208 
209  // Execute handlers
210  MakeTestProgress(m_io_ctx, &fut);
211 
212  ASSERT_TRUE(fut.is_ready());
213  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
214  auto result = fut.get();
215  EXPECT_EQ(State::Aborted, result.state);
216  EXPECT_FALSE(result.error);
217  EXPECT_EQ(State::Aborted, m_daq->GetState());
218  }
219 
220  void StopDaq() {
221  std::optional<op::AsyncOpParams> params;
222  boost::promise<Result<DpParts>> reply_promise;
223 
224  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
225  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
226  Return(ByMove(reply_promise.get_future()))));
227 
228  // Run
229  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
230 
231  EXPECT_EQ(State::Stopping, m_daq->GetState())
232  << "Expected state to be in Stopping after requesting to stop";
233 
234  // "Send reply"
235  Result<DpParts> reply{false, m_files};
236  reply_promise.set_value(reply);
237  ASSERT_TRUE(params);
239 
240  // Execute handlers
241  MakeTestProgress(m_io_ctx, &fut);
242 
243  ASSERT_TRUE(fut.is_ready());
244  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
245  auto status = fut.get();
246  EXPECT_EQ(State::Stopped, status.state)
247  << "Expected state to be Stopped since there were no errors";
248  EXPECT_FALSE(status.error);
249  EXPECT_EQ(State::Stopped, m_daq->GetState());
250  }
251 };
252 
/**
 * Fixture used by the `AwaitAsync()` test cases; it adds nothing on top of `TestState`.
 */
253 struct TestAwait : TestState {};
254 
256 
259  m_status->SetState(State::Acquiring);
260  }
261 };
262 
265  m_status->SetState(State::Stopped);
266  }
267 };
268 
271  // Add a primary source, which was not needed for other tests
272  PrimSource s1("prim-source-1", m_prim_rr_client);
273  m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{s1};
274  }
276  EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
277  .WillOnce(Return(ByMove(m_await_promise.get_future())));
278  }
279 
280  boost::promise<Result<DpParts>> m_await_promise; // NOLINT
281 };
282 
283 // Simple test to understand boost::future::unwrap()
284 TEST(TestBoost, Unwrap) {
285  EXPECT_THROW(boost::make_ready_future()
286  .then([](auto f) -> boost::future<void> {
287  try {
288  throw std::runtime_error("Meow");
289  } catch (...) {
290  return boost::make_exceptional_future<void>();
291  }
292  })
293  .unwrap()
294  .get(),
295  std::runtime_error);
296 }
297 
298 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfNoSourcesAreProvided) {
299  ASSERT_THROW(
300  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops),
301  std::invalid_argument);
302 }
303 
304 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsObservableStatusIdDoesNotMatchDaqContextId) {
305  MetaSource s("source-id", m_meta_rr_client);
306  m_sources.GetMetadataSources() = {s};
307  m_context.id = "not-id";
308  ASSERT_THROW(
309  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops),
310  std::invalid_argument);
311 }
312 
313 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorFailsIfAsyncOperationIsInvalid) {
314  MetaSource s("source-id", m_meta_rr_client);
315  m_sources.GetMetadataSources() = {s};
316  {
317  auto ops = m_ops;
318  ops.start = {};
319  ASSERT_THROW(
320  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
321  std::invalid_argument);
322  }
323  {
324  auto ops = m_ops;
325  ops.stop = {};
326  ASSERT_THROW(
327  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
328  std::invalid_argument);
329  }
330  {
331  auto ops = m_ops;
332  ops.abort = {};
333  ASSERT_THROW(
334  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, ops),
335  std::invalid_argument);
336  }
337 }
338 
339 TEST_F(TestOcmDaqControllerLifeCycle, ConstructorSucceedsIfSingleMetadataSourceIsUsed) {
340  MetaSource s("source-id", m_meta_rr_client);
341  m_sources.GetMetadataSources() = {s};
342  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops);
343 }
344 
345 TEST_F(TestOcmDaqControllerLifeCycle, DestructionAbortsAsyncWait) {
346  MetaSource s("source-id", m_meta_rr_client);
347  m_sources.GetMetadataSources() = {s};
348  boost::future<State> fut;
349  {
350  auto daq =
351  OcmDaqController::Create(m_io_ctx, m_context, m_sources, m_status, m_event_log, m_ops);
352  fut = daq->AwaitAsync({"source-id"}, 100ms);
353  ASSERT_FALSE(fut.is_ready());
354  }
355 
356  MakeTestProgress(m_io_ctx, &fut);
357  ASSERT_TRUE(fut.is_ready())
358  << "Future should have been cancelled since daq should have been deleted.";
359  EXPECT_TRUE(fut.has_exception());
360  EXPECT_THROW(fut.get(), DaqOperationAborted);
361 }
362 
363 TEST_F(TestState, NotStarted) {
364  ASSERT_EQ(State::NotStarted, m_daq->GetState()) << "The initial state should be NotStarted";
365 }
366 
367 TEST_F(TestState, GetStatusReturnsSameStatusObject) {
368  auto status_ptr = m_daq->GetStatus();
369  EXPECT_EQ(status_ptr.get(), m_status.get());
370 }
371 
372 TEST_F(TestState, CannotStopStoppedOcmDaqController) {
373  SCOPED_TRACE("CannotStopStoppedOcmDaqController");
374  StartDaq();
375  StopDaq();
376 
377  ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
378 
379  // Try to stop again
380  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
381  EXPECT_TRUE(fut.has_exception());
382  EXPECT_THROW(fut.get(), std::exception);
383 }
384 
385 TEST_F(TestState, CannotAbortStoppedOcmDaqController) {
386  SCOPED_TRACE("CannotAbortStoppedOcmDaqController");
387  StartDaq();
388  StopDaq();
389 
390  ASSERT_EQ(State::Stopped, m_daq->GetState()) << "Setup failed";
391 
392  // Try to stop again
393  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
394  EXPECT_TRUE(fut.has_exception());
395  EXPECT_THROW(fut.get(), std::exception);
396 }
397 
398 TEST_F(TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
399  // Setup
400  boost::promise<void> reply_promise;
401  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
402 
403  // Run
404  auto fut = m_daq->StartAsync();
405  EXPECT_EQ(State::Starting, m_daq->GetState());
406 
407  // Set up mock future so that it results in an exception exception.
408  reply_promise.set_exception(std::runtime_error("Fake test failure"));
409 
410  // Run async handlers
411  MakeTestProgress(m_io_ctx, &fut);
412 
413  ASSERT_TRUE(fut.is_ready());
414  EXPECT_TRUE(fut.has_exception()) << "Expected future to contain exception";
415  EXPECT_THROW(fut.get(), std::exception) << "Expected exception to derive from std::exception";
416  EXPECT_EQ(true, m_daq->GetErrorFlag());
417 }
418 
419 TEST_F(TestState, StartAsyncReturnsExceptionalFutureInStateStarting) {
420  // Setup
421  boost::promise<void> reply_promise;
422  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
423 
424  auto fut = m_daq->StartAsync();
425  ASSERT_EQ(State::Starting, m_daq->GetState());
426  EXPECT_FALSE(fut.is_ready());
427 
428  // Run
429  // @todo: Shouldn't this be communicated through the future?
430  auto fut2 = m_daq->StartAsync();
431  ASSERT_TRUE(fut2.has_exception());
432  EXPECT_THROW(fut2.get(), std::exception)
433  << "Multiple simultaneous start operations are not supported and an exception "
434  "was exected";
435 
436  // Complete pending operations to avoid "leaking" mock objects
437  // "Send reply"
438  reply_promise.set_value();
439 
440  // Make progress
441  MakeTestProgress(m_io_ctx, &fut);
442 }
443 
444 /**
445  * It's possible to abort but not stop (and keep)
446  */
447 TEST_F(TestState, StopAsyncThrowsIfNotStarted) {
448  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
449  EXPECT_THROW(fut.get(), std::exception)
450  << "It should not be possible to stop a data acquisition that has not started";
451 }
452 
453 TEST_F(TestState, StopAsyncDoesNotThrowWithTolerantPolicy) {
454  // Setup
455  StartDaq();
456 
457  std::optional<op::AsyncOpParams> params;
458  boost::promise<Result<DpParts>> reply_promise;
459  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Tolerant, _))
460  .WillOnce(DoAll(Invoke([&](auto policy, auto p) { params.emplace(p); }),
461  Return(ByMove(reply_promise.get_future()))));
462 
463  // Run
464  auto fut = m_daq->StopAsync(ErrorPolicy::Tolerant);
465 
466  EXPECT_EQ(State::Stopping, m_daq->GetState())
467  << "Expected state to be in Stopping after requesting to stop";
468 
469  // "Send reply"
470  Result<DpParts> reply{true, {}};
471  reply_promise.set_value(reply);
472  ASSERT_TRUE(params);
473  // Since we are forcing stop it should be acceptable that sources are not stopped.
475 
476  // Execute handlers
477  MakeTestProgress(m_io_ctx, &fut);
478 
479  ASSERT_TRUE(fut.is_ready());
480  ASSERT_FALSE(fut.has_exception()) << "Future has unexpected exception!";
481  auto status = fut.get();
482  EXPECT_EQ(State::Stopped, status.state)
483  << "Expected state to be Stopped since there were no errors";
484  EXPECT_TRUE(status.error) << "Error flag should be set since the reply_promise had an error";
485  EXPECT_EQ(State::Stopped, m_daq->GetState());
486 }
487 
488 /**
489  * It should be possible to abort a data acquisition even if it's not started.
490  */
491 TEST_F(TestState, AbortAsyncIsOkIfNotStarted) {
492  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
493  ASSERT_TRUE(fut.is_ready())
494  << "Aborting a NotStarted data acquisition should be ready immediately";
495  EXPECT_FALSE(fut.has_exception()) << "Future should not have failed";
496 
497  auto result = fut.get();
498  EXPECT_EQ(State::Aborted, result.state) << "Unexpected state";
499  EXPECT_FALSE(result.error);
500  EXPECT_EQ(State::Aborted, m_daq->GetState());
501 }
502 
503 /**
504  * It's possible to abort but not stop (and keep) if data acquisition is starting
505  */
506 TEST_F(TestState, StopAsyncThrowsIfStarting) {
507  // Setup
508  boost::promise<void> reply_promise;
509  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
510 
511  auto fut = m_daq->StartAsync();
512  ASSERT_EQ(State::Starting, m_daq->GetState())
513  << "Setup failed, unexpected state, aborting test";
514  ASSERT_FALSE(fut.is_ready());
515 
516  // Run
517  {
518  auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
519  EXPECT_TRUE(fut.has_exception())
520  << "Cannot stop unless DAQ is in State::Acquiring, current state: "
521  << m_daq->GetState();
522 
523  EXPECT_THROW(fut.get(), std::exception)
524  << "Cannot stop if data acquisition is `Starting`. An exeption was expected";
525  }
526  // Complete pending operations to avoid "leaking" mock objects
527  // "Send reply"
528  reply_promise.set_value();
529 
530  // Make progress
531  MakeTestProgress(m_io_ctx, &fut);
532 }
533 
534 /**
535  * Test sequence:
536  *
537  * 1. Send StartDaq
538  * 2. Send AbortDaq
539  * 3. StartDaq still succeeds in this case (simulates serial handling of client requests at source).
540  * 4. AbortDaq suceeds.
541  */
542 TEST_F(TestState, AbortingIsOkWhenStarting) {
543  SCOPED_TRACE("AbortingIsOkWhenStarting");
544  // Setup
545  // Set up mock so that StartDaq invocation returns the future from our promise.
546  boost::promise<void> start_promise;
547  EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
548 
549  // Run
550  //
551  // Start data acquisition
552  auto start_fut = m_daq->StartAsync();
553  ASSERT_EQ(State::Starting, m_daq->GetState());
554  EXPECT_FALSE(start_fut.is_ready());
555 
556  // Setup
557  // And ditto for Abort
558  boost::promise<Result<void>> abort_promise;
559  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
560  .WillOnce(Return(ByMove(abort_promise.get_future())));
561 
562  // Run
563  //
564  // Abort data acquisition
565  auto abort_fut = m_daq->AbortAsync(ErrorPolicy::Strict);
566 
567  // Complete pending operations to avoid "leaking" mock objects
568  // "Send reply"
569  start_promise.set_value();
570 
571  // Make progress
572  MakeTestProgress(m_io_ctx, &start_fut);
573 
574  ASSERT_TRUE(start_fut.is_ready()) << "Cannot proceed with test since future is not ready";
575  EXPECT_FALSE(start_fut.has_exception())
576  << "Mock did not simulate failure so future should be ok";
577  // @todo: What state do we expect to be in?
578 
579  // Complete pending operations to avoid "leaking" mock objects
580  // "Send reply"
581  abort_promise.set_value({false});
582 
583  // Make progress
584  MakeTestProgress(m_io_ctx, &abort_fut);
585 
586  ASSERT_TRUE(abort_fut.is_ready()) << "Cannot proceed with test since future is not ready";
587  EXPECT_FALSE(abort_fut.has_exception())
588  << "Mock didn't simulate failure so future should be OK";
589  auto result = abort_fut.get();
590  EXPECT_EQ(State::Aborted, result.state);
591  EXPECT_FALSE(result.error);
592  EXPECT_EQ(State::Aborted, m_daq->GetState());
593 }
594 
595 TEST_F(TestState, StartAsyncCompletesSuccessfully) {
596  SCOPED_TRACE("Acquiring");
597 
598  StartDaq();
599 }
600 
601 TEST_F(TestState, AbortOcmDaqControllerInStateAborting) {
602  // Setup
603  SCOPED_TRACE("AbortOcmDaqControllerInStateAborting");
604  StartDaq();
605 
606  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
607 
608  AbortDaq();
609  ASSERT_EQ(State::Aborted, m_daq->GetState()) << "Test setup failed";
610 
611  // Test that abort fails if daq is already aborted
612  auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
613  ASSERT_TRUE(fut.is_ready());
614  EXPECT_THROW(fut.get(), std::runtime_error);
615 }
616 
617 TEST_F(TestState, AbortOcmDaqControllerInStateStarting) {
618  // Setup
619  SCOPED_TRACE("AbortOcmDaqControllerInStateStarting");
620  StartDaq();
621  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
622 
623  // Test
624  AbortDaq();
625  EXPECT_EQ(State::Aborted, m_daq->GetState());
626 }
627 
628 TEST_F(TestState, AbortAsyncReturnsWithErrorInsteadOfExceptionForTolerantPolicy) {
629  // Setup
630  SCOPED_TRACE("NewAbortSupersedesFailedAbort");
631  StartDaq();
632 
633  boost::promise<Result<void>> abort_promise_1;
634 
635  // Expect two calls to abort since the first one will fail
636  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Tolerant, _))
637  .WillOnce(Return(ByMove(abort_promise_1.get_future())));
638 
639  // Run
640  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Tolerant);
641 
642  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
643  << "Expected state to be in Stopping after requesting to abort";
644 
645  // "Send reply1" where first source fails and second is ok.
646  abort_promise_1.set_value({true});
647  MakeTestProgress(m_io_ctx, &fut1);
648 
649  ASSERT_TRUE(fut1.has_value());
650 
651  auto result1 = fut1.get();
652  EXPECT_EQ(State::Aborted, result1.state);
653  EXPECT_TRUE(result1.error);
654  EXPECT_EQ(State::Aborted, m_daq->GetState());
655 }
656 
657 /**
658  * It is possible to abort even though an abort operation has already been started.
659  * Nothing special happens in this case though.
660  * @todo The command bein superseeded should probably fail.
661  */
662 TEST_F(TestState, NewAbortSupersedesSuccessfulAbort) {
663  // Setup
664  SCOPED_TRACE("NewAbortSupersedesSuccessfulAbort");
665  StartDaq();
666 
667  // First abort
668  boost::promise<Result<void>> abort_promise_1;
669  // Second abort
670  boost::promise<Result<void>> abort_promise_2;
671 
672  // Expect two calls to abort
673  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
674  .Times(2)
675  .WillOnce(Return(ByMove(abort_promise_1.get_future())))
676  .WillOnce(Return(ByMove(abort_promise_2.get_future())));
677 
678  // Run
679  // Launch async operations concurrently
680  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
681  auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
682 
683  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
684  << "Expected state to be in Stopping after requesting to abort";
685 
686  // "Send reply1" to cause interleaving
687  abort_promise_1.set_value({false});
688  MakeTestProgress(m_io_ctx, &fut1);
689 
690  ASSERT_TRUE(fut1.is_ready());
691  ASSERT_FALSE(fut1.has_exception()) << "Future has unexpected exception!";
692  auto result1 = fut1.get();
693  EXPECT_EQ(State::Aborted, result1.state);
694  EXPECT_FALSE(result1.error);
695  EXPECT_EQ(State::Aborted, m_daq->GetState());
696 
697  // "Send reply2"
698  abort_promise_2.set_value({false});
699  MakeTestProgress(m_io_ctx, &fut2);
700  auto result2 = fut2.get();
701  EXPECT_EQ(State::Aborted, result2.state);
702  EXPECT_FALSE(result2.error);
703  EXPECT_EQ(State::Aborted, m_daq->GetState());
704 }
705 
706 TEST_F(TestState, NewAbortSupersedesFailedAbortWithStrictPolicy) {
707  // Setup
708  SCOPED_TRACE("NewAbortSupersedesFailedAbort");
709  StartDaq();
710 
711  boost::promise<Result<void>> abort_promise_1;
712  // Second abort
713  boost::promise<Result<void>> abort_promise_2;
714 
715  // Expect two calls to abort since the first one will fail
716  EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
717  .Times(2)
718  .WillOnce(Return(ByMove(abort_promise_1.get_future())))
719  .WillOnce(Return(ByMove(abort_promise_2.get_future())));
720 
721  // Run
722  auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
723 
724  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
725  << "Expected state to be in Stopping after requesting to abort";
726 
727  // "Send reply1" where first source fails and second is ok.
728  abort_promise_1.set_exception(DaqSourceErrors(std::vector<std::exception_ptr>()));
729  MakeTestProgress(m_io_ctx, &fut1);
730 
731  ASSERT_TRUE(fut1.is_ready());
732  ASSERT_TRUE(fut1.has_exception()) << "Future has unexpected exception!";
733  EXPECT_THROW(fut1.get(), DaqSourceErrors);
734  EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState());
735 
736  // Abort again, this time it works.
737  auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
738 
739  // "Send reply2"
740  abort_promise_2.set_value({false});
741 
742  MakeTestProgress(m_io_ctx, &fut2);
743  ASSERT_TRUE(fut2.has_value());
744 
745  auto result2 = fut2.get();
746  EXPECT_EQ(State::Aborted, result2.state);
747  EXPECT_FALSE(result2.error);
748  EXPECT_EQ(State::Aborted, m_daq->GetState());
749 }
750 
751 TEST_F(TestState, StopOcmDaqControllerSuccessfully) {
752  // Setup
753  SCOPED_TRACE("StopOcmDaqControllerSuccessfully");
754  StartDaq();
755 
756  ASSERT_EQ(State::Acquiring, m_daq->GetState()) << "Test Setup failed";
757 
758  StopDaq();
759 }
760 
761 TEST_F(TestAwait, AwaitNonExistantSourceFails) {
762  // Run
763  auto fut = m_daq->AwaitAsync({"non-existant"}, 0ms);
764  ASSERT_TRUE(fut.has_exception());
765  EXPECT_THROW(fut.get(), std::invalid_argument);
766 }
767 
768 TEST_F(TestAwait, AwaitTimeout) {
769  // Run
770  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 1ms);
771  MakeTestProgress(m_io_ctx, &fut);
772 
773  ASSERT_TRUE(fut.has_exception());
774  EXPECT_THROW(fut.get(), DaqOperationTimeout);
775 }
776 
777 TEST_F(TestAwait, AwaitStopSingleSourceIsOk) {
778  SCOPED_TRACE("AwaitSingleSourceIsOk");
779  // Setup
780  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 150ms);
781  EXPECT_FALSE(fut.is_ready())
782  << "The future shouldn't be ready yet as we haven't started the data acquisition!";
783 
784  // Run
785  StartDaq();
786  EXPECT_FALSE(fut.is_ready())
787  << "Wait condition not fulfilled, so future should not be ready yet";
788  StopDaq();
789 
790  ASSERT_TRUE(fut.is_ready());
791  ASSERT_FALSE(fut.has_exception());
792  auto state = fut.get();
793  EXPECT_TRUE(state == State::Stopping || state == State::Stopped)
794  << "Await condition should have been ready once source is stopped (meaning that DAQ is "
795  "Stopping or Stopped, depending on the order of the source)";
796 }
797 
798 TEST_F(TestAwait, AwaitAbortAllMetadataSources) {
799  SCOPED_TRACE("AwaitSingleSourceIsOk");
800  // Setup
801  auto fut = m_daq->AwaitAsync({"meta-source-1", "meta-source-2"}, 150ms);
802  EXPECT_FALSE(fut.is_ready())
803  << "The future shouldn't be ready yet as we haven't started the data acquisition!";
804 
805  // Run
806  StartDaq();
807  EXPECT_FALSE(fut.is_ready());
808 
809  AbortDaq();
810 
811  EXPECT_TRUE(fut.is_ready());
812  ASSERT_FALSE(fut.has_exception());
813  auto state = fut.get();
814  EXPECT_TRUE(state == State::AbortingAcquiring || state == State::Aborted)
815  << "Await condition should have been ready once source is stopped (meaning that DAQ is "
816  "Aborting or Aborted, depending on the order of the source)";
817 }
818 
819 TEST_F(TestAwait, AwaitStopSingleSourceWhenConditionIsFulfilled) {
820  SCOPED_TRACE("AwaitSingleSourceIsOk");
821  // Setup
822  // Run
823  StartDaq();
824  StopDaq();
825 
826  auto fut = m_daq->AwaitAsync({"meta-source-1"}, 150ms);
827 
828  EXPECT_TRUE(fut.is_ready()) << "Condition already fulfilled so future should be ready";
829  ASSERT_FALSE(fut.has_exception());
830  EXPECT_EQ(State::Stopped, fut.get());
831 }
832 
833 TEST_F(TestNotStarted, CanUpdateKeywords) {
834  // Setup
835 
836  // Run
837  m_daq->UpdateKeywords(m_keywords);
838 
839  EXPECT_EQ(m_keywords, m_daq->GetContext().keywords);
840 }
841 
842 TEST_F(TestStopped, CannotUpdateKeywordsInStopped) {
843  // Setup
844 
845  // Run
846  EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
847 }
848 
849 TEST_F(TestDaqControllerAwait, StartWillAwait) {
850  SCOPED_TRACE("StartWillAwait");
851  // Setup
852  // Run
853  StartDaq();
854 }
855 
856 /**
857  * Tests that DaqController automatically stops DAQ when the await-op completes.
858  */
860  SCOPED_TRACE("AutomaticStop");
861  // Setup
862  StartDaq();
863 
864  // Run
865  // DaqController is monitoring the completion of all primary data sources.
866  // By setting the value we simulate that completion. This should then trigger StopDaq.
867  DpPart prim_part{"s1", "/tmp/file.fits"};
868  m_await_promise.set_value({false, {prim_part}});
869 
870  // Setup expectations for stopping
871  Result<DpParts> stop_op_reply{false, m_files};
872  EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
873  .WillOnce(Return(ByMove(boost::make_ready_future<Result<DpParts>>(stop_op_reply))));
874  ASSERT_EQ(State::Acquiring, this->m_status->GetState());
875  EXPECT_EQ(0u, m_daq->GetContext().results.size());
876 
877  // There's no future to await-on, so we run until the observed state changes instead.
879  m_io_ctx, [this]() -> bool { return this->m_status->GetState() == State::Stopped; });
880  EXPECT_FALSE(m_status->GetError());
881  EXPECT_EQ(2u, m_daq->GetContext().results.size()) << "One from m_files (metadata), "
882  "one from primary";
883  EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));
884 }
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Started operation was aborted.
Definition: error.hpp:47
Started operation timed out.
Definition: error.hpp:57
Exception thrown to carry reply errors.
Definition: error.hpp:84
Data acquisition sources.
Definition: source.hpp:184
std::vector< MetaSource > const & GetMetadataSources() const noexcept
Definition: source.hpp:198
std::vector< PrimSource > const & GetPrimarySources() const noexcept
Definition: source.hpp:190
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:140
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96
Contains error related declarations for DAQ.
virtual void PreDaqControllerHook()
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
void SetUp() override
std::vector< DpPart > m_files
std::shared_ptr< ObservableEventLog > m_event_log
fits::KeywordVector m_keywords
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
StatusObserverMock m_observer
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
std::shared_ptr< OcmDaqController > m_daq
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
DaqContext m_context
boost::asio::io_context m_io_ctx
void StartDaq()
Executes a successful StartAsync() call.
void TearDown() override
boost::asio::io_context m_io_ctx
virtual void PreStartAsyncHook()
std::shared_ptr< ObservableStatus > m_status
std::unique_ptr< MockAsyncOperations > m_mock_ops
std::shared_ptr< ObservableStatus > m_status
std::shared_ptr< ObservableEventLog > m_event_log
DaqSources m_sources
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Definition: utils.hpp:42
Simple observer used for testing.
Developer notes: OcmDaqController use boost::when_all to compose futures.
Contains declarations for the helper functions to initiate operations.
Mockup of metadaqif classes.
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::OcmAsyncOperations > CreateMockAsyncOperations()
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
R InitiateOperation(Params &&... params)
Constructs and initiates Op and return the future result.
Definition: initiate.hpp:66
std::pair< R, std::function< bool()> > InitiateAbortableOperation(Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
Definition: initiate.hpp:75
TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds)
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
TEST(TestDaqContext, Files)
Utility class that represents a result and an error.
Definition: utility.hpp:17
Mockup of metadaqif classes.
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
boost::promise< Result< DpParts > > m_await_promise
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:60
OCM Async operations.
A type safe version of LiteralKeyword that consist of the three basic components of a FITS keyword ke...
Definition: keyword.hpp:266
A composite async operation that aborts a DAQ.
Definition: abort.hpp:26
Parameters required for each async operation.
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Await specific parameters that is not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
Definition: awaitPrim.hpp:56
A composite async operation that starts DAQ.
Definition: stop.hpp:27
void SetAllSourceState(op::AsyncOpParams &params, State state)
void SetSourceState(Iterator begin, Iterator end, State state)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
Defines shared test utilities.
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.
Definition: utils.hpp:21