13 #include <gtest/gtest.h>
30 using namespace ::testing;
31 using namespace std::chrono;
44 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
45 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
46 m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>,
op::AsyncOpParams>;
48 boost::future<Result<DpParts>>,
52 boost::future<Result<void>>,
56 boost::future<Result<DpParts>>,
70 template <
class Iterator>
73 for (; it != end; ++it) {
102 std::shared_ptr<OcmDaqController>
m_daq;
115 m_files.emplace_back(
"foo",
"bar");
119 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
120 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
121 m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
123 m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
129 MetaSource s1(
"meta-source-1", m_meta_rr_client);
130 MetaSource s2(
"meta-source-2", m_meta_rr_client2);
135 PreDaqControllerHook();
136 m_daq = std::make_shared<daq::OcmDaqController>(m_io_ctx,
141 std::get<OcmAsyncOperations>(tup));
144 ASSERT_EQ(m_status->GetState(), m_daq->GetState());
149 m_meta_rr_client.reset();
150 m_meta_rr_client2.reset();
151 m_prim_rr_client.reset();
165 boost::promise<void> reply_promise;
166 std::optional<op::AsyncOpParams> params;
167 EXPECT_CALL(*m_mock_ops, Start(_))
168 .WillOnce(DoAll(Invoke([&](
auto p) { params.emplace(p); }),
169 Return(ByMove(reply_promise.get_future()))));
173 auto fut = m_daq->StartAsync();
174 EXPECT_EQ(State::Starting, m_daq->GetState());
175 EXPECT_FALSE(fut.is_ready());
178 reply_promise.set_value();
185 ASSERT_TRUE(fut.is_ready());
186 EXPECT_EQ(State::Acquiring, fut.get());
192 std::optional<op::AsyncOpParams> params;
193 boost::promise<Result<void>> reply_promise;
194 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
195 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
196 Return(ByMove(reply_promise.get_future()))));
199 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
201 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
202 <<
"Expected state to be in Stopping after requesting to abort";
205 reply_promise.set_value({
false});
212 ASSERT_TRUE(fut.is_ready());
213 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
214 auto result = fut.get();
215 EXPECT_EQ(State::Aborted, result.state);
216 EXPECT_FALSE(result.error);
217 EXPECT_EQ(State::Aborted, m_daq->GetState());
221 std::optional<op::AsyncOpParams> params;
222 boost::promise<Result<DpParts>> reply_promise;
224 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
225 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
226 Return(ByMove(reply_promise.get_future()))));
229 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
231 EXPECT_EQ(State::Stopping, m_daq->GetState())
232 <<
"Expected state to be in Stopping after requesting to stop";
236 reply_promise.set_value(reply);
243 ASSERT_TRUE(fut.is_ready());
244 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
245 auto status = fut.get();
246 EXPECT_EQ(State::Stopped, status.state)
247 <<
"Expected state to be Stopped since there were no errors";
248 EXPECT_FALSE(status.error);
249 EXPECT_EQ(State::Stopped, m_daq->GetState());
259 m_status->SetState(State::Acquiring);
265 m_status->SetState(State::Stopped);
272 PrimSource s1(
"prim-source-1", m_prim_rr_client);
273 m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{s1};
276 EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
277 .WillOnce(Return(ByMove(m_await_promise.get_future())));
285 EXPECT_THROW(boost::make_ready_future()
286 .then([](
auto f) -> boost::future<void> {
288 throw std::runtime_error(
"Meow");
290 return boost::make_exceptional_future<void>();
301 std::invalid_argument);
306 m_sources.GetMetadataSources() = {s};
307 m_context.id =
"not-id";
310 std::invalid_argument);
315 m_sources.GetMetadataSources() = {s};
321 std::invalid_argument);
328 std::invalid_argument);
335 std::invalid_argument);
341 m_sources.GetMetadataSources() = {s};
347 m_sources.GetMetadataSources() = {s};
348 boost::future<State> fut;
352 fut =
daq->AwaitAsync({
"source-id"}, 100ms);
353 ASSERT_FALSE(fut.is_ready());
357 ASSERT_TRUE(fut.is_ready())
358 <<
"Future should have been cancelled since daq should have been deleted.";
359 EXPECT_TRUE(fut.has_exception());
364 ASSERT_EQ(State::NotStarted, m_daq->GetState()) <<
"The initial state should be NotStarted";
368 auto status_ptr = m_daq->GetStatus();
369 EXPECT_EQ(status_ptr.get(), m_status.get());
373 SCOPED_TRACE(
"CannotStopStoppedOcmDaqController");
377 ASSERT_EQ(State::Stopped, m_daq->GetState()) <<
"Setup failed";
380 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
381 EXPECT_TRUE(fut.has_exception());
382 EXPECT_THROW(fut.get(), std::exception);
386 SCOPED_TRACE(
"CannotAbortStoppedOcmDaqController");
390 ASSERT_EQ(State::Stopped, m_daq->GetState()) <<
"Setup failed";
393 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
394 EXPECT_TRUE(fut.has_exception());
395 EXPECT_THROW(fut.get(), std::exception);
398 TEST_F(
TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
400 boost::promise<void> reply_promise;
401 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
404 auto fut = m_daq->StartAsync();
405 EXPECT_EQ(State::Starting, m_daq->GetState());
408 reply_promise.set_exception(std::runtime_error(
"Fake test failure"));
413 ASSERT_TRUE(fut.is_ready());
414 EXPECT_TRUE(fut.has_exception()) <<
"Expected future to contain exception";
415 EXPECT_THROW(fut.get(), std::exception) <<
"Expected exception to derive from std::exception";
416 EXPECT_EQ(
true, m_daq->GetErrorFlag());
421 boost::promise<void> reply_promise;
422 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
424 auto fut = m_daq->StartAsync();
425 ASSERT_EQ(State::Starting, m_daq->GetState());
426 EXPECT_FALSE(fut.is_ready());
430 auto fut2 = m_daq->StartAsync();
431 ASSERT_TRUE(fut2.has_exception());
432 EXPECT_THROW(fut2.get(), std::exception)
433 <<
"Multiple simultaneous start operations are not supported and an exception "
438 reply_promise.set_value();
448 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
449 EXPECT_THROW(fut.get(), std::exception)
450 <<
"It should not be possible to stop a data acquisition that has not started";
457 std::optional<op::AsyncOpParams> params;
458 boost::promise<Result<DpParts>> reply_promise;
460 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
461 Return(ByMove(reply_promise.get_future()))));
466 EXPECT_EQ(State::Stopping, m_daq->GetState())
467 <<
"Expected state to be in Stopping after requesting to stop";
471 reply_promise.set_value(reply);
479 ASSERT_TRUE(fut.is_ready());
480 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
481 auto status = fut.get();
482 EXPECT_EQ(State::Stopped, status.state)
483 <<
"Expected state to be Stopped since there were no errors";
484 EXPECT_TRUE(status.error) <<
"Error flag should be set since the reply_promise had an error";
485 EXPECT_EQ(State::Stopped, m_daq->GetState());
492 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
493 ASSERT_TRUE(fut.is_ready())
494 <<
"Aborting a NotStarted data acquisition should be ready immediately";
495 EXPECT_FALSE(fut.has_exception()) <<
"Future should not have failed";
497 auto result = fut.get();
498 EXPECT_EQ(State::Aborted, result.state) <<
"Unexpected state";
499 EXPECT_FALSE(result.error);
500 EXPECT_EQ(State::Aborted, m_daq->GetState());
508 boost::promise<void> reply_promise;
509 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
511 auto fut = m_daq->StartAsync();
512 ASSERT_EQ(State::Starting, m_daq->GetState())
513 <<
"Setup failed, unexpected state, aborting test";
514 ASSERT_FALSE(fut.is_ready());
518 auto fut = m_daq->StopAsync(ErrorPolicy::Strict);
519 EXPECT_TRUE(fut.has_exception())
520 <<
"Cannot stop unless DAQ is in State::Acquiring, current state: "
521 << m_daq->GetState();
523 EXPECT_THROW(fut.get(), std::exception)
524 <<
"Cannot stop if data acquisition is `Starting`. An exeption was expected";
528 reply_promise.set_value();
543 SCOPED_TRACE(
"AbortingIsOkWhenStarting");
546 boost::promise<void> start_promise;
547 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
552 auto start_fut = m_daq->StartAsync();
553 ASSERT_EQ(State::Starting, m_daq->GetState());
554 EXPECT_FALSE(start_fut.is_ready());
558 boost::promise<Result<void>> abort_promise;
559 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
560 .WillOnce(Return(ByMove(abort_promise.get_future())));
565 auto abort_fut = m_daq->AbortAsync(ErrorPolicy::Strict);
569 start_promise.set_value();
574 ASSERT_TRUE(start_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
575 EXPECT_FALSE(start_fut.has_exception())
576 <<
"Mock did not simulate failure so future should be ok";
581 abort_promise.set_value({
false});
586 ASSERT_TRUE(abort_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
587 EXPECT_FALSE(abort_fut.has_exception())
588 <<
"Mock didn't simulate failure so future should be OK";
589 auto result = abort_fut.get();
590 EXPECT_EQ(State::Aborted, result.state);
591 EXPECT_FALSE(result.error);
592 EXPECT_EQ(State::Aborted, m_daq->GetState());
596 SCOPED_TRACE(
"Acquiring");
603 SCOPED_TRACE(
"AbortOcmDaqControllerInStateAborting");
606 ASSERT_EQ(State::Acquiring, m_daq->GetState()) <<
"Test Setup failed";
609 ASSERT_EQ(State::Aborted, m_daq->GetState()) <<
"Test setup failed";
612 auto fut = m_daq->AbortAsync(ErrorPolicy::Strict);
613 ASSERT_TRUE(fut.is_ready());
614 EXPECT_THROW(fut.get(), std::runtime_error);
619 SCOPED_TRACE(
"AbortOcmDaqControllerInStateStarting");
621 ASSERT_EQ(State::Acquiring, m_daq->GetState()) <<
"Test Setup failed";
625 EXPECT_EQ(State::Aborted, m_daq->GetState());
630 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
633 boost::promise<Result<void>> abort_promise_1;
637 .WillOnce(Return(ByMove(abort_promise_1.get_future())));
642 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
643 <<
"Expected state to be in Stopping after requesting to abort";
646 abort_promise_1.set_value({
true});
649 ASSERT_TRUE(fut1.has_value());
651 auto result1 = fut1.get();
652 EXPECT_EQ(State::Aborted, result1.state);
653 EXPECT_TRUE(result1.error);
654 EXPECT_EQ(State::Aborted, m_daq->GetState());
664 SCOPED_TRACE(
"NewAbortSupersedesSuccessfulAbort");
668 boost::promise<Result<void>> abort_promise_1;
670 boost::promise<Result<void>> abort_promise_2;
673 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
675 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
676 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
680 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
681 auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
683 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
684 <<
"Expected state to be in Stopping after requesting to abort";
687 abort_promise_1.set_value({
false});
690 ASSERT_TRUE(fut1.is_ready());
691 ASSERT_FALSE(fut1.has_exception()) <<
"Future has unexpected exception!";
692 auto result1 = fut1.get();
693 EXPECT_EQ(State::Aborted, result1.state);
694 EXPECT_FALSE(result1.error);
695 EXPECT_EQ(State::Aborted, m_daq->GetState());
698 abort_promise_2.set_value({
false});
700 auto result2 = fut2.get();
701 EXPECT_EQ(State::Aborted, result2.state);
702 EXPECT_FALSE(result2.error);
703 EXPECT_EQ(State::Aborted, m_daq->GetState());
708 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
711 boost::promise<Result<void>> abort_promise_1;
713 boost::promise<Result<void>> abort_promise_2;
716 EXPECT_CALL(*m_mock_ops, Abort(ErrorPolicy::Strict, _))
718 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
719 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
722 auto fut1 = m_daq->AbortAsync(ErrorPolicy::Strict);
724 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState())
725 <<
"Expected state to be in Stopping after requesting to abort";
728 abort_promise_1.set_exception(
DaqSourceErrors(std::vector<std::exception_ptr>()));
731 ASSERT_TRUE(fut1.is_ready());
732 ASSERT_TRUE(fut1.has_exception()) <<
"Future has unexpected exception!";
734 EXPECT_EQ(State::AbortingAcquiring, m_daq->GetState());
737 auto fut2 = m_daq->AbortAsync(ErrorPolicy::Strict);
740 abort_promise_2.set_value({
false});
743 ASSERT_TRUE(fut2.has_value());
745 auto result2 = fut2.get();
746 EXPECT_EQ(State::Aborted, result2.state);
747 EXPECT_FALSE(result2.error);
748 EXPECT_EQ(State::Aborted, m_daq->GetState());
753 SCOPED_TRACE(
"StopOcmDaqControllerSuccessfully");
756 ASSERT_EQ(State::Acquiring, m_daq->GetState()) <<
"Test Setup failed";
763 auto fut = m_daq->AwaitAsync({
"non-existant"}, 0ms);
764 ASSERT_TRUE(fut.has_exception());
765 EXPECT_THROW(fut.get(), std::invalid_argument);
770 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 1ms);
773 ASSERT_TRUE(fut.has_exception());
778 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
780 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 150ms);
781 EXPECT_FALSE(fut.is_ready())
782 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
786 EXPECT_FALSE(fut.is_ready())
787 <<
"Wait condition not fulfilled, so future should not be ready yet";
790 ASSERT_TRUE(fut.is_ready());
791 ASSERT_FALSE(fut.has_exception());
792 auto state = fut.get();
793 EXPECT_TRUE(state == State::Stopping || state == State::Stopped)
794 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
795 "Stopping or Stopped, depending on the order of the source)";
799 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
801 auto fut = m_daq->AwaitAsync({
"meta-source-1",
"meta-source-2"}, 150ms);
802 EXPECT_FALSE(fut.is_ready())
803 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
807 EXPECT_FALSE(fut.is_ready());
811 EXPECT_TRUE(fut.is_ready());
812 ASSERT_FALSE(fut.has_exception());
813 auto state = fut.get();
814 EXPECT_TRUE(state == State::AbortingAcquiring || state == State::Aborted)
815 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
816 "Aborting or Aborted, depending on the order of the source)";
820 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
826 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 150ms);
828 EXPECT_TRUE(fut.is_ready()) <<
"Condition already fulfilled so future should be ready";
829 ASSERT_FALSE(fut.has_exception());
830 EXPECT_EQ(State::Stopped, fut.get());
837 m_daq->UpdateKeywords(m_keywords);
839 EXPECT_EQ(m_keywords, m_daq->GetContext().keywords);
846 EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
850 SCOPED_TRACE(
"StartWillAwait");
860 SCOPED_TRACE(
"AutomaticStop");
867 DpPart prim_part{
"s1",
"/tmp/file.fits"};
868 m_await_promise.set_value({
false, {prim_part}});
872 EXPECT_CALL(*m_mock_ops, Stop(ErrorPolicy::Strict, _))
873 .WillOnce(Return(ByMove(boost::make_ready_future<
Result<DpParts>>(stop_op_reply))));
874 ASSERT_EQ(State::Acquiring, this->m_status->GetState());
875 EXPECT_EQ(0u, m_daq->GetContext().results.size());
879 m_io_ctx, [
this]() ->
bool {
return this->m_status->GetState() == State::Stopped; });
880 EXPECT_FALSE(m_status->GetError());
881 EXPECT_EQ(2u, m_daq->GetContext().results.size()) <<
"One from m_files (metadata), "
883 EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));