Server (daqDpmServer)¶
The main DPM application is daqDpmServer, which coordinates the merging of Data Acquisition source files to create the final Data Product.
Note
Interaction with daqDpmServer is mainly reserved for daqOcmServer.
State Machine¶
The daqDpmServer does not implement a state machine; when started, it becomes operational automatically.
MAL URI Paths¶
The following tables summarize the request/reply service paths and topic paths for pub/sub.
URI Path | Root URI Configuration | Description |
---|---|---|
 | | DPM control interface |
 | | DPM Data Acquisition control interface |

Topic Type | URI Path | Root URI Configuration | Description |
---|---|---|---|
 | | | Status updates to DAQ are published as changes occur. It is supp. |
 | | | Storage status (importantly available space) is published at intervals in this topic. |
Command Line Arguments¶
Command line argument help is available under the option --help.
--proc-name ARG | -n ARG
(string) [default: dpm] Process instance name.
--config ARG | -c ARG
(string) [default: config/daqDpmServer/config.yaml] Config Path to application configuration file, e.g. --config ocs/ocm.yaml (see Configuration File for configuration file content).
--log-level ARG | -l ARG
(enum) [default: INFO] Log level to use. One of ERROR, WARNING, STATE, EVENT, ACTION, INFO, DEBUG, TRACE.
--workspace ARG
(string) [default: dpm] Workspace used by daqDpmServer to store source files before merging as well as the result after merging is complete (see daqDpmServer workspace for details).
Absolute paths are used as is.
Relative paths are defined relative to cfg.dataroot.
--rr-uri ARG
(string) [default: zpb.rr://127.0.0.1:12083/] MAL server request root endpoint on which to accept requests. Trailing slashes are optional, e.g. "zpb.rr://127.0.0.1:12345/" or "zpb.rr://127.0.0.1:12345".
Specifying the endpoint as a command line argument takes precedence over the configuration file parameter cfg.req.endpoint.
--ps-uri ARG
(string) [default: zpb.ps://127.0.0.1:12084/] MAL publish root endpoint from which to publish topics. Trailing slashes are optional, e.g. "zpb.ps://127.0.0.1:12345/" or "zpb.ps://127.0.0.1:12345".
Specifying the endpoint as a command line argument takes precedence over the configuration file parameter cfg.pub.endpoint.
--poll-once
Initiates operations once, runs until there is no more work to do, and then exits. The option is provided for interactive use, e.g. with manual error recovery.
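Because trailing slashes on the root endpoints are optional, a client that builds full service URIs may want to normalize the root first. The following is a minimal sketch of that idea, not the daqDpmServer implementation; the helper names and the relative path "example/path" are hypothetical and used only for illustration.

```python
def normalize_root_uri(uri: str) -> str:
    """Return the root endpoint with exactly one trailing slash."""
    return uri.rstrip("/") + "/"


def join_uri(root: str, path: str) -> str:
    """Join a root endpoint with a relative path (path is hypothetical here)."""
    return normalize_root_uri(root) + path.lstrip("/")


if __name__ == "__main__":
    # Both spellings of the root endpoint produce the same full URI.
    print(join_uri("zpb.rr://127.0.0.1:12083", "example/path"))
    print(join_uri("zpb.rr://127.0.0.1:12083/", "example/path"))
```

Normalizing once, at the point where the endpoint is read, avoids doubled or missing slashes in every later concatenation.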
Environment Variables¶
$DATAROOT
If defined, it specifies the default value for cfg.dataroot. If the configuration parameter is defined, it takes precedence over $DATAROOT.
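The interplay between $DATAROOT, cfg.dataroot and the workspace path can be sketched as follows. This is an illustrative reading of the rules stated in this document (the configuration value wins over the environment, absolute workspace paths are used as is, relative ones are resolved against the data root); the function name and the error handling are assumptions, not the actual implementation.

```python
from pathlib import Path
from typing import Mapping, Optional


def resolve_workspace(workspace: str,
                      cfg_dataroot: Optional[str],
                      env: Mapping[str, str]) -> Path:
    """Resolve the workspace directory (hypothetical helper).

    cfg_dataroot takes precedence over $DATAROOT; an absolute workspace
    path is returned unchanged, a relative one is joined to the data root.
    """
    path = Path(workspace)
    if path.is_absolute():
        return path
    dataroot = cfg_dataroot if cfg_dataroot is not None else env.get("DATAROOT")
    if dataroot is None:
        # Assumption: without any data root a relative path cannot be resolved.
        raise ValueError("relative workspace requires cfg.dataroot or $DATAROOT")
    return Path(dataroot) / path


if __name__ == "__main__":
    print(resolve_workspace("dpm", None, {"DATAROOT": "/data"}))
    print(resolve_workspace("/absolute/path/to/workspace", "/cfg", {}))
```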
Configuration File¶
The configuration file is currently based on YAML. This section describes what the configuration parameters are and how to set them.
If a configuration parameter can be provided via command line, configuration file and environment variable, the precedence order (high to low priority) is:
Command line value
Configuration file value
Environment variable value
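The precedence order above amounts to picking the first value that is actually set. A minimal sketch, assuming unset values are represented as None (the function name is hypothetical):

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def resolve_parameter(cli: Optional[T],
                      config: Optional[T],
                      env: Optional[T],
                      default: T) -> T:
    """Return the highest-priority value that is set.

    Priority (high to low): command line, configuration file,
    environment variable, then the built-in default.
    """
    for candidate in (cli, config, env):
        if candidate is not None:
            return candidate
    return default


if __name__ == "__main__":
    # The command line value wins even though the config file also sets one.
    print(resolve_parameter("zpb.rr://127.0.0.1:12345/",
                            "zpb.rr://127.0.0.1:12085/",
                            None,
                            "zpb.rr://127.0.0.1:12083/"))
```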
Enumeration of parameters:
cfg.dataroot
(string) [default: $DATAROOT] IFW standard output directory.
cfg.workspace
(string) [default: dpm] Workspace used by daqDpmServer to store source files before merging as well as the result after merging is complete (see daqDpmServer workspace for details).
Absolute paths are used as is (recommended).
Relative paths are defined relative to cfg.dataroot.
cfg.log.properties
(string) Config Path to a log4cplus log configuration file.
cfg.req.endpoint
(string) [default: zpb.rr://127.0.0.1:12085/] MAL server request root endpoint on which to accept requests. Trailing slashes are optional, e.g. "zpb.rr://127.0.0.1:12345/" or "zpb.rr://127.0.0.1:12345".
cfg.pub.endpoint
(string) [default: zpb.ps://127.0.0.1:12086/] MAL server publish root endpoint from which to publish topics. Trailing slashes are optional, e.g. "zpb.ps://127.0.0.1:12345/" or "zpb.ps://127.0.0.1:12345".
cfg.limit.daq
(integer) [default: 1] Limits the number of concurrent Data Acquisitions that daqDpmServer will process. A value of 0 means no limit.
cfg.limit.merge
(integer) [default: 1] Limits the number of concurrent merge processes. A value of 0 means no limit.
cfg.limit.net.receive
(integer) [default: 0] Limits the number of concurrent network receive transfers. A value of 0 means no limit.
cfg.limit.net.send
(integer) [default: 0] Limits the number of concurrent network send transfers. A value of 0 means no limit.
Note
The following parameters are provided but not expected to be modified.
cfg.bin.merge
(string) [default: daqDpmMerge] Merge application name.
cfg.bin.rsync
(string) [default: rsync] Rsync application name.
Example (partial) configuration:
cfg.workspace: "/absolute/path/to/workspace"
cfg.req.endpoint: "zpb.rr://127.0.0.1:12085/"
cfg.pub.endpoint: "zpb.ps://127.0.0.1:12086/"
cfg.log.properties: "log.properties"
# Concurrency limits
cfg.limit.daq: 2
cfg.limit.merge: 2
cfg.limit.net.receive: 5
cfg.limit.net.send: 5
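The limit parameters share one convention: a value of 0 disables the limit. The sketch below shows one way a scheduler could interpret such a setting; it is an illustration under that assumption only, and the function name and the rejection of negative values are hypothetical.

```python
def may_start(active: int, limit: int) -> bool:
    """Return True if one more operation may start.

    active: number of operations currently running.
    limit:  configured maximum; 0 means no limit.
    """
    if limit < 0:
        # Assumption: negative limits are treated as configuration errors.
        raise ValueError("limit must be >= 0")
    return limit == 0 or active < limit


if __name__ == "__main__":
    # Values taken from the example configuration in this section.
    limits = {"daq": 2, "merge": 2, "net.receive": 5, "net.send": 5}
    print(may_start(active=1, limit=limits["merge"]))  # one slot left
    print(may_start(active=2, limit=limits["merge"]))  # limit reached
    print(may_start(active=100, limit=0))              # unlimited
```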
Workspace¶
The daqDpmServer workspace is the designated file system area used to store both intermediate and final results of Data Acquisitions:
Individual input files from data sources (i.e. FITS files).
Data Product Specification that specifies how inputs are merged together.
Various internal files used by DPM.
Final Data Product when merged.
The structure is as follows:
/
Workspace root as configured via configuration file, environment variable or command line.
/queue.json
Queue of Data Acquisitions, as an array of Data Acquisition identifiers, that have been scheduled but are not yet completed. The order is significant: entries are processed in FIFO order.
/result/
Directory containing Data Product results.
For each Data Acquisition, the Data Product result is produced in state Merging by daqDpmMerge and is guaranteed to be available from state Releasing.
The files follow the ICS name, which is:
{fileId}.fits
{prefix}{fileId}.fits if a prefix has been chosen.
Note
Data Acquisition results are moved atomically into this directory and are then immutable. daqDpmServer requires read access until the Data Acquisition reaches state StateCompleted.
As for files in /archive/, an operational procedure is foreseen that specifies when files should be deleted. This procedure can e.g. ensure that the Data Product has been successfully ingested and backed up by OLAS.
/in-progress/
Root directory containing a directory for each Data Acquisition that has been queued for merging but is not yet completed. The directories are named after the Data Acquisition id.
/in-progress/{id}/
Contains persistent state for each Data Acquisition (where {id} is the Data Acquisition identifier). Also referred to as the Data Acquisition Workspace.
sources/
Subdirectory containing Data Product source FITS files. They are renamed to avoid name collisions, but the origin data source is identifiable from the naming pattern: {index}_{data source name}_{origin filename}.
logs/
May contain log files related to the merge operation.
specification.json
Data Product Specification specifying how to create Data Product. It is written and updated by daqDpmServer and read by daqDpmMerge.
The file is produced in state Scheduled after parsing the received Data Product Specification for source files.
sources.json
Specifies the source FITS files required to execute the merge. The JSON file specifies the remote origin location as well as the local file path to which each file will be copied. daqDpmMerge will use this as a source lookup.
The file is produced in state Scheduled after parsing the received Data Product Specification for source files.
status.json
Internal merge status maintained by daqDpmServer, used to be able to resume an interrupted merge process. The file is written for every state change.
result
Symlink to result file. The file is produced in state Merging by daqDpmMerge and is guaranteed to be available from state Releasing.
/archive/
Subdirectory for StateCompleted Data Acquisitions. Files in this directory are no longer used by daqDpmServer and can be removed.
/archive/{id}/
Subdirectory for each Data Acquisition. When a Data Acquisition is completed it is moved from /in-progress/{id}/ to /archive/{id}, so the structure is identical.
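The naming rules described in this structure can be expressed as a few small helpers. This is a sketch for illustration only: the function names are hypothetical, and the patterns are taken literally from the text ({index}_{data source name}_{origin filename} for sources, {prefix}{fileId}.fits for results).

```python
from pathlib import PurePosixPath


def source_file_name(index: int, data_source: str, origin_filename: str) -> str:
    """Name of a source file inside sources/, per the documented pattern."""
    return f"{index}_{data_source}_{origin_filename}"


def result_file_name(file_id: str, prefix: str = "") -> str:
    """ICS result name: {fileId}.fits, or {prefix}{fileId}.fits with a prefix."""
    return f"{prefix}{file_id}.fits"


def daq_workspace(root: str, daq_id: str, completed: bool = False) -> PurePosixPath:
    """Data Acquisition Workspace: in-progress/{id} or, once completed, archive/{id}."""
    top = "archive" if completed else "in-progress"
    return PurePosixPath(root) / top / daq_id


if __name__ == "__main__":
    daq_id = "TEST.2021-05-18T14:49:03.905"
    print(source_file_name(0, "dcs", f"{daq_id}-dcs.fits"))
    print(result_file_name(daq_id))
    print(daq_workspace("/absolute/path/to/workspace", daq_id))
```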
The following shows an example of files and directories in the workspace with one completed Data Acquisition and two in the process of being merged:
.
├── archive/
│   └── TEST.2021-05-18T15:10:02.101/
│       ├── result -> ../../result/TEST.2021-05-18T15:10:02.101.fits
│       ├── sources/
│       ├── logs/
│       ├── sources.json
│       ├── specification.json
│       └── status.json
├── in-progress/
│   ├── TEST.2021-05-18T14:49:03.905/
│   │   ├── sources/
│   │   │   ├── 0_dcs_TEST.2021-05-18T14:49:03.905-dcs.fits
│   │   │   └── 1_fcs_TEST.2021-05-18T14:49:03.905-fcs.fits
│   │   ├── logs/
│   │   ├── sources.json
│   │   ├── specification.json
│   │   └── status.json
│   └── TEST.2021-05-18T15:10:02.101/
│       ├── sources/
│       ├── logs/
│       ├── sources.json
│       ├── specification.json
│       └── status.json
└── result/
    └── TEST.2021-05-18T15:10:02.101.fits
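For inspection or tooling, the queue and the per-acquisition directories can be read with a few lines of code. The sketch below assumes that queue.json is a plain JSON array of identifier strings, as the description of the file suggests; the exact schema is not specified here, and the helper names are hypothetical. It builds a throwaway workspace so that it can run without a real DPM installation.

```python
import json
import tempfile
from pathlib import Path
from typing import List, Optional


def read_queue(workspace: Path) -> List[str]:
    """Return queued Data Acquisition ids in FIFO order (assumed: JSON array)."""
    queue_file = workspace / "queue.json"
    if not queue_file.exists():
        return []
    return json.loads(queue_file.read_text())


def next_daq_workspace(workspace: Path) -> Optional[Path]:
    """Directory of the first queued Data Acquisition, or None if the queue is empty."""
    queue = read_queue(workspace)
    return workspace / "in-progress" / queue[0] if queue else None


def build_demo_workspace(root: Path, daq_ids: List[str]) -> None:
    """Create a minimal workspace layout for demonstration purposes only."""
    for daq_id in daq_ids:
        (root / "in-progress" / daq_id / "sources").mkdir(parents=True)
    (root / "queue.json").write_text(json.dumps(daq_ids))


if __name__ == "__main__":
    root = Path(tempfile.mkdtemp())
    build_demo_workspace(root, ["TEST.2021-05-18T14:49:03.905",
                                "TEST.2021-05-18T15:10:02.101"])
    print(read_queue(root))
    print(next_daq_workspace(root))
```

Such a script should treat the workspace as read-only; the files are maintained by daqDpmServer.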
Merger (daqDpmMerge)¶
Standalone application that creates the final Data Product from a specification and is normally executed as a subprocess of daqDpmServer.
Command Line Arguments¶
Synopsis:
daqDpmMerge [options] <specification-file>
Where
<specification-file>
Data Product Specification file. To read from standard input use -. See Data Product Specification for file format details.
Options:
--root=DIR
Root directory DIR from which relative source paths in specification-file will be resolved.
By default the root directory will be set to:
The same directory that holds specification-file, if it is a regular file.
The current working directory, if specification-file is not a regular file.
--outfile | -o
FITS output file name, e.g. -o myfits.fits or --outfile=/path/to/myfits.fits. By default the output name will be derived using the specification-file fileId property: <fileId>.fits.
Relative paths are relative to the root directory.
--dry-run
Skips operations with visible side-effects. All inputs are opened in read-only mode. Some operations are performed using in-memory FITS file.
Useful for validating arguments and other inputs.
--json
Print status messages to standard output in JSON format with one message per line. By default status messages are printed in human readable form.
--verbose | -v
Increase the verbosity level. Can be passed multiple times (-v INFO, -vv DEBUG, -vvv TRACE).
--help
Show help and exit.
--version
Show version information and exit.
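The defaults for --root and --outfile described above can be summarized in code. This is a sketch of the documented rules, not the daqDpmMerge source: the function names are hypothetical, and the treatment of - as "not a regular file" follows from the standard input convention.

```python
from pathlib import Path
from typing import Optional


def default_root(specification_file: str) -> Path:
    """Default root directory when --root is not given.

    The directory holding the specification file if it is a regular file,
    otherwise (e.g. "-" for standard input) the current working directory.
    """
    spec = Path(specification_file)
    if specification_file != "-" and spec.is_file():
        return spec.absolute().parent
    return Path.cwd()


def output_path(file_id: str, root: Path, outfile: Optional[str] = None) -> Path:
    """Output file path: --outfile if given, else <fileId>.fits.

    Relative paths are resolved against the root directory.
    """
    name = Path(outfile) if outfile is not None else Path(f"{file_id}.fits")
    return name if name.is_absolute() else root / name


if __name__ == "__main__":
    root = default_root("-")
    print(output_path("TEST.2021-05-18T15:10:02.101", root))
    print(output_path("ignored", root, "/path/to/myfits.fits"))
```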
Environment Variables¶
TBD