ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
testScheduler.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief Unit tests for daq::dpm::SchedulerImpl
9  */
10 
11 #include <daq/dpm/scheduler.hpp>
12 
13 #include <utility>
14 
15 #include "mock/mockScheduler.hpp"
16 #include "mock/mockWorkspace.hpp"
17 
18 #include <gmock/gmock.h>
19 #include <gtest/gtest.h>
20 
21 namespace daq::dpm {
22 
24  using Hook = std::function<void(MockDaqController&)>;
25  std::unique_ptr<DaqController> operator()(std::unique_ptr<DaqWorkspace>, Resources&) {
26  auto daq = std::make_unique<MockDaqController>();
27  if (hook) {
28  hook(*daq);
29  }
30  daq_controllers.push_back(daq.get());
31  return daq;
32  }
33  void SetHook(Hook f) {
34  hook = std::move(f);
35  }
36  std::vector<DaqController*> daq_controllers;
38 };
39 
40 class TestSchedulerBase : public ::testing::Test {
41 public:
43  }
44 
45  void SetUp() override {
46  }
47  void TearDown() override {
48  // Execute possibly pending completions handlers.
49  EXPECT_NO_THROW(m_io_ctx.poll());
50  }
51 
52 protected:
53  boost::asio::io_context m_io_ctx;
55  std::vector<std::string> m_queue;
56 
58 };
59 
61 
63 public:
64  TestScheduler() : m_status("TEST.ID", "TEST.FILEID") {
65  }
66  void SetUp() override {
68  m_spec_str = R"(
69  {
70  "id": "TEST.ID",
71  "target": {
72  "fileId": "TEST.FILEID",
73  "source": {
74  "sourceName": "dcs",
75  "location": "dcs-host:/path/to/somefile.fits",
76  "path": "dcs/somefile.fits"
77  }
78  },
79  "sources": [
80  {
81  "type": "fitsFile",
82  "sourceName": "fcs",
83  "location": "fcs-host:/path/to/somefile.fits",
84  "path": "fcs/somefile.fits"
85  }
86  ]
87  }
88  )";
89  }
90  void PostSetup() {
91  EXPECT_CALL(m_ws_mock, LoadQueue()).WillOnce(::testing::Return(m_queue));
92 
93  m_scheduler = std::make_unique<SchedulerImpl>(
94  m_executor, m_ws_mock, std::reference_wrapper(m_daq_controller_factory), m_options);
95  }
96  void TearDown() override {
97  m_scheduler.reset();
99  }
100 
101 protected:
103  std::unique_ptr<SchedulerImpl> m_scheduler;
104  std::string m_spec_str;
107 };
108 
109 using namespace ::testing;
110 
111 TEST_F(TestSchedulerInit, Construction) {
112  SchedulerOptions m_options;
113  EXPECT_CALL(m_ws_mock, LoadQueue()).WillOnce(Return(m_queue));
114  EXPECT_NO_THROW(SchedulerImpl(m_executor, m_ws_mock, FakeDaqControllerFactory(), m_options));
115 }
116 
117 TEST_F(TestScheduler, QueueDaqIsSuccessful) {
118  // Setup
119  auto daq_ws = std::make_unique<MockDaqWorkspace>();
120  Status status = {};
121  status.id = "TEST.ID";
122  status.file_id = "TEST.FILEID";
123  status.state = State::Scheduled;
124  status.error = false;
125 
126  EXPECT_CALL(*daq_ws, StoreStatus(status));
127  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
128  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
129  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}));
130 
131  PostSetup();
132 
133  // Test
134  m_scheduler->QueueDaq(m_spec_str);
135  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
136 
137  // Queue same DAQ should fail
138  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::invalid_argument);
139  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
140 }
141 
142 TEST_F(TestScheduler, QueueDaqRollbackIfStoreQueueFails) {
143  // Setup
144  auto daq_ws = std::make_unique<MockDaqWorkspace>();
145  Status status = {};
146  status.id = "TEST.ID";
147  status.file_id = "TEST.FILEID";
148  status.state = State::Scheduled;
149 
150  Sequence init, rollback;
151 
152  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
153 
154  EXPECT_CALL(*daq_ws, StoreStatus(status));
155  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
156  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
157  .InSequence(init, rollback)
158  .WillOnce(Return(ByMove(std::move(daq_ws))));
159  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}))
160  .InSequence(init, rollback)
161  .WillRepeatedly(Throw(std::runtime_error("FAILED")));
162 
163  // Rollback
164  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
165 
166  PostSetup();
167 
168  // Test
169  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::exception);
170  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
171 }
172 
173 TEST_F(TestScheduler, QueueDaqRollbackIfStoreSpecificationFails) {
174  // Setup
175  auto daq_ws = std::make_unique<MockDaqWorkspace>();
176  Status status = {};
177  status.id = "TEST.ID";
178  status.file_id = "TEST.FILEID";
179  status.state = State::Scheduled;
180 
181  Sequence init, rollback;
182 
183  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
184 
185  EXPECT_CALL(*daq_ws, StoreStatus(status));
186  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str))
187  .WillOnce(Throw(std::runtime_error("ERROR")));
188 
189  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
190  .InSequence(init, rollback)
191  .WillOnce(Return(ByMove(std::move(daq_ws))));
192 
193  // Rollback
194  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
195 
196  PostSetup();
197 
198  // Test
199  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::exception);
200  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
201 }
202 
203 TEST_F(TestScheduler, QueueDaqRollbackIfInitializeDaqWorkspaceFails) {
204  // Setup
205  Sequence init, rollback;
206  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
207 
208  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID"))
209  .InSequence(init, rollback)
210  .WillRepeatedly(Throw(std::runtime_error("ERROR")));
211 
212  // Rollback
213  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID")).InSequence(rollback);
214 
215  PostSetup();
216 
217  // Test
218  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::exception);
219  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
220 }
221 
222 TEST_F(TestScheduler, QueueDaqFailsIfParseDpFails) {
223  // Setup
224  Sequence init, rollback;
225  EXPECT_CALL(m_ws_mock, GetPath()).Times(AnyNumber()).WillRepeatedly(Return("/tmp/workspace"));
226 
227  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).Times(0);
228 
229  PostSetup();
230 
231  // Test
232  EXPECT_THROW(m_scheduler->QueueDaq("not a specification"), std::invalid_argument);
233  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
234 }
235 
236 TEST_F(TestScheduler, PollActivatesDaqWhichStartsTransfer) {
237  // Setup
238  PostSetup();
239  m_scheduler->Start();
240 
241  auto daq_ws = std::make_unique<MockDaqWorkspace>();
242  Status status = {};
243  status.id = "TEST.ID";
244  status.file_id = "TEST.FILEID";
245  status.state = State::Scheduled;
246 
247  EXPECT_CALL(*daq_ws, StoreStatus(status));
248  EXPECT_CALL(*daq_ws, StoreSpecification(m_spec_str));
249  EXPECT_CALL(m_ws_mock, InitializeDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
250  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{"TEST.ID"}));
251 
252  m_scheduler->QueueDaq(m_spec_str);
253  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
254 
255  // Queue same DAQ should fail
256  EXPECT_THROW(m_scheduler->QueueDaq(m_spec_str), std::invalid_argument);
257  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
258 
259  // We expect Poll() should load from workspace and create DaqController from factory
260  // and then start that.
261  daq_ws = std::make_unique<MockDaqWorkspace>();
262  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
263  m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
264  EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
265  EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
266  EXPECT_CALL(mock, GetState()).WillRepeatedly(Return(State::Transferring));
267  EXPECT_CALL(mock, Start());
268  });
269 
270  // Test[
271  m_io_ctx.poll();
272 }
273 
274 TEST_F(TestScheduler, PollCompletesDaq) {
275  // Setup
276  m_queue = {"TEST.ID"};
277  PostSetup();
278 
279  auto daq_ws = std::make_unique<MockDaqWorkspace>();
280 
281  m_status.SetState(State::Transferring);
282  EXPECT_TRUE(m_scheduler->IsQueued("TEST.ID"));
283 
284  // We expect Poll() should load from workspace
285  daq_ws = std::make_unique<MockDaqWorkspace>();
286 
287  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
288  EXPECT_CALL(m_ws_mock, ArchiveDaq("TEST.ID"));
289  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>()));
290 
291  m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
292  EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
293  EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
294  EXPECT_CALL(mock, GetState())
295  .WillRepeatedly(Invoke(&m_status, &ObservableStatus::GetState));
296 
297  // When Start() is invoked on DaqController we set the state to Completed
298  // so that it will be recognized as completed and archived (Scheduler should monitor
299  // DaqControllers via signals).
300  EXPECT_CALL(mock, Start()).WillOnce(Invoke([&]() { m_status.SetState(State::Completed); }));
301  });
302 
303  // Test
304  // First poll initiates all operations.
305  m_scheduler->Start();
306  m_io_ctx.poll();
307 }
308 
309 TEST_F(TestScheduler, AbortDaqSucceedsIfNotActive) {
310  // Setup
311  m_queue = {"TEST.ID"};
312 
313  auto daq_ws = std::make_unique<MockDaqWorkspace>();
314  Status status = {};
315  status.id = "TEST.ID";
316  status.file_id = "TEST.FILEID";
317  status.state = State::Scheduled;
318  status.error = false;
319 
320  EXPECT_CALL(*daq_ws, LoadStatus()).WillOnce(Return(status));
321  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
322  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{}));
323  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID"));
324 
325  PostSetup();
326 
327  // Test
328  ASSERT_TRUE(m_scheduler->IsQueued(status.id));
329  m_scheduler->AbortDaq(status.id);
330  EXPECT_FALSE(m_scheduler->IsQueued(status.id));
331 }
332 
333 TEST_F(TestScheduler, AbortDaqSucceedsIfActive) {
334  // Setup
335  m_queue = {"TEST.ID"};
336  m_status.SetState(State::Scheduled);
337  auto daq_ws_init = std::make_unique<MockDaqWorkspace>();
338  auto daq_ws_abort = std::make_unique<MockDaqWorkspace>();
339 
340  EXPECT_CALL(*daq_ws_init, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
341  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws_init))));
342 
343  m_daq_controller_factory.SetHook([&](MockDaqController& mock) {
344  EXPECT_CALL(mock, GetStatus()).WillOnce(ReturnRef(m_status));
345  EXPECT_CALL(mock, GetId()).WillRepeatedly(ReturnRef(m_status.GetId()));
346  EXPECT_CALL(mock, GetState()).WillRepeatedly(Return(State::Transferring));
347  EXPECT_CALL(mock, Start());
348  });
349 
350  PostSetup();
351 
352  // Poll to activate DAQ
353  m_scheduler->Start();
354  m_io_ctx.poll();
355 
356  EXPECT_CALL(*daq_ws_abort, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
357  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws_abort))));
358  EXPECT_CALL(m_ws_mock, RemoveDaq("TEST.ID"));
359  EXPECT_CALL(m_ws_mock, StoreQueue(std::vector<std::string>{}));
360 
361  // Test
362  ASSERT_TRUE(m_scheduler->IsQueued("TEST.ID"));
363  m_scheduler->AbortDaq("TEST.ID");
364  EXPECT_FALSE(m_scheduler->IsQueued("TEST.ID"));
365 }
366 
367 TEST_F(TestScheduler, AbortDaqFailsIfDaqDoesNotExit) {
368  // Setup
369  PostSetup();
370 
371  // Test
372  EXPECT_THROW(m_scheduler->AbortDaq("UNKOWN"), std::runtime_error);
373 }
374 
375 TEST_F(TestScheduler, GetStatusSucceeds) {
376  // Setup
377  m_queue = {"TEST.ID"};
378  auto daq_ws = std::make_unique<MockDaqWorkspace>();
379 
380  EXPECT_CALL(*daq_ws, LoadStatus()).WillRepeatedly(Return(m_status.GetStatus()));
381  EXPECT_CALL(m_ws_mock, LoadDaq("TEST.ID")).WillOnce(Return(ByMove(std::move(daq_ws))));
382 
383  PostSetup();
384 
385  // Test
386  ASSERT_TRUE(m_scheduler->IsQueued("TEST.ID"));
387  auto status = m_scheduler->GetDaqStatus("TEST.ID");
388  EXPECT_EQ(status.id, "TEST.ID");
389  EXPECT_EQ(status.state, m_status.GetState());
390 }
391 
392 } // namespace daq::dpm
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
State GetState() const noexcept
Definition: status.cpp:215
daq::dpm::MockWorkspace m_ws_mock
boost::asio::io_context m_io_ctx
std::vector< std::string > m_queue
SchedulerOptions m_options
std::unique_ptr< SchedulerImpl > m_scheduler
ObservableStatus m_status
FakeDaqControllerFactory m_daq_controller_factory
void TearDown() override
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Mocks for daq::dpm::Scheduler and daq::dpm::DaqScheduler.
TEST_F(TestDaqController, ScheduledTransitionsToTransferring)
Limited resources.
Definition: scheduler.hpp:231
Options controlling scheduler operations.
Definition: scheduler.hpp:132
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Transferring
Input files are being transferred.
daq::dpm::Scheduler and related class declarations.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124
State state
Definition: status.hpp:142
std::string id
Definition: status.hpp:140
std::string file_id
Definition: status.hpp:141
bool error
Definition: status.hpp:143
std::unique_ptr< DaqController > operator()(std::unique_ptr< DaqWorkspace >, Resources &)
std::vector< DaqController * > daq_controllers
std::function< void(MockDaqController &)> Hook
EXPECT_EQ(meta.rr_uri, "zpb.rr://meta")