ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
workspace.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Workspace interface and implementation declaration
9  */
10 #include <daq/dpm/workspace.hpp>
11 
12 #include <fstream>
13 #include <stdexcept>
14 #include <system_error>
15 
16 #include <fmt/format.h>
17 #include <nlohmann/json.hpp>
18 
19 #include <daq/json.hpp>
20 
21 namespace fs = std::filesystem;
22 
23 namespace daq::dpm {
24 namespace {
25 
26 constexpr const std::string_view PATH_IN_PROGRESS = "in-progress";
27 constexpr const std::string_view PATH_ARCHIVE = "archive";
28 constexpr const std::string_view PATH_RESULT = "result";
29 constexpr const std::string_view PATH_SOURCES = "sources";
30 constexpr const std::string_view FILE_DAQ_QUEUE = "queue.json";
31 constexpr const std::string_view FILE_DP_SPEC = "specification.json";
32 constexpr const std::string_view FILE_DAQ_STATUS = "status.json";
33 constexpr const std::string_view FILE_DAQ_SOURCES = "sources.json";
34 constexpr const std::string_view FILE_TEMP_EXT = ".swap";
35 constexpr const std::string_view FILE_SYMLINK = "result";
36 
37 template <class T>
38 void SafeWrite(T const& content, fs::path const& destination) {
39  try {
40  auto temp = destination;
41  auto temp_file = destination.filename().native();
42  temp_file += FILE_TEMP_EXT;
43  temp.replace_filename(temp_file);
44  // Write to temporary file
45  {
46  std::fstream temp_file(temp, std::ios::out | std::ios::trunc);
47  temp_file << std::setw(4) << content;
48  if (!temp_file) {
49  throw std::runtime_error(
50  fmt::format("File contains error after write: '{}'", temp.native()));
51  }
52  temp_file.close();
53  }
54  // Move (atomically) into location
55  fs::rename(temp, destination);
56  } catch (...) {
57  std::throw_with_nested(
58  std::runtime_error(fmt::format("Failed to write file '{}'", destination.native())));
59  }
60 }
61 
62 /**
63  * Initialize workspace
64  *
65  * @note Is safe to run for existing and valid workspace.
66  */
67 void InitRootWorkspace(fs::path const& root) {
68  // If there are non-managed files refuse to initialize new worksace as we are misconfigured
69  auto status = fs::status(root);
70  if (fs::exists(status)) {
71  if (!fs::is_directory(status)) {
72  throw std::invalid_argument(
73  fmt::format("Workspace root exist but is not a directory: {}", root.native()));
74  }
75 
76  for (auto const& entry : fs::directory_iterator(root)) {
77  if (entry.path().extension() == FILE_TEMP_EXT) {
78  // Old temporary files -> delete
79  fs::remove(entry);
80  continue;
81  }
82  if (entry.path().filename() == FILE_DAQ_QUEUE) {
83  continue;
84  }
85  if (entry.path().filename() == PATH_IN_PROGRESS) {
86  continue;
87  }
88  if (entry.path().filename() == PATH_ARCHIVE) {
89  continue;
90  }
91  if (entry.path().filename() == PATH_RESULT) {
92  continue;
93  }
94  throw std::invalid_argument(
95  fmt::format("Workspace {} has unexpected contents - aborting to prevent filesystem "
96  "modifications due to misconfiguration",
97  root.native()));
98  }
99  }
100 
101  fs::create_directories(root);
102  fs::create_directory(root / PATH_IN_PROGRESS);
103  fs::create_directory(root / PATH_ARCHIVE);
104  fs::create_directory(root / PATH_RESULT);
105 }
106 
107 /**
108  * Init new data acquisition directory inside root.
109  * It assumes root already exits.
110  *
111  * @param root Workspace root (not DAQ workspace)
112  */
113 auto MakeInProgressDaqPath(fs::path const& root, std::string const& daq_id) -> fs::path {
114  return root / PATH_IN_PROGRESS / daq_id;
115 }
116 
117 auto InitDaqWorkspace(fs::path const& root, std::string const& daq_id) -> fs::path {
118  auto daq_root = MakeInProgressDaqPath(root, daq_id);
119  if (fs::exists(daq_root)) {
120  throw std::invalid_argument(
121  fmt::format("DAQ workspace directory '{}' already exists", daq_root.string()).c_str());
122  }
123  fs::create_directory(daq_root);
124  fs::create_directory(daq_root / PATH_SOURCES);
125  return daq_root;
126 }
127 
128 } // namespace
129 
130 WorkspaceImpl::WorkspaceImpl(fs::path root) : m_root(std::move(root)) {
131  auto status = fs::status(m_root);
132  if (status.type() != fs::file_type::directory && status.type() != fs::file_type::not_found) {
133  throw std::invalid_argument(
134  fmt::format("Workspace root directory '{}' is not a directory", m_root.string())
135  .c_str());
136  }
137 
138  InitRootWorkspace(m_root);
139  // Initialize empty DAQ queue if necessary
140  if (!fs::exists(m_root / FILE_DAQ_QUEUE)) {
141  WorkspaceImpl::StoreQueue({});
142  }
143 }
144 
145 auto WorkspaceImpl::EnumerateDaqs() const -> std::vector<std::string> {
146  return {};
147 }
148 
149 fs::space_info WorkspaceImpl::QueryStorageStatus() const {
150  return fs::space(m_root);
151 }
152 
153 auto WorkspaceImpl::InitializeDaq(std::string const& daq_id) -> std::unique_ptr<DaqWorkspace> {
154  auto root = InitDaqWorkspace(m_root, daq_id);
155  return std::make_unique<DaqWorkspaceImpl>(root, m_root / PATH_RESULT);
156 }
157 
158 auto WorkspaceImpl::LoadDaq(std::string const& daq_id) -> std::unique_ptr<DaqWorkspace> {
159  auto root = MakeInProgressDaqPath(m_root, daq_id);
160  return std::make_unique<DaqWorkspaceImpl>(root, m_root / PATH_RESULT);
161 }
162 
163 void WorkspaceImpl::RemoveDaq(std::string const& daq_id) {
164  fs::remove_all(m_root / PATH_IN_PROGRESS / daq_id);
165 }
166 
167 auto WorkspaceImpl::ArchiveDaq(std::string const& daq_id) -> std::filesystem::path {
168  auto target = m_root / PATH_ARCHIVE / daq_id;
169  if (fs::exists(target)) {
170  target += ".";
171  for (auto idx = 1u; idx < 0xffff; ++idx) {
172  auto tmp = target;
173  tmp += std::to_string(idx);
174  if (!fs::exists(tmp)) {
175  target.swap(tmp);
176  break;
177  }
178  }
179  }
180  fs::rename(m_root / PATH_IN_PROGRESS / daq_id, target);
181  return target;
182 }
183 
184 auto WorkspaceImpl::LoadQueue() const -> std::vector<std::string> {
185  auto file = m_root / FILE_DAQ_QUEUE;
186  try {
187  std::fstream fs(file, std::ios::in);
188  auto json = nlohmann::json::parse(fs);
189  return json.get<std::vector<std::string>>();
190  } catch (...) {
191  std::throw_with_nested(
192  std::runtime_error(fmt::format("Failed to load DAQ queue '{}'", file.native())));
193  }
194 }
195 
196 void WorkspaceImpl::StoreQueue(std::vector<std::string> const& queue) const {
197  try {
198  nlohmann::json json;
199  json = queue;
200  SafeWrite(json, m_root / FILE_DAQ_QUEUE);
201  } catch (...) {
202  std::throw_with_nested(std::runtime_error("Failed to store DAQ queue"));
203  }
204 }
205 
206 DaqWorkspaceImpl::DaqWorkspaceImpl(fs::path root, fs::path result)
207  : m_root(std::move(root)), m_result(std::move(result)) {
208  if (!m_root.is_absolute()) {
209  throw std::invalid_argument(
210  fmt::format("DAQ workspace root path must be absolute: {}", m_root.native()));
211  }
212  if (!m_result.is_absolute()) {
213  throw std::invalid_argument(
214  fmt::format("DAQ workspace result path must be absolute: {}", m_result.native()));
215  }
216 }
217 
218 auto DaqWorkspaceImpl::GetSourcesPath() const -> std::filesystem::path {
219  return PATH_SOURCES;
220 }
221 
223  try {
224  std::fstream fs(m_root / FILE_DAQ_STATUS, std::ios::in);
225  auto json = nlohmann::json::parse(fs);
226  return json.get<Status>();
227  } catch (...) {
228  std::throw_with_nested(std::runtime_error("Failed to load DAQ status"));
229  }
230 }
231 
232 void DaqWorkspaceImpl::StoreStatus(Status const& status) const {
233  try {
234  nlohmann::json json;
235  json = status;
236  SafeWrite(json, m_root / FILE_DAQ_STATUS);
237  } catch (...) {
238  std::throw_with_nested(std::runtime_error("Failed to store DAQ status"));
239  }
240 }
241 
242 auto DaqWorkspaceImpl::GetSourceLookupPath() const -> std::filesystem::path {
243  return FILE_DAQ_SOURCES;
244 }
245 
246 void DaqWorkspaceImpl::MakeResultSymlink(std::filesystem::path const& result) const {
247  auto relative = result.lexically_relative(m_root);
248  fs::create_symlink(relative, m_root / FILE_SYMLINK);
249 }
250 
252  try {
253  std::fstream fs(m_root / FILE_DAQ_SOURCES, std::ios::in);
254  auto json = nlohmann::json::parse(fs);
255  return json.get<SourceResolver::Mapping>();
256  } catch (...) {
257  std::throw_with_nested(std::runtime_error("Failed to load DAQ source lookup"));
258  }
259 }
260 
262  try {
263  nlohmann::json json;
264  json = lookup;
265  SafeWrite(json, m_root / FILE_DAQ_SOURCES);
266  } catch (...) {
267  std::throw_with_nested(std::runtime_error("Failed to store DAQ source lookup"));
268  }
269 }
270 
271 auto DaqWorkspaceImpl::GetSpecificationPath() const -> std::filesystem::path {
272  return FILE_DP_SPEC;
273 }
274 
275 auto DaqWorkspaceImpl::LoadSpecification() const -> json::DpSpec {
276  auto file = m_root / FILE_DP_SPEC;
277  try {
278  std::fstream fs(file, std::ios::in);
279  auto json = nlohmann::json::parse(fs);
280  return json::ParseDpSpec(json);
281  } catch (...) {
282  std::throw_with_nested(std::runtime_error(
283  fmt::format("Failed to load Data Product Specification '{}'", file.native())));
284  }
285 }
286 
287 void DaqWorkspaceImpl::StoreSpecification(std::string const& specification) const {
288  try {
289  SafeWrite(specification, m_root / FILE_DP_SPEC);
290  } catch (...) {
291  std::throw_with_nested(std::runtime_error("Failed to store DAQ status"));
292  }
293 }
294 
295 } // namespace daq::dpm
void StoreSpecification(std::string const &specification) const override
Get file name of the data product specification stored in StoreSpecification()
Definition: workspace.cpp:287
void StoreSourceLookup(SourceResolver::Mapping const &status) const override
Definition: workspace.cpp:261
auto LoadSpecification() const -> json::DpSpec override
Get file name of the data product specification stored in StoreSpecification()
Definition: workspace.cpp:275
auto GetSourcesPath() const -> std::filesystem::path override
Definition: workspace.cpp:218
auto GetSourceLookupPath() const -> std::filesystem::path override
Definition: workspace.cpp:242
void StoreStatus(Status const &status) const override
Definition: workspace.cpp:232
auto GetSpecificationPath() const -> std::filesystem::path override
Get file name of the data product specification stored in StoreSpecification()
Definition: workspace.cpp:271
auto LoadSourceLookup() const -> SourceResolver::Mapping override
Definition: workspace.cpp:251
auto LoadStatus() const -> Status override
Definition: workspace.cpp:222
void MakeResultSymlink(std::filesystem::path const &result) const override
Create symlink to result file.
Definition: workspace.cpp:246
Provides location of fits source file.
std::map< SourceFile, std::string > Mapping
WorkspaceImpl(std::filesystem::path root)
Opens or creates workspace in the specified location, using that as a root.
Declares JSON support for serialization.
daq::dpm::Workspace interface and implementation declaration
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Definition: dpSpec.cpp:47
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124