ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
testDaqController.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief Unit tests for daq::dpm::DaqControllerImpl
9  */
10 #include <daq/dpm/scheduler.hpp>
11 
12 #include <log4cplus/loggingmacros.h>
13 #include <utility>
14 
16 #include "mock/mockScheduler.hpp"
17 #include "mock/mockWorkspace.hpp"
18 
19 #include <gmock/gmock.h>
20 #include <gtest/gtest.h>
21 
22 namespace daq::dpm {
23 
24 using namespace ::testing;
25 
27  using Hook = std::function<void(MockRsyncAsyncProcess&)>;
28  std::unique_ptr<RsyncAsyncProcessIf> operator()(boost::asio::io_context&,
29  std::string, // NOLINT
30  std::string, // NOLINT
31  RsyncOptions const&,
33  auto rsync = std::make_unique<MockRsyncAsyncProcess>();
34  if (hook) {
35  hook(*rsync);
36  }
37  procs.push_back(rsync.get());
38  return rsync;
39  }
40  void SetHook(Hook f) {
41  hook = std::move(f);
42  }
43  std::vector<MockRsyncAsyncProcess*> procs;
45 };
46 
48  using Hook = std::function<void(MockAsyncProcess&)>;
49  // NOLINTNEXTLINE
50  std::unique_ptr<AsyncProcessIf> operator()(boost::asio::io_context&, std::vector<std::string>) {
51  auto rsync = std::make_unique<MockRsyncAsyncProcess>();
52  if (hook) {
53  hook(*rsync);
54  }
55  procs.push_back(rsync.get());
56  return rsync;
57  }
58  void SetHook(Hook f) {
59  hook = std::move(f);
60  }
61  std::vector<MockAsyncProcess*> procs;
63 };
64 
65 class TestDaqControllerBase : public ::testing::Test {
66 public:
68  }
69 
70  void SetUp() override {
71  }
72 
73  void TearDown() override {
74  // Execute possibly pending completions handlers.
75  EXPECT_NO_THROW(m_io_ctx.poll());
76  }
77 
78 protected:
80  boost::asio::io_context m_io_ctx;
83 };
84 
86 
88 public:
90  }
91  void SetUp() override {
93  // Set default status. This can be modified by each test-case before calling PostSetup()
94  m_initial_status.id = "TEST.ID";
96  m_initial_status.error = false;
97  m_initial_status.timestamp = Status::TimePoint::clock::now();
98  m_dpspec.target.file_id = "TEST.ID";
99  m_dpspec.target.source = json::FitsFileSource{"source1", "host:target.fits", {}};
100  m_dpspec.sources.push_back(json::FitsFileSource{"source2", "host:source.fits", {}});
101  }
102 
103  /**
104  * Specifically sets up expectations and other things based on current state.
105  * It considers:
106  * - m_dpspec for sources and populates m_resolver based on that.
107  * - m_initial_status controls which state to start from.
108  */
109  void PostSetUp() {
110  // Set up resolver so that it matches m_dpspec sources.
111  std::filesystem::path sources_root = "sources";
112  if (m_dpspec.target.source.has_value()) {
113  m_resolver.Add(
114  {m_dpspec.target.source->source_name, m_dpspec.target.source->location},
115  sources_root / json::ParseSourceLocation(m_dpspec.target.source->location).path);
116  }
117  for (auto const& s : m_dpspec.sources) {
118  if (std::holds_alternative<json::FitsFileSource>(s)) {
119  auto const& source = std::get<json::FitsFileSource>(s);
120  m_resolver.Add({source.source_name, source.location},
121  sources_root / json::ParseSourceLocation(source.location).path);
122  }
123  }
124  auto ws = std::make_unique<daq::dpm::MockDaqWorkspace>();
125  m_ws_mock_ptr = ws.get();
126  EXPECT_CALL(*ws, LoadStatus()).WillRepeatedly(Return(m_initial_status));
127  EXPECT_CALL(*ws, LoadSourceLookup()).WillRepeatedly(Return(m_resolver.GetMapping()));
128  EXPECT_CALL(*ws, LoadSpecification()).WillRepeatedly(Return(m_dpspec));
129  EXPECT_CALL(*ws, GetSourcesPath()).WillRepeatedly(Return(std::filesystem::path("sources")));
130  EXPECT_CALL(*ws, GetSourceLookupPath())
131  .WillRepeatedly(Return(std::filesystem::path("sources.json")));
132  EXPECT_CALL(*ws, GetPath())
133  .Times(AnyNumber())
134  .WillRepeatedly(Return(std::filesystem::path("")));
135  m_daq = std::make_unique<DaqControllerImpl>(m_executor,
136  std::move(ws),
137  m_resources,
138  std::reference_wrapper(m_rsync_factory),
139  std::reference_wrapper(m_proc_factory),
140  m_options);
141  m_daq->Start();
142  }
143 
144  void TearDown() override {
145  m_daq.reset();
147  }
148 
149  /**
150  * Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
151  */
152  template <class Pred>
153  void PollUntil(Pred&& p) {
154  while (true) {
155  m_io_ctx.restart();
156  if (m_io_ctx.poll_one() == 0 || p()) {
157  break;
158  }
159  }
160  }
161 
162 protected:
166  // Not-owned pointer to a DaqWorkspace
168  std::unique_ptr<DaqControllerImpl> m_daq;
171 };
172 
173 using namespace ::testing;
174 
175 TEST_F(TestDaqController, ScheduledTransitionsToTransferring) {
176  // Additional setup
177  PostSetUp();
178  EXPECT_CALL(*m_ws_mock_ptr, StoreSourceLookup(_));
179  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Transferring)));
180 
181  // Run
182  // Only run handlers until we reach State::Transferring
183  PollUntil([&] { return m_daq->GetState() == State::Transferring; });
184  EXPECT_EQ(m_daq->GetState(), State::Transferring);
185 }
186 
187 TEST_F(TestDaqController, TransferringWithoutFilesTransitionsToMerging) {
188  // Additional setup
189  m_dpspec.target.source = std::nullopt;
190  m_dpspec.sources.clear();
191  m_initial_status.state = State::Transferring;
192  PostSetUp();
193 
194  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
195 
196  // Run
197  // Only run handlers until we reach State::Merging
198  PollUntil([&] { return m_daq->GetState() == State::Merging; });
199  EXPECT_EQ(m_daq->GetState(), State::Merging);
200 }
201 
202 TEST_F(TestDaqController, TransferringStartsTransfersAndWhenCompletedTransitionsToMerging) {
203  // Additional setup
204  ASSERT_TRUE(m_dpspec.target.source);
205  ASSERT_TRUE(m_dpspec.sources.size() == 1);
206  m_initial_status.state = State::Transferring;
207  m_resources.net_receive.SetLimit(0);
208  PostSetUp();
209 
210  EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
211  << "Setup should have added target and one extra source";
212  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/target.fits")))
213  .Times(AnyNumber())
214  .WillRepeatedly(Return(false));
215  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/source.fits")))
216  .Times(AnyNumber())
217  .WillRepeatedly(Return(false));
218 
219  // Run
220  m_io_ctx.poll();
221  EXPECT_EQ(m_daq->GetState(), State::Transferring);
222  // After promise is completed the files will exist
223  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/source.fits")))
224  .WillOnce(Return(true));
225  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/target.fits")))
226  .WillOnce(Return(true));
227  EXPECT_CALL(*m_ws_mock_ptr, StoreStatus(Field(&Status::state, State::Merging)));
228 
229  // Complete async file operations by setting promises
230  for (auto& proc : m_rsync_factory.procs) {
231  proc->promise.set_value(0);
232  }
233  PollUntil([&] { return m_daq->GetState() == State::Merging; });
234 
235  // Execute pending handlers that should have been registered after promise was set.
236  EXPECT_EQ(m_daq->GetState(), State::Merging);
237 }
238 
239 TEST_F(TestDaqController, TransferringFailsDoesNotTransition) {
240  // Additional setup
241  ASSERT_TRUE(m_dpspec.target.source);
242  ASSERT_TRUE(m_dpspec.sources.size() == 1);
243  m_initial_status.state = State::Transferring;
244  m_resources.net_receive.SetLimit(0);
245  PostSetUp();
246 
247  EXPECT_EQ(m_resolver.GetMapping().size(), 2u)
248  << "Setup should have added target and one extra source";
249  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/target.fits")))
250  .WillOnce(Return(false));
251  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("sources/source.fits")))
252  .WillOnce(Return(false));
253 
254  // Test
255  // Initiates transfers
256  m_io_ctx.poll();
257 
258  EXPECT_EQ(m_daq->GetState(), State::Transferring);
259  EXPECT_CALL(*m_ws_mock_ptr,
260  StoreStatus(AllOf(Field(&Status::state, State::Transferring),
261  Field(&Status::error, true))));
262 
263  // Run test that emulate a successful first file transfer and then subsequent failures.
264  // Complete async file operations by setting promises
265  int error = 0;
266  for (auto& proc : m_rsync_factory.procs) {
267  proc->promise.set_value(error++);
268  }
269  // Execute pending handlers that should have been registered after promise was set.
270  m_io_ctx.restart();
271  m_io_ctx.poll();
272 
273  EXPECT_TRUE(m_daq->IsStopped());
274  EXPECT_EQ(m_daq->GetState(), State::Transferring);
275 }
276 
277 /**
278  * [recovery]
279  * If transfer previously failed and then stopped this should be automatically recoverable if
280  * DAQ is started again and requested files are available on FS.
281  */
282 TEST_F(TestDaqController, RecoverAutomaticallyFromTransferringIfFilesExist) {
283  // Additional setup
284  m_initial_status.state = State::Transferring;
285  m_initial_status.error = true;
286  PostSetUp();
287 
288  // Files exist!
289  EXPECT_CALL(*m_ws_mock_ptr, Exists(_)).WillRepeatedly(Return(true));
290  EXPECT_CALL(
291  *m_ws_mock_ptr,
292  StoreStatus(AllOf(Field(&Status::state, State::Merging), Field(&Status::error, false))));
293 
294  // Run
295  // Only run handlers until we reach State::Merging
296  PollUntil([&] { return m_daq->GetState() == State::Merging; });
297  EXPECT_EQ(m_daq->GetState(), State::Merging);
298 }
299 
300 TEST_F(TestDaqController, MergingSuccessful) {
301  // Setup
302  m_initial_status.state = State::Merging;
303  PostSetUp();
304 
305  EXPECT_CALL(*m_ws_mock_ptr, GetSpecificationPath())
306  .WillRepeatedly(Return("specification.json"));
307  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("TEST.ID.fits")))
308  .WillOnce(Return(false));
309 
310  // Run
311  // First the process will be created
312  m_io_ctx.poll();
313 
314  ASSERT_EQ(m_proc_factory.procs.size(), 1u);
315  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("TEST.ID.fits")))
316  .WillRepeatedly(Return(true));
317 
318  // expect alert to be set
319  EXPECT_CALL(*m_ws_mock_ptr,
320  StoreStatus(AllOf(
321  Field(&Status::alerts,
322  ElementsAre(Field(&Alert::description,
323  HasSubstr("Writing keywords required resizing")))),
324  Field(&Status::state, State::Merging),
325  Field(&Status::error, false))));
326  EXPECT_CALL(
327  *m_ws_mock_ptr,
328  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
329  EXPECT_CALL(*m_ws_mock_ptr, MakeResultSymlink(std::filesystem::path("TEST.ID.fits")));
330 
331  // Emit alert from merger
332  std::string message =
333  R"({"content":{"id":"primary_hdu_resize","message":"Writing keywords required resizing of primary HDU: Add space for at least 73 keywords to avoid resize"},"timestamp":1650964356522093759,"type":"alert"})";
334  m_proc_factory.procs[0]->stdout(8, message);
335 
336  // Invalid messages should be ignored
337  m_proc_factory.procs[0]->stdout(8, "This is invalid JSON");
338 
339  // Then we complete
340  m_proc_factory.procs[0]->promise.set_value(0);
341 
342  // Only run handlers until we reach State::Merging
343  PollUntil([&] { return m_daq->GetState() == State::Completed; });
344  EXPECT_EQ(m_daq->GetState(), State::Completed);
345 }
346 
347 /**
348  * [recovery]
349  * - User manually invokes daqDpmMerge to create output.
350  */
351 TEST_F(TestDaqController, RecoverAutomaticallyFromMergeFailureIfResultExists) {
352  // Setup
353  // Start in Merging with failure.
354  m_initial_status.state = State::Merging;
355  m_initial_status.error = true;
356  PostSetUp();
357 
358  EXPECT_CALL(*m_ws_mock_ptr, Exists(std::filesystem::path("TEST.ID.fits")))
359  .WillRepeatedly(Return(true));
360  EXPECT_CALL(
361  *m_ws_mock_ptr,
362  StoreStatus(AllOf(Field(&Status::state, State::Completed), Field(&Status::error, false))));
363 
364  // Run
365  // First the process will be created
366  m_io_ctx.poll();
367  ASSERT_EQ(m_proc_factory.procs.size(), 0u);
368  EXPECT_EQ(m_daq->GetState(), State::Completed);
369 }
370 
371 } // namespace daq::dpm
Provides location of fits source file.
void Add(SourceFile const &source, std::filesystem::path const &path)
Adds path so it is resolved using source_name and location.
auto GetMapping() const noexcept -> Mapping const &
Get native representation of source mapping for serialization.
boost::asio::io_context m_io_ctx
std::unique_ptr< DaqControllerImpl > m_daq
daq::dpm::MockDaqWorkspace * m_ws_mock_ptr
void PollUntil(Pred &&p)
Poll one handler at a time until predicate has been satisifed or io_context runs out of work.
void PostSetUp()
Specifically sets up expectations and other things based on current state.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Mocks for daq::RsyncAsyncProcessIf.
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
TEST_F(TestDaqController, ScheduledTransitionsToTransferring)
Options for DaqController.
Definition: scheduler.hpp:162
Limited resources.
Definition: scheduler.hpp:231
Location ParseSourceLocation(std::string const &location_str)
Parse location string from DpSpec into component parts.
Definition: dpSpec.cpp:90
std::optional< FitsFileSource > source
Definition: dpSpec.hpp:37
std::vector< SourceTypes > sources
Definition: dpSpec.hpp:44
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
std::string description
Definition: status.hpp:77
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Merging
DAQ is being merged.
@ Transferring
Input files are being transferred.
Options controlling rsync invocation.
Definition: main.cpp:23
daq::dpm::Scheduler and related class declarations.
Combined fake/mock.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124
State state
Definition: status.hpp:142
std::string id
Definition: status.hpp:140
bool error
Definition: status.hpp:143
std::vector< Alert > alerts
Active alerts.
Definition: status.hpp:147
TimePoint timestamp
Definition: status.hpp:153
std::vector< MockAsyncProcess * > procs
std::unique_ptr< AsyncProcessIf > operator()(boost::asio::io_context &, std::vector< std::string >)
std::function< void(MockAsyncProcess &)> Hook
std::unique_ptr< RsyncAsyncProcessIf > operator()(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)
std::function< void(MockRsyncAsyncProcess &)> Hook
std::vector< MockRsyncAsyncProcess * > procs
std::filesystem::path path
Definition: dpSpec.hpp:64
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")
ASSERT_EQ(meta.keyword_rules.size(), 1u)