ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
testDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief Unit tests for daq::dpm::DaqControllerImpl
9  */
10 #include <daq/dpm/scheduler.hpp>
11 
12 #include <log4cplus/loggingmacros.h>
13 #include <utility>
14 
16 #include "mock/mockScheduler.hpp"
17 #include "mock/mockWorkspace.hpp"
18 
19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
21 
22 namespace daq::dpm {
23 
24 using namespace ::testing;
25 
27  using Hook = std::function<void(MockRsyncAsyncProcess&)>;
28  std::unique_ptr<RsyncAsyncProcessIf> operator()(boost::asio::io_context&,
29  std::string,
30  std::string,
31  RsyncOptions const&,
33  auto rsync = std::make_unique<MockRsyncAsyncProcess>();
34  if (hook) {
35  hook(*rsync);
36  }
37  procs.push_back(rsync.get());
38  return rsync;
39  }
40  void SetHook(Hook f) {
41  hook = std::move(f);
42  }
43  std::vector<MockRsyncAsyncProcess*> procs;
45 };
46 
48  using Hook = std::function<void(MockAsyncProcess&)>;
49  std::unique_ptr<AsyncProcessIf> operator()(boost::asio::io_context&, std::vector<std::string>) {
50  auto rsync = std::make_unique<MockRsyncAsyncProcess>();
51  if (hook) {
52  hook(*rsync);
53  }
54  procs.push_back(rsync.get());
55  return rsync;
56  }
57  void SetHook(Hook f) {
58  hook = std::move(f);
59  }
60  std::vector<MockAsyncProcess*> procs;
62 };
63 
64 class TestDaqControllerBase : public ::testing::Test {
65 public:
67  }
68 
69  void SetUp() override {
70  }
71 
72  void TearDown() override {
73  // Execute possibly pending completions handlers.
74  EXPECT_NO_THROW(m_io_ctx.poll());
75  }
76 
77 protected:
79  boost::asio::io_context m_io_ctx;
82 };
83 
85 
87 public:
89  }
90  void SetUp() override {
92  // Set default status. This can be modified by each test-case before calling PostSetUp()
93  m_initial_status.id = "TEST.ID";
94  m_initial_status.state = State::Scheduled;
95  m_initial_status.error = false;
96  m_initial_status.timestamp = Status::TimePoint::clock::now();
97  m_dpspec.target.file_id = "TEST.ID";
98  m_dpspec.target.source = DpSpec::SourceFitsFile{"source1", "host:target.fits", {}};
99  m_dpspec.sources.push_back(DpSpec::SourceFitsFile{"source2", "host:source.fits", {}});
100  }
101 
102  /**
103  * Specifically sets up expectations and other things based on current state.
104  * It considers:
105  * - m_dpspec for sources and populates m_resolver based on that.
106  * - m_initial_status controls which state to start from.
107  */
108  void PostSetUp() {
109  // Set up resolver so that it matches m_dpspec sources.
110  std::filesystem::path sources_root = "sources";
111  if (m_dpspec.target.source.has_value()) {
112  m_resolver.Add({m_dpspec.target.source->source_name, m_dpspec.target.source->origin},
113  sources_root / ParseSourceOrigin(m_dpspec.target.source->origin).path);
114  }
115  for (auto const& s : m_dpspec.sources) {
116  if (std::holds_alternative<DpSpec::SourceFitsFile>(s)) {
117  auto const& source = std::get<DpSpec::SourceFitsFile>(s);
118  m_resolver.Add({source.source_name, source.origin},
119  sources_root / ParseSourceOrigin(source.origin).path);
120  }
121  }
122  auto ws = std::make_unique<daq::dpm::MockDaqWorkspace>();
123  m_ws_mock_ptr = ws.get();
124  EXPECT_CALL(*ws, LoadStatus()).WillRepeatedly(Return(m_initial_status));
125  EXPECT_CALL(*ws, LoadSourceLookup()).WillRepeatedly(Return(m_resolver.GetMapping()));
126  EXPECT_CALL(*ws, LoadSpecification()).WillRepeatedly(Return(m_dpspec));
127  EXPECT_CALL(*ws, GetSourcesPath()).WillRepeatedly(Return(std::filesystem::path("sources")));
128  EXPECT_CALL(*ws, GetSourceLookupPath())
129  .WillRepeatedly(Return(std::filesystem::path("sources.json")));
130  EXPECT_CALL(*ws, GetPath())
131  .Times(AnyNumber())
132  .WillRepeatedly(Return(std::filesystem::path("")));
133  m_daq = std::make_unique<DaqControllerImpl>(m_executor,
134  std::move(ws),
135  m_resources,
136  std::reference_wrapper(m_rsync_factory),
137  std::reference_wrapper(m_proc_factory),
138  m_options);
139  m_daq->Start();
140  }
141 
142  void TearDown() override {
143  m_daq.reset();
145  }
146 
147  /**
148  * Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
149  */
150  template <class Pred>
151  void PollUntil(Pred&& p) {
152  while (true) {
153  m_io_ctx.restart();
154  if (m_io_ctx.poll_one() == 0 || p()) {
155  break;
156  }
157  }
158  }
159 
160 protected:
164  // Not-owned pointer to a DaqWorkspace
166  std::unique_ptr<DaqControllerImpl> m_daq;
169 };
170 
171 using namespace ::testing;
172 
173 TEST_F(TestDaqController, ScheduledTransitionsToTransferring) {
174  // Additional setup
175  PostSetUp();
176  EXPECT_CALL(*m_ws_mock_ptr, StoreSourceLookup(_));
177  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Transferring)));
178 
179  // Run
180  // Only run handlers until we reach State::Transferring
181  PollUntil([&] { return m_daq->GetState() == State::Transferring; });
182  EXPECT_EQ(m_daq->GetState(), State::Transferring);
183 }
184 
185 TEST_F(TestDaqController, TransferringWithoutFilesTransitionsToMerging) {
186  // Additional setup
187  m_dpspec.target.source = std::nullopt;
188  m_dpspec.sources.clear();
189  m_initial_status.state = State::Transferring;
190  PostSetUp();
191 
192  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
193 
194  // Run
195  // Only run handlers until we reach State::Merging
196  PollUntil([&] { return m_daq->GetState() == State::Merging; });
197  EXPECT_EQ(m_daq->GetState(), State::Merging);
198 }
199 
200 TEST_F(TestDaqController, TransferringStartsTransfersAndWhenCompletedTransitionsToMerging) {
201  // Additional setup
202  ASSERT_TRUE(m_dpspec.target.source);
203  ASSERT_TRUE(m_dpspec.sources.size() == 1);
204  m_initial_status.state = State::Transferring;
205  m_resources.net_receive.SetLimit(0);
206  PostSetUp();
207 
208  EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
209  << "Setup should have added target and one extra source";
210  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/target.fits")))
211  .Times(AnyNumber())
212  .WillRepeatedly(Return(false));
213  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/source.fits")))
214  .Times(AnyNumber())
215  .WillRepeatedly(Return(false));
216 
217  // Run
218  m_io_ctx.poll();
219  EXPECT_EQ(m_daq->GetState(), State::Transferring);
220  // After promise is completed the files will exist
221  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/source.fits")))
222  .WillOnce(Return(true));
223  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/target.fits")))
224  .WillOnce(Return(true));
225  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
226 
227  // Complete async file operations by setting promises
228  for (auto& proc : m_rsync_factory.procs) {
229  proc->promise.set_value(0);
230  }
231  PollUntil([&] { return m_daq->GetState() == State::Merging; });
232 
233  // Execute pending handlers that should have been registered after promise was set.
234  EXPECT_EQ(m_daq->GetState(), State::Merging);
235 }
236 
237 TEST_F(TestDaqController, TransferringFailsDoesNotTransition) {
238  // Additional setup
239  ASSERT_TRUE(m_dpspec.target.source);
240  ASSERT_TRUE(m_dpspec.sources.size() == 1);
241  m_initial_status.state = State::Transferring;
242  m_resources.net_receive.SetLimit(0);
243  PostSetUp();
244 
245  EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
246  << "Setup should have added target and one extra source";
247  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/target.fits")))
248  .WillOnce(Return(false));
249  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/source.fits")))
250  .WillOnce(Return(false));
251 
252  // Test
253  // Initiates transfers
254  m_io_ctx.poll();
255 
256  EXPECT_EQ(m_daq->GetState(), State::Transferring);
257  EXPECT_CALL(*m_ws_mock_ptr,
258  StoreStatus(AllOf(Field(&Status::state, State::Transferring),
259  Field(&Status::error, true))));
260 
261  // Run test that emulate a successful first file transfer and then subsequent failures.
262  // Complete async file operations by setting promises
263  int error = 0;
264  for (auto& proc : m_rsync_factory.procs) {
265  proc->promise.set_value(error++);
266  }
267  // Execute pending handlers that should have been registered after promise was set.
268  m_io_ctx.restart();
269  m_io_ctx.poll();
270 
271  EXPECT_TRUE(m_daq->IsStopped());
272  EXPECT_EQ(m_daq->GetState(), State::Transferring);
273 }
274 
275 /**
276  * [recovery]
277  * If transfer previously failed and then stopped this should be automatically recoverable if
278  * DAQ is started again and requested files are available on FS.
279  */
280 TEST_F(TestDaqController, RecoverAutomaticallyFromTransferringIfFilesExist) {
281  // Additional setup
282  m_initial_status.state = State::Transferring;
283  m_initial_status.error = true;
284  PostSetUp();
285 
286  // Files exist!
287  EXPECT_CALL(*m_ws_mock_ptr, Exists(_)).WillRepeatedly(Return(true));
288  EXPECT_CALL(
289  *m_ws_mock_ptr,
290  StoreStatus(AllOf(Field(&Status::state, State::Merging), Field(&Status::error, false))));
291 
292  // Run
293  // Only run handlers until we reach State::Merging
294  PollUntil([&] { return m_daq->GetState() == State::Merging; });
295  EXPECT_EQ(m_daq->GetState(), State::Merging);
296 }
297 
298 TEST_F(TestDaqController, MergingSuccessful) {
299  // Setup
300  m_initial_status.state = State::Merging;
301  PostSetUp();
302 
303  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
304  .WillRepeatedly(Return("specification.json"));
305  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("TEST.ID.fits")))
306  .WillOnce(Return(false));
307 
308  // Run
309  // First the process will be created
310  m_io_ctx.poll();
311 
312  ASSERT_EQ(m_proc_factory.procs.size(), 1u);
313  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("TEST.ID.fits")))
314  .WillRepeatedly(Return(true));
315 
316  // expect alert to be set
317  EXPECT_CALL(*m_ws_mock_ptr,
318  StoreStatus(AllOf(
319  Field(&Status::alerts,
320  ElementsAre(Field(&Alert::description,
321  HasSubstr("Writing keywords required resizing")))),
322  Field(&Status::state, State::Merging),
323  Field(&Status::error, false))));
324  EXPECT_CALL(
325  *m_ws_mock_ptr,
326  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
327  EXPECT_CALL(*m_ws_mock_ptr, MakeResultSymlink(std::filesystem::path("TEST.ID.fits")));
328 
329  // Emit alert from merger
330  std::string message =
331  R"({"content":{"id":"primary_hdu_resize","message":"Writing keywords required resizing of primary HDU: Add space for at least 73 keywords to avoid resize"},"timestamp":1650964356522093759,"type":"alert"})";
332  m_proc_factory.procs[0]->stdout(8, message);
333 
334  // Invalid messages should be ignored
335  m_proc_factory.procs[0]->stdout(8, "This is invalid JSON");
336 
337  // Then we complete
338  m_proc_factory.procs[0]->promise.set_value(0);
339 
340  // Only run handlers until we reach State::Merging
341  PollUntil([&] { return m_daq->GetState() == State::Completed; });
342  EXPECT_EQ(m_daq->GetState(), State::Completed);
343 }
344 
345 /**
346  * [recovery]
347  * - User manually invokes daqDpmMerge to create output.
348  */
349 TEST_F(TestDaqController, RecoverAutomaticallyFromMergeFailureIfResultExists) {
350  // Setup
351  // Start in Merging with failure.
352  m_initial_status.state = State::Merging;
353  m_initial_status.error = true;
354  PostSetUp();
355 
356  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("TEST.ID.fits")))
357  .WillRepeatedly(Return(true));
358  EXPECT_CALL(
359  *m_ws_mock_ptr,
360  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
361 
362  // Run
363  // First the process will be created
364  m_io_ctx.poll();
365  ASSERT_EQ(m_proc_factory.procs.size(), 0u);
366  EXPECT_EQ(m_daq->GetState(), State::Completed);
367 }
368 
369 } // namespace daq::dpm
daq::dpm::FakeProcFactory::hook
Hook hook
Definition: testDaqController.cpp:61
daq::dpm::TestDaqController::SetUp
void SetUp() override
Definition: testDaqController.cpp:90
daq::DpSpec::SourceFitsFile
Definition: dpSpec.hpp:45
daq::dpm::TestDaqControllerBase::SetUp
void SetUp() override
Definition: testDaqController.cpp:69
daq::Status::error
bool error
Definition: status.hpp:139
daq::dpm::FakeProcFactory::SetHook
void SetHook(Hook f)
Definition: testDaqController.cpp:57
daq::DpSpec
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:28
daq::Status::alerts
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:143
daq::dpm::TestDaqControllerBase::m_options
DaqControllerOptions m_options
Definition: testDaqController.cpp:81
daq::dpm::FakeProcFactory::operator()
std::unique_ptr< AsyncProcessIf > operator()(boost::asio::io_context &, std::vector< std::string >)
Definition: testDaqController.cpp:49
daq::DpSpec::Target::file_id
std::string file_id
Definition: dpSpec.hpp:51
daq::RsyncAsyncProcessIf::DryRun
DryRun
Definition: rsyncAsyncProcess.hpp:79
daq::RsyncOptions
Options controlling rsync invocation.
Definition: rsyncAsyncProcess.hpp:28
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::dpm::SourceResolver::GetMapping
auto GetMapping() const noexcept -> Mapping const &
Get native representation of source mapping for serialization.
Definition: sourceResolver.cpp:42
daq::dpm::TestDaqControllerBase::m_executor
rad::IoExecutor m_executor
Definition: testDaqController.cpp:80
daq::Origin::path
std::filesystem::path path
Definition: dpSpec.hpp:83
daq::dpm::TestDaqController::m_rsync_factory
FakeRsyncFactory m_rsync_factory
Definition: testDaqController.cpp:167
daq::State::Transferring
@ Transferring
Input files are being transferred.
daq::Status::timestamp
TimePoint timestamp
Definition: status.hpp:149
daq::dpm::TestDaqController::m_initial_status
Status m_initial_status
Definition: testDaqController.cpp:161
daq::DpSpec::sources
std::vector< SourceTypes > sources
Definition: dpSpec.hpp:63
daq::dpm::TestDaqController::PostSetUp
void PostSetUp()
Specifically sets up expectations and other things based on current state.
Definition: testDaqController.cpp:108
scheduler.hpp
daq::dpm::Scheduler and related class declarations.
daq::MockRsyncAsyncProcess
Definition: mockAsyncProcess.hpp:45
daq::Alert::description
std::string description
Definition: status.hpp:76
daq::dpm::TestDaqController::m_resolver
SourceResolver m_resolver
Definition: testDaqController.cpp:163
daq::dpm::FakeProcFactory::Hook
std::function< void(MockAsyncProcess &)> Hook
Definition: testDaqController.cpp:48
daq::dpm::TestDaqControllerInit
Definition: testDaqController.cpp:84
daq::dpm::TestDaqControllerBase::TestDaqControllerBase
TestDaqControllerBase()
Definition: testDaqController.cpp:66
daq::dpm::FakeRsyncFactory::Hook
std::function< void(MockRsyncAsyncProcess &)> Hook
Definition: testDaqController.cpp:27
daq::dpm::TestDaqControllerBase::TearDown
void TearDown() override
Definition: testDaqController.cpp:72
daq::State::Merging
@ Merging
DAQ is being merged.
daq::dpm::FakeRsyncFactory
Definition: testDaqController.cpp:26
daq::dpm::TestDaqControllerBase::m_resources
Resources m_resources
Definition: testDaqController.cpp:78
daq::DpSpec::Target::source
std::optional< SourceFitsFile > source
Definition: dpSpec.hpp:56
daq::MockAsyncProcess
Combined fake/mock.
Definition: mockAsyncProcess.hpp:21
daq::dpm::TestDaqController::TearDown
void TearDown() override
Definition: testDaqController.cpp:142
daq::dpm::TestDaqControllerBase
Definition: testDaqController.cpp:64
daq::dpm
Definition: testDpSpec.cpp:16
daq::dpm::SourceResolver::Add
void Add(SourceFile const &source, std::filesystem::path const &path)
Adds path so it is resolved using source_name and @origin .
Definition: sourceResolver.cpp:25
daq::dpm::FakeProcFactory::procs
std::vector< MockAsyncProcess * > procs
Definition: testDaqController.cpp:60
daq::dpm::TestDaqController::m_dpspec
DpSpec m_dpspec
Definition: testDaqController.cpp:162
daq::Status::id
std::string id
Definition: status.hpp:136
mockAsyncProcess.hpp
Mocks for daq::RsyncAsyncProcessIf.
daq::dpm::SourceResolver
Provides location of fits source file.
Definition: sourceResolver.hpp:27
daq::dpm::FakeRsyncFactory::operator()
std::unique_ptr< RsyncAsyncProcessIf > operator()(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)
Definition: testDaqController.cpp:28
daq::dpm::TestDaqController
Definition: testDaqController.cpp:86
daq::dpm::TEST_F
TEST_F(TestParseOrigin, ParseSucceeds)
Definition: testDpSpec.cpp:20
daq::Status
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:120
daq::Status::state
State state
Definition: status.hpp:138
daq::dpm::DaqControllerOptions
Options for DaqController.
Definition: scheduler.hpp:162
daq::dpm::TestDaqController::PollUntil
void PollUntil(Pred &&p)
Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
Definition: testDaqController.cpp:151
daq::DpSpec::target
Target target
Definition: dpSpec.hpp:62
daq::dpm::TestDaqController::m_daq
std::unique_ptr< DaqControllerImpl > m_daq
Definition: testDaqController.cpp:166
daq::dpm::TestDaqController::m_ws_mock_ptr
daq::dpm::MockDaqWorkspace * m_ws_mock_ptr
Definition: testDaqController.cpp:165
daq::dpm::TestDaqController::TestDaqController
TestDaqController()
Definition: testDaqController.cpp:88
daq::dpm::TestDaqController::m_proc_factory
FakeProcFactory m_proc_factory
Definition: testDaqController.cpp:168
daq::dpm::Resources
Limited resources.
Definition: scheduler.hpp:231
daq::dpm::FakeRsyncFactory::hook
Hook hook
Definition: testDaqController.cpp:44
daq::dpm::FakeProcFactory
Definition: testDaqController.cpp:47
daq::dpm::FakeRsyncFactory::procs
std::vector< MockRsyncAsyncProcess * > procs
Definition: testDaqController.cpp:43
daq::dpm::MockDaqWorkspace
Definition: mockWorkspace.hpp:33
daq::dpm::FakeRsyncFactory::SetHook
void SetHook(Hook f)
Definition: testDaqController.cpp:40
error
Definition: main.cpp:23
mockScheduler.hpp
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
daq::dpm::TestDaqControllerBase::m_io_ctx
boost::asio::io_context m_io_ctx
Definition: testDaqController.cpp:79
daq::State::DAQ states handled by OCM.
DAQ states handled by OCM.
Initial state of data acquisition.
daq::ParseSourceOrigin
Origin ParseSourceOrigin(std::string const &origin_str)
Parse origin string from DpSpec into component parts.
Definition: dpSpec.cpp:251