14 #include <system_error>
16 #include <fmt/format.h>
17 #include <nlohmann/json.hpp>
21 namespace fs = std::filesystem;
// Names that define the on-disk workspace layout. All are compile-time
// string views that are appended to a workspace or DAQ root path.

// Directory (under the workspace root) holding one sub-directory per DAQ in progress.
26 constexpr
const std::string_view PATH_IN_PROGRESS =
"in-progress";
// Directory (under the workspace root) that archived DAQ directories are renamed into.
27 constexpr
const std::string_view PATH_ARCHIVE =
"archive";
// Directory (under the workspace root) handed to DAQ workspaces as their result location.
28 constexpr
const std::string_view PATH_RESULT =
"result";
// Sub-directory created inside each DAQ workspace directory.
29 constexpr
const std::string_view PATH_SOURCES =
"sources";
// File in the workspace root holding the DAQ queue as JSON.
30 constexpr
const std::string_view FILE_DAQ_QUEUE =
"queue.json";
// File in a DAQ workspace holding the Data Product Specification.
31 constexpr
const std::string_view FILE_DP_SPEC =
"specification.json";
// File in a DAQ workspace holding the DAQ status as JSON.
32 constexpr
const std::string_view FILE_DAQ_STATUS =
"status.json";
// File in a DAQ workspace holding the source lookup as JSON.
33 constexpr
const std::string_view FILE_DAQ_SOURCES =
"sources.json";
// Suffix appended to a file name to form the temporary file used by SafeWrite().
34 constexpr
const std::string_view FILE_TEMP_EXT =
".swap";
// Name of the symlink created inside a DAQ workspace that points at the result.
35 constexpr
const std::string_view FILE_SYMLINK =
"result";
// Writes `content` to `destination` by first streaming it to a temporary
// sibling file (<file name> + FILE_TEMP_EXT) and then renaming that file onto
// `destination`, so the destination is only replaced by a fully written file.
//
// content      Value streamed with operator<<.
// destination  Final path of the file.
//
// Throws std::runtime_error ("Failed to write file ...") with the original
// failure nested inside it.
// NOTE(review): the template header, the try/catch lines and the stream error
// check are not visible in this excerpt.
38 void SafeWrite(T
const& content, fs::path
const& destination) {
// Temporary path: same directory as the destination, file name extended with FILE_TEMP_EXT.
40 auto temp = destination;
41 auto temp_file = destination.filename().native();
42 temp_file += FILE_TEMP_EXT;
43 temp.replace_filename(temp_file);
// NOTE(review): this stream reuses the name `temp_file`; presumably it lives in
// an inner scope whose opening line is not shown, shadowing the string above.
46 std::fstream temp_file(temp, std::ios::out | std::ios::trunc);
// std::setw(4) sets the stream width before insertion; for nlohmann::json
// content this presumably selects 4-space pretty-printing - confirm against the library docs.
47 temp_file << std::setw(4) << content;
// Raised when the write is detected as failed (the guarding condition is not visible here).
49 throw std::runtime_error(
50 fmt::format(
"File contains error after write: '{}'", temp.native()));
// Move the completed temporary file onto the destination.
55 fs::rename(temp, destination);
// Wrap whatever went wrong in an error that names the destination file.
57 std::throw_with_nested(
58 std::runtime_error(fmt::format(
"Failed to write file '{}'", destination.native())));
// Validates or creates the workspace root directory layout at `root`.
//
// If `root` exists it must be a directory and its entries are checked against
// the set of known workspace names; otherwise std::invalid_argument is thrown.
// The root and its PATH_IN_PROGRESS, PATH_ARCHIVE and PATH_RESULT
// sub-directories are created with the std::filesystem create calls below.
// NOTE(review): the bodies of the individual checks and the block structure
// around the create calls are not visible in this excerpt.
67 void InitRootWorkspace(fs::path
const& root) {
// Query the status once and reuse it for both checks.
69 auto status = fs::status(root);
70 if (fs::exists(status)) {
71 if (!fs::is_directory(status)) {
72 throw std::invalid_argument(
73 fmt::format(
"Workspace root exist but is not a directory: {}", root.native()));
// Inspect every direct entry of the existing root.
76 for (
auto const& entry : fs::directory_iterator(root)) {
// Temporary files carrying the FILE_TEMP_EXT extension (as produced by SafeWrite).
77 if (entry.path().extension() == FILE_TEMP_EXT) {
// Known workspace entries: queue file, in-progress, archive and result directories.
82 if (entry.path().filename() == FILE_DAQ_QUEUE) {
85 if (entry.path().filename() == PATH_IN_PROGRESS) {
88 if (entry.path().filename() == PATH_ARCHIVE) {
91 if (entry.path().filename() == PATH_RESULT) {
// Presumably reached for any entry not matched above: refuse to use the directory.
94 throw std::invalid_argument(
95 fmt::format(
"Workspace {} has unexpected contents - aborting to prevent filesystem "
96 "modifications due to misconfiguration",
// Create the root (including missing parents) and the standard sub-directories.
101 fs::create_directories(root);
102 fs::create_directory(root / PATH_IN_PROGRESS);
103 fs::create_directory(root / PATH_ARCHIVE);
104 fs::create_directory(root / PATH_RESULT);
// Composes the path of the in-progress directory for a DAQ:
// <root>/<PATH_IN_PROGRESS>/<daq_id>.
// Only builds the path; the filesystem is not accessed.
113 auto MakeInProgressDaqPath(fs::path
const& root, std::string
const& daq_id) -> fs::path {
114 return root / PATH_IN_PROGRESS / daq_id;
// Creates the in-progress workspace directory for `daq_id` below `root`,
// together with its PATH_SOURCES sub-directory.
//
// Throws std::invalid_argument if the DAQ directory already exists.
// Declared to return an fs::path - presumably the created DAQ directory; the
// return statement is not visible in this excerpt.
117 auto InitDaqWorkspace(fs::path
const& root, std::string
const& daq_id) -> fs::path {
118 auto daq_root = MakeInProgressDaqPath(root, daq_id);
// Never reuse an existing DAQ directory.
// NOTE(review): the trailing .c_str() is redundant (std::invalid_argument also
// accepts std::string) and differs from the other throw sites in this file.
119 if (fs::exists(daq_root)) {
120 throw std::invalid_argument(
121 fmt::format(
"DAQ workspace directory '{}' already exists", daq_root.string()).c_str());
// Non-recursive create: the parent <root>/<PATH_IN_PROGRESS> must already exist.
123 fs::create_directory(daq_root);
124 fs::create_directory(daq_root / PATH_SOURCES);
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the body of the WorkspaceImpl constructor.
// Accept a root that is a directory or does not exist yet; reject anything else.
131 auto status = fs::status(m_root);
132 if (status.type() != fs::file_type::directory && status.type() != fs::file_type::not_found) {
133 throw std::invalid_argument(
134 fmt::format(
"Workspace root directory '{}' is not a directory", m_root.string())
// Validate or create the directory layout below the root.
138 InitRootWorkspace(m_root);
// Make sure a queue file exists by storing an empty queue when none is present.
140 if (!fs::exists(m_root / FILE_DAQ_QUEUE)) {
141 WorkspaceImpl::StoreQueue({});
// Returns a vector of strings - presumably the identifiers of the DAQs known
// to the workspace.
// NOTE(review): the function body is not visible in this excerpt.
145 auto WorkspaceImpl::EnumerateDaqs() const -> std::vector<std::
string> {
// Returns the std::filesystem space information (capacity, free, available)
// of the filesystem that contains the workspace root.
// The throwing overload of fs::space is used, so failures surface as
// fs::filesystem_error.
149 fs::space_info WorkspaceImpl::QueryStorageStatus()
const {
150 return fs::space(m_root);
// Creates the on-disk workspace for a new DAQ and returns an object to access it.
//
// daq_id  Identifier used as the directory name below PATH_IN_PROGRESS.
//
// Propagates std::invalid_argument from InitDaqWorkspace() when the DAQ
// directory already exists.
153 auto WorkspaceImpl::InitializeDaq(std::string
const& daq_id) -> std::unique_ptr<DaqWorkspace> {
154 auto root = InitDaqWorkspace(m_root, daq_id);
// The DAQ workspace is given its own root and the workspace-wide result directory.
155 return std::make_unique<DaqWorkspaceImpl>(root, m_root / PATH_RESULT);
// Returns an object to access the in-progress workspace of `daq_id`.
// Unlike InitializeDaq() nothing is created here, and the visible code does
// not check that the DAQ directory exists.
158 auto WorkspaceImpl::LoadDaq(std::string
const& daq_id) -> std::unique_ptr<DaqWorkspace> {
159 auto root = MakeInProgressDaqPath(m_root, daq_id);
160 return std::make_unique<DaqWorkspaceImpl>(root, m_root / PATH_RESULT);
// Recursively deletes the in-progress workspace directory of `daq_id`.
// fs::remove_all does not fail when the path does not exist.
// NOTE(review): an empty `daq_id` makes the path resolve to the
// PATH_IN_PROGRESS directory itself - confirm callers never pass one.
163 void WorkspaceImpl::RemoveDaq(std::string
const& daq_id) {
164 fs::remove_all(m_root / PATH_IN_PROGRESS / daq_id);
// Moves the in-progress workspace of `daq_id` into the archive directory by
// renaming it, and returns a path - presumably the archive location; the
// return statement is not visible in this excerpt.
167 auto WorkspaceImpl::ArchiveDaq(std::string
const& daq_id) -> std::filesystem::path {
168 auto target = m_root / PATH_ARCHIVE / daq_id;
// Name collision in the archive: probe candidate names carrying a numeric
// suffix (1 .. 0xfffe) until an unused one is found.
// NOTE(review): the construction of `tmp` and the update of `target` are not
// visible in this excerpt.
169 if (fs::exists(target)) {
171 for (
auto idx = 1u; idx < 0xffff; ++idx) {
173 tmp += std::to_string(idx);
174 if (!fs::exists(tmp)) {
// Move the whole DAQ directory from in-progress to the chosen archive path.
180 fs::rename(m_root / PATH_IN_PROGRESS / daq_id, target);
// Reads the DAQ queue from <root>/<FILE_DAQ_QUEUE> and returns it as a list
// of strings.
//
// Throws std::runtime_error ("Failed to load DAQ queue ...") with the
// underlying failure nested inside it.
// NOTE(review): the try/catch lines are not visible in this excerpt.
184 auto WorkspaceImpl::LoadQueue() const -> std::vector<std::
string> {
185 auto file = m_root / FILE_DAQ_QUEUE;
// Parse the file as JSON and convert the document to a vector of strings.
187 std::fstream fs(file, std::ios::in);
188 auto json = nlohmann::json::parse(fs);
189 return json.get<std::vector<std::string>>();
// Wrap the failure in an error that names the queue file.
191 std::throw_with_nested(
192 std::runtime_error(fmt::format(
"Failed to load DAQ queue '{}'", file.native())));
// Writes the DAQ queue to <root>/<FILE_DAQ_QUEUE> using SafeWrite().
//
// Throws std::runtime_error ("Failed to store DAQ queue") with the
// underlying failure nested inside it.
// NOTE(review): the line building `json` (presumably from `queue`) and the
// try/catch lines are not visible in this excerpt.
196 void WorkspaceImpl::StoreQueue(std::vector<std::string>
const& queue)
const {
200 SafeWrite(json, m_root / FILE_DAQ_QUEUE);
202 std::throw_with_nested(std::runtime_error(
"Failed to store DAQ queue"));
// Constructs the accessor for a single DAQ workspace.
//
// root    Directory of the DAQ workspace; must be an absolute path.
// result  Result directory; must be an absolute path.
//
// Throws std::invalid_argument if either path is not absolute.
// Both paths are taken by value and moved into the members.
206 DaqWorkspaceImpl::DaqWorkspaceImpl(fs::path root, fs::path result)
207 : m_root(std::move(root)), m_result(std::move(result)) {
// The checks use the members because the parameters have been moved from.
208 if (!m_root.is_absolute()) {
209 throw std::invalid_argument(
210 fmt::format(
"DAQ workspace root path must be absolute: {}", m_root.native()));
212 if (!m_result.is_absolute()) {
213 throw std::invalid_argument(
214 fmt::format(
"DAQ workspace result path must be absolute: {}", m_result.native()));
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the body of DaqWorkspaceImpl::LoadStatus().
// Parse <DAQ root>/<FILE_DAQ_STATUS> as JSON and convert it to a Status.
224 std::fstream fs(m_root / FILE_DAQ_STATUS, std::ios::in);
225 auto json = nlohmann::json::parse(fs);
226 return json.get<
Status>();
// Any failure is reported as std::runtime_error with the cause nested inside.
228 std::throw_with_nested(std::runtime_error(
"Failed to load DAQ status"));
// NOTE(review): the enclosing signature and the line building `json` are not
// visible in this excerpt; presumably this is DaqWorkspaceImpl::StoreStatus().
// Write the status document to <DAQ root>/<FILE_DAQ_STATUS> via SafeWrite().
236 SafeWrite(json, m_root / FILE_DAQ_STATUS);
// Any failure is reported as std::runtime_error with the cause nested inside.
238 std::throw_with_nested(std::runtime_error(
"Failed to store DAQ status"));
// NOTE(review): the enclosing signature is not visible in this excerpt -
// presumably one of the Get...Path() accessors.
// Returns the bare file name FILE_DAQ_SOURCES; it is not joined with m_root here.
243 return FILE_DAQ_SOURCES;
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the body of DaqWorkspaceImpl::MakeResultSymlink(result).
// Express `result` relative to the DAQ root (purely lexical, no filesystem
// access), then create the symlink <DAQ root>/<FILE_SYMLINK> pointing at that
// relative path.
247 auto relative = result.lexically_relative(m_root);
248 fs::create_symlink(relative, m_root / FILE_SYMLINK);
// NOTE(review): the enclosing signature and the return statement are not
// visible in this excerpt; presumably this is DaqWorkspaceImpl::LoadSourceLookup().
// Parse <DAQ root>/<FILE_DAQ_SOURCES> as JSON.
253 std::fstream fs(m_root / FILE_DAQ_SOURCES, std::ios::in);
254 auto json = nlohmann::json::parse(fs);
// Any failure is reported as std::runtime_error with the cause nested inside.
257 std::throw_with_nested(std::runtime_error(
"Failed to load DAQ source lookup"));
// NOTE(review): the enclosing signature and the line building `json` are not
// visible in this excerpt; presumably this is DaqWorkspaceImpl::StoreSourceLookup().
// Write the lookup document to <DAQ root>/<FILE_DAQ_SOURCES> via SafeWrite().
265 SafeWrite(json, m_root / FILE_DAQ_SOURCES);
// Any failure is reported as std::runtime_error with the cause nested inside.
267 std::throw_with_nested(std::runtime_error(
"Failed to store DAQ source lookup"));
// NOTE(review): the enclosing signature and the return statement are not
// visible in this excerpt; presumably this is DaqWorkspaceImpl::LoadSpecification().
276 auto file = m_root / FILE_DP_SPEC;
// Parse the specification file as JSON.
278 std::fstream fs(file, std::ios::in);
279 auto json = nlohmann::json::parse(fs);
// Any failure is reported as std::runtime_error naming the file, with the cause nested inside.
282 std::throw_with_nested(std::runtime_error(
283 fmt::format(
"Failed to load Data Product Specification '{}'", file.native())));
// NOTE(review): the enclosing signature is not visible in this excerpt;
// presumably this is the body of DaqWorkspaceImpl::StoreSpecification().
// Write the specification to <DAQ root>/<FILE_DP_SPEC> via SafeWrite().
289 SafeWrite(specification, m_root / FILE_DP_SPEC);
// Any failure is reported as std::runtime_error with the cause nested inside.
// The message names the Data Product Specification: the previous text
// ("Failed to store DAQ status") was copied from the status writer and
// pointed at the wrong file when diagnosing a failed write.
291 std::throw_with_nested(std::runtime_error(
"Failed to store Data Product Specification"));
void StoreSpecification(std::string const &specification) const override
Get file name of the data product specification stored in StoreSpecification()
void StoreSourceLookup(SourceResolver::Mapping const &status) const override
auto LoadSpecification() const -> json::DpSpec override
Get file name of the data product specification stored in StoreSpecification()
auto GetSourcesPath() const -> std::filesystem::path override
auto GetSourceLookupPath() const -> std::filesystem::path override
void StoreStatus(Status const &status) const override
auto GetSpecificationPath() const -> std::filesystem::path override
Get file name of the data product specification stored in StoreSpecification()
auto LoadSourceLookup() const -> SourceResolver::Mapping override
auto LoadStatus() const -> Status override
void MakeResultSymlink(std::filesystem::path const &result) const override
Create symlink to result file.
Provides location of fits source file.
std::map< SourceFile, std::string > Mapping
WorkspaceImpl(std::filesystem::path root)
Opens or creates workspace in the specified location, using that as a root.
Declares JSON support for serialization.
daq::dpm::Workspace interface and implementation declaration
DpSpec ParseDpSpec(Json const &json)
Parse JSON to construct the DpSpec structure.
Non-observable status object that stores the status of a data acquisition.