13 #include <gtest/gtest.h>
30 using namespace ::testing;
31 using namespace std::chrono;
44 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
45 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
46 m_ops.start = op::InitiateOperation<op::StartAsync, boost::future<void>,
op::AsyncOpParams>;
48 boost::future<Result<DpParts>>,
52 boost::future<Result<void>>,
56 boost::future<Result<DpParts>>,
70 template <
class Iterator>
73 for (; it != end; ++it) {
102 std::shared_ptr<OcmDaqController>
m_daq;
115 m_files.emplace_back(
"foo",
"bar");
119 m_prim_rr_client = std::make_shared<RecCmdsAsyncMock>();
120 m_meta_rr_client = std::make_shared<MetaDaqAsyncMock>();
121 m_meta_rr_client2 = std::make_shared<NiceMock<MetaDaqAsyncMock>>();
123 m_mock_ops.swap(std::get<std::unique_ptr<MockAsyncOperations>>(tup));
129 MetaSource s1(
"meta-source-1", m_meta_rr_client);
130 MetaSource s2(
"meta-source-2", m_meta_rr_client2);
135 PreDaqControllerHook();
136 m_daq = std::make_shared<daq::OcmDaqController>(m_io_ctx,
141 std::get<OcmAsyncOperations>(tup));
144 ASSERT_EQ(m_status->GetState(), m_daq->GetState());
149 m_meta_rr_client.reset();
150 m_meta_rr_client2.reset();
151 m_prim_rr_client.reset();
165 boost::promise<void> reply_promise;
166 std::optional<op::AsyncOpParams> params;
167 EXPECT_CALL(*m_mock_ops, Start(_))
168 .WillOnce(DoAll(Invoke([&](
auto p) { params.emplace(p); }),
169 Return(ByMove(reply_promise.get_future()))));
173 auto fut = m_daq->StartAsync();
175 EXPECT_FALSE(fut.is_ready());
178 reply_promise.set_value();
185 ASSERT_TRUE(fut.is_ready());
192 std::optional<op::AsyncOpParams> params;
193 boost::promise<Result<void>> reply_promise;
195 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
196 Return(ByMove(reply_promise.get_future()))));
202 <<
"Expected state to be in Stopping after requesting to abort";
205 reply_promise.set_value({
false});
212 ASSERT_TRUE(fut.is_ready());
213 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
214 auto result = fut.get();
216 EXPECT_FALSE(result.error);
221 std::optional<op::AsyncOpParams> params;
222 boost::promise<Result<DpParts>> reply_promise;
225 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
226 Return(ByMove(reply_promise.get_future()))));
232 <<
"Expected state to be in Stopping after requesting to stop";
236 reply_promise.set_value(reply);
243 ASSERT_TRUE(fut.is_ready());
244 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
245 auto status = fut.get();
247 <<
"Expected state to be Stopped since there were no errors";
248 EXPECT_FALSE(status.error);
272 PrimSource s1(
"prim-source-1", m_prim_rr_client);
273 m_sources.GetPrimarySources() = std::vector<daq::PrimSource>{s1};
276 EXPECT_CALL(*m_mock_ops, AwaitPrim(_))
277 .WillOnce(Return(ByMove(m_await_promise.get_future())));
285 EXPECT_THROW(boost::make_ready_future()
286 .then([](
auto f) -> boost::future<void> {
288 throw std::runtime_error(
"Meow");
290 return boost::make_exceptional_future<void>();
301 std::invalid_argument);
306 m_sources.GetMetadataSources() = {s};
307 m_context.id =
"not-id";
310 std::invalid_argument);
315 m_sources.GetMetadataSources() = {s};
321 std::invalid_argument);
328 std::invalid_argument);
335 std::invalid_argument);
341 m_sources.GetMetadataSources() = {s};
347 m_sources.GetMetadataSources() = {s};
348 boost::future<State> fut;
352 fut =
daq->AwaitAsync({
"source-id"}, 100ms);
353 ASSERT_FALSE(fut.is_ready());
357 ASSERT_TRUE(fut.is_ready())
358 <<
"Future should have been cancelled since daq should have been deleted.";
359 EXPECT_TRUE(fut.has_exception());
368 auto status_ptr = m_daq->GetStatus();
369 EXPECT_EQ(status_ptr.get(), m_status.get());
373 SCOPED_TRACE(
"CannotStopStoppedOcmDaqController");
381 EXPECT_TRUE(fut.has_exception());
382 EXPECT_THROW(fut.get(), std::exception);
386 SCOPED_TRACE(
"CannotAbortStoppedOcmDaqController");
394 EXPECT_TRUE(fut.has_exception());
395 EXPECT_THROW(fut.get(), std::exception);
398 TEST_F(
TestState, StartingFailsToSendStartDaqWillAbortAndSetErrorFlagAndStayInStarting) {
400 boost::promise<void> reply_promise;
401 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
404 auto fut = m_daq->StartAsync();
408 reply_promise.set_exception(std::runtime_error(
"Fake test failure"));
413 ASSERT_TRUE(fut.is_ready());
414 EXPECT_TRUE(fut.has_exception()) <<
"Expected future to contain exception";
415 EXPECT_THROW(fut.get(), std::exception) <<
"Expected exception to derive from std::exception";
421 boost::promise<void> reply_promise;
422 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
424 auto fut = m_daq->StartAsync();
426 EXPECT_FALSE(fut.is_ready());
430 auto fut2 = m_daq->StartAsync();
431 ASSERT_TRUE(fut2.has_exception());
432 EXPECT_THROW(fut2.get(), std::exception)
433 <<
"Multiple simultaneous start operations are not supported and an exception "
438 reply_promise.set_value();
449 EXPECT_THROW(fut.get(), std::exception)
450 <<
"It should not be possible to stop a data acquisition that has not started";
457 std::optional<op::AsyncOpParams> params;
458 boost::promise<Result<DpParts>> reply_promise;
460 .WillOnce(DoAll(Invoke([&](
auto policy,
auto p) { params.emplace(p); }),
461 Return(ByMove(reply_promise.get_future()))));
467 <<
"Expected state to be in Stopping after requesting to stop";
471 reply_promise.set_value(reply);
479 ASSERT_TRUE(fut.is_ready());
480 ASSERT_FALSE(fut.has_exception()) <<
"Future has unexpected exception!";
481 auto status = fut.get();
483 <<
"Expected state to be Stopped since there were no errors";
484 EXPECT_TRUE(status.error) <<
"Error flag should be set since the reply_promise had an error";
493 ASSERT_TRUE(fut.is_ready())
494 <<
"Aborting a NotStarted data acquisition should be ready immediately";
495 EXPECT_FALSE(fut.has_exception()) <<
"Future should not have failed";
497 auto result = fut.get();
499 EXPECT_FALSE(result.error);
508 boost::promise<void> reply_promise;
509 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(reply_promise.get_future())));
511 auto fut = m_daq->StartAsync();
513 <<
"Setup failed, unexpected state, aborting test";
514 ASSERT_FALSE(fut.is_ready());
519 EXPECT_TRUE(fut.has_exception())
520 <<
"Cannot stop unless DAQ is in State::Acquiring, current state: "
521 << m_daq->GetState();
523 EXPECT_THROW(fut.get(), std::exception)
524 <<
"Cannot stop if data acquisition is `Starting`. An exeption was expected";
528 reply_promise.set_value();
543 SCOPED_TRACE(
"AbortingIsOkWhenStarting");
546 boost::promise<void> start_promise;
547 EXPECT_CALL(*m_mock_ops, Start(_)).WillOnce(Return(ByMove(start_promise.get_future())));
552 auto start_fut = m_daq->StartAsync();
554 EXPECT_FALSE(start_fut.is_ready());
558 boost::promise<Result<void>> abort_promise;
560 .WillOnce(Return(ByMove(abort_promise.get_future())));
569 start_promise.set_value();
574 ASSERT_TRUE(start_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
575 EXPECT_FALSE(start_fut.has_exception())
576 <<
"Mock did not simulate failure so future should be ok";
581 abort_promise.set_value({
false});
586 ASSERT_TRUE(abort_fut.is_ready()) <<
"Cannot proceed with test since future is not ready";
587 EXPECT_FALSE(abort_fut.has_exception())
588 <<
"Mock didn't simulate failure so future should be OK";
589 auto result = abort_fut.get();
591 EXPECT_FALSE(result.error);
596 SCOPED_TRACE(
"Acquiring");
603 SCOPED_TRACE(
"AbortOcmDaqControllerInStateAborting");
613 ASSERT_TRUE(fut.is_ready());
614 EXPECT_THROW(fut.get(), std::runtime_error);
619 SCOPED_TRACE(
"AbortOcmDaqControllerInStateStarting");
630 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
633 boost::promise<Result<void>> abort_promise_1;
637 .WillOnce(Return(ByMove(abort_promise_1.get_future())));
643 <<
"Expected state to be in Stopping after requesting to abort";
646 abort_promise_1.set_value({
true});
649 ASSERT_TRUE(fut1.has_value());
651 auto result1 = fut1.get();
653 EXPECT_TRUE(result1.error);
664 SCOPED_TRACE(
"NewAbortSupersedesSuccessfulAbort");
668 boost::promise<Result<void>> abort_promise_1;
670 boost::promise<Result<void>> abort_promise_2;
675 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
676 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
684 <<
"Expected state to be in Stopping after requesting to abort";
687 abort_promise_1.set_value({
false});
690 ASSERT_TRUE(fut1.is_ready());
691 ASSERT_FALSE(fut1.has_exception()) <<
"Future has unexpected exception!";
692 auto result1 = fut1.get();
694 EXPECT_FALSE(result1.error);
698 abort_promise_2.set_value({
false});
700 auto result2 = fut2.get();
702 EXPECT_FALSE(result2.error);
708 SCOPED_TRACE(
"NewAbortSupersedesFailedAbort");
711 boost::promise<Result<void>> abort_promise_1;
713 boost::promise<Result<void>> abort_promise_2;
718 .WillOnce(Return(ByMove(abort_promise_1.get_future())))
719 .WillOnce(Return(ByMove(abort_promise_2.get_future())));
725 <<
"Expected state to be in Stopping after requesting to abort";
728 abort_promise_1.set_exception(
DaqSourceErrors(std::vector<std::exception_ptr>()));
731 ASSERT_TRUE(fut1.is_ready());
732 ASSERT_TRUE(fut1.has_exception()) <<
"Future has unexpected exception!";
740 abort_promise_2.set_value({
false});
743 ASSERT_TRUE(fut2.has_value());
745 auto result2 = fut2.get();
747 EXPECT_FALSE(result2.error);
753 SCOPED_TRACE(
"StopOcmDaqControllerSuccessfully");
763 auto fut = m_daq->AwaitAsync({
"non-existant"}, 0ms);
764 ASSERT_TRUE(fut.has_exception());
765 EXPECT_THROW(fut.get(), std::invalid_argument);
770 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 1ms);
773 ASSERT_TRUE(fut.has_exception());
778 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
780 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 150ms);
781 EXPECT_FALSE(fut.is_ready())
782 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
786 EXPECT_FALSE(fut.is_ready())
787 <<
"Wait condition not fulfilled, so future should not be ready yet";
790 ASSERT_TRUE(fut.is_ready());
791 ASSERT_FALSE(fut.has_exception());
792 auto state = fut.get();
794 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
795 "Stopping or Stopped, depending on the order of the source)";
799 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
801 auto fut = m_daq->AwaitAsync({
"meta-source-1",
"meta-source-2"}, 150ms);
802 EXPECT_FALSE(fut.is_ready())
803 <<
"The future shouldn't be ready yet as we haven't started the data acquisition!";
807 EXPECT_FALSE(fut.is_ready());
811 EXPECT_TRUE(fut.is_ready());
812 ASSERT_FALSE(fut.has_exception());
813 auto state = fut.get();
815 <<
"Await condition should have been ready once source is stopped (meaning that DAQ is "
816 "Aborting or Aborted, depending on the order of the source)";
820 SCOPED_TRACE(
"AwaitSingleSourceIsOk");
826 auto fut = m_daq->AwaitAsync({
"meta-source-1"}, 150ms);
828 EXPECT_TRUE(fut.is_ready()) <<
"Condition already fulfilled so future should be ready";
829 ASSERT_FALSE(fut.has_exception());
837 m_daq->UpdateKeywords(m_keywords);
839 EXPECT_EQ(m_keywords, m_daq->GetContext().keywords);
846 EXPECT_THROW(m_daq->UpdateKeywords(m_keywords), std::runtime_error);
850 SCOPED_TRACE(
"StartWillAwait");
860 SCOPED_TRACE(
"AutomaticStop");
867 DpPart prim_part{
"s1",
"/tmp/file.fits"};
868 m_await_promise.set_value({
false, {prim_part}});
873 .WillOnce(Return(ByMove(boost::make_ready_future<
Result<DpParts>>(stop_op_reply))));
875 EXPECT_EQ(0u, m_daq->GetContext().results.size());
879 m_io_ctx, [
this]() ->
bool {
return this->m_status->GetState() ==
State::Stopped; });
880 EXPECT_FALSE(m_status->GetError());
881 EXPECT_EQ(2u, m_daq->GetContext().results.size()) <<
"One from m_files (metadata), "
883 EXPECT_THAT(m_daq->GetContext().results, Contains(prim_part));
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Started operation was aborted.
Started operation timed out.
Exception thrown to carry reply errors.
Data acquisition sources.
std::vector< MetaSource > const & GetMetadataSources() const noexcept
std::vector< PrimSource > const & GetPrimarySources() const noexcept
Provides information of the location and source of a FITS file or keywords produced by a data acquisi...
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Keeps relevant state to be able to communicate with a primary data source.
Contains error related declarations for DAQ.
virtual void PreDaqControllerHook()
std::shared_ptr< MetaSource::RrClient > m_meta_rr_client
std::vector< DpPart > m_files
std::shared_ptr< ObservableEventLog > m_event_log
fits::KeywordVector m_keywords
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client2
StatusObserverMock m_observer
std::shared_ptr< MetaDaqAsyncMock > m_meta_rr_client
std::shared_ptr< OcmDaqController > m_daq
TestOcmDaqControllerLifeCycle()
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
boost::asio::io_context m_io_ctx
void StartDaq()
Executes a successful StartAsync() call.
boost::asio::io_context m_io_ctx
virtual void PreStartAsyncHook()
std::shared_ptr< ObservableStatus > m_status
std::unique_ptr< MockAsyncOperations > m_mock_ops
std::shared_ptr< ObservableStatus > m_status
std::shared_ptr< ObservableEventLog > m_event_log
std::shared_ptr< PrimSource::RrClient > m_prim_rr_client
Fixture for daq::DaqController life cycle tests.
void MakeTestProgress(boost::asio::io_context &io_ctx, Future *fut=nullptr)
Test helper that progress the test by executing pending jobs and optionally wait for a future to be r...
Simple observer used for testing.
Developer notes: OcmDaqController use boost::when_all to compose futures.
Contains declarations for the helper functions to initiate operations.
std::tuple< std::unique_ptr< MockAsyncOperations >, daq::OcmAsyncOperations > CreateMockAsyncOperations()
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
R InitiateOperation(Params &&... params)
Constructs and initiates Op and return the future result.
std::pair< R, std::function< bool()> > InitiateAbortableOperation(Params &&... params)
Like InitiateOperation but in addition to returning the future it also returns an unspecified object ...
TEST_F(TestDpmDaqController, StatusUpdateInNotScheduledSucceeds)
ErrorPolicy
Error policy supported by certain operations.
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
State
Observable states of the data acquisition process.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
TEST(TestDaqContext, Files)
Utility class that represents a result and an error.
Mockup of metadaqif classes.
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
void PreDaqControllerHook()
boost::promise< Result< DpParts > > m_await_promise
void PreDaqControllerHook()
void PreDaqControllerHook()
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::string id
DAQ identfier, possibly provided by user.
A type safe version of LiteralKeyword that consist of the three basic components of a FITS keyword ke...
A composite async operation that aborts a DAQ.
Parameters required for each async operation.
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!a.
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
Await specific parameters that is not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
A composite async operation that starts DAQ.
void SetAllSourceState(op::AsyncOpParams ¶ms, State state)
void SetSourceState(Iterator begin, Iterator end, State state)
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)
Defines shared test utilities.
void MakeTestProgressUntil(boost::asio::io_context &io_ctx, Predicate &&pred, std::chrono::milliseconds timeout=std::chrono::seconds(3))
Executes io_ctx::poll until pred returns true or it times out.