ddt  0.1
ddtMemoryAccessor.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtMemoryAccessor.hpp
8 // @brief Accessor for a shared memory.
9 //
10 // This class provides the functionalities to access created shared memories
11 // (especially read and write functionality).
12 //
13 // @author Matthias Grimm, CGI
14 // @since 2020/01/16
15 //
16 
17 #ifndef DDTMEMORYACCESSOR_H_
18 #define DDTMEMORYACCESSOR_H_
19 
20 #include <boost/circular_buffer.hpp>
21 #include <boost/interprocess/containers/string.hpp>
22 #include <boost/interprocess/containers/vector.hpp>
23 #include <boost/interprocess/managed_shared_memory.hpp>
24 #include <boost/signals2/signal.hpp>
25 #include <fstream>
26 #include <future>
27 #include <iostream>
28 
29 #include "ddt/ddtConstants.hpp"
30 #include "ddt/ddtCrc32.hpp"
31 #include "ddt/ddtLogger.hpp"
32 
33 namespace ip = boost::interprocess;
34 
38 typedef ip::managed_shared_memory::segment_manager segment_manager_t;
39 
43 typedef ip::allocator<void, segment_manager_t> void_allocator;
44 
48 typedef ip::allocator<uint8_t, segment_manager_t> uint8_allocator;
49 
53 typedef ip::vector<uint8_t, uint8_allocator> uint8_vector;
54 
58 typedef ip::allocator<uint16_t, segment_manager_t> uint16_allocator;
59 
63 typedef ip::vector<uint16_t, uint16_allocator> uint16_vector;
64 
68 typedef ip::allocator<char, segment_manager_t> char_allocator;
69 
73 typedef ip::basic_string<char, std::char_traits<char>, char_allocator>
75 
79 typedef boost::signals2::signal<void()> signal_t;
80 
81 namespace ddt {
82 
90  int32_t topic_id = 0;
91 
96 
101 
105  int32_t sample_id;
106 
111 
115  DataSampleShared(const int32_t id, const int md_length,
116  const int vector_length, const void_allocator &void_alloc)
117  : meta_data_length(md_length),
118  meta_data(md_length, uint8_t(), void_alloc),
119  sample_id(id),
120  data(vector_length, uint8_t(), void_alloc) {}
121 };
122 
133 
137  uint32_t checksum;
138 
142  int32_t sample_length;
143 
147  int64_t writer_index = -1;
148 
152  uint64_t timestamp = 0;
153 
158 
162  DataPacketShared(const char *const ds_id, const int32_t check,
163  const int vector_length, const void_allocator &void_alloc)
164  : data_stream_identifier(ds_id, void_alloc),
165  checksum(check),
166  sample_length(vector_length),
167  sample(0, META_DATA_LENGTH, vector_length, void_alloc) {}
168 };
169 
173 struct DataSample {
177  int32_t topic_id = 0;
178 
183 
187  std::vector<uint8_t> meta_data;
188 
192  int32_t sample_id;
193 
197  std::vector<uint8_t> data;
198 
202  DataSample(const int32_t id, const int md_length, const int vector_length)
203  : meta_data_length(md_length),
204  meta_data(md_length),
205  sample_id(id),
206  data(vector_length) {}
207 };
208 
212 struct DataPacket {
217 
221  uint32_t checksum;
222 
226  int32_t sample_length;
227 
231  int64_t writer_index = -1;
232 
236  uint64_t timestamp = 0;
237 
242 
246  DataPacket(const char *const ds_id, const int32_t check,
247  const int vector_length)
248  : data_stream_identifier(ds_id),
249  checksum(check),
250  sample_length(vector_length),
251  sample(0, META_DATA_LENGTH, vector_length) {}
252 };
253 
258  public:
263 
267  explicit DdtMemoryAccessor(const std::string &shm_id,
268  const std::string &data_stream_identifier,
269  DdtLogger *logger, const uint64_t time_window = 0,
270  const int32_t reading_interval = 10);
271 
275  virtual ~DdtMemoryAccessor();
276 
280  const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared);
281 
285  const uint32_t ComputeChecksum(DataSample *const data_sample);
286 
290  int32_t OpenSharedMemory();
291 
295  void CloseSharedMemory();
296 
300  void WriteData(const int32_t writer_index, const int32_t topic_id,
301  const int32_t sample_id, const uint8_t *datavec,
302  const int32_t datavec_size, const uint8_t *metadata_vec,
303  const int32_t metadatavec_size, const uint64_t timestamp);
304 
308  void StartReading();
309 
313  void StopReading();
314 
320  void SetSizeConstraints(const int32_t max_sample_size, const int32_t space);
321 
325  void Reattach();
326 
332  void NewData();
333 
337  void get_data_packet(std::string *stream_identifier, uint32_t *checksum,
338  int32_t *sample_length, int64_t *writer_idx,
339  uint64_t *timestamp, DataSample **sample);
340 
344  bool get_data_available();
345 
350 
355 
360  void Reset();
361 
365  void set_pub_unreg(const bool STATE);
366 
370  void set_compute_checksum(const bool compute_crc);
371 
375  bool get_compute_checksum() const;
376 
380  bool get_is_initialized() const;
381 
382  private:
383  signal_t data_available_signal;
384 
388  void ReadData();
389 
393  void SetDefaults();
394 
403  void Init(const std::string &mem_id, const std::string &stream_id,
404  DdtLogger *ddt_logger, const uint64_t time_win,
405  const int32_t interval);
406 
410  void PrintData();
411 
416  int32_t CreateNewShm();
417 
422  int32_t SearchCircBuffer();
423 
428  int32_t SearchWriterIndex();
429 
430  ip::managed_shared_memory *managed_shm;
431 
436  typedef ip::allocator<DataPacketShared,
437  ip::managed_shared_memory::segment_manager>
438  cb_alloc;
439 
443  typedef boost::circular_buffer<DataPacketShared, cb_alloc> cb;
444 
445  std::string shm_id;
446  std::string data_stream_identifier;
447  uint64_t time_window; // in [ms]
448  int32_t reading_interval; // in [ms]
449  cb *circ_buffer;
450  std::atomic<int64_t> *writer_index;
451  int32_t local_index;
452  int64_t reader_index;
453 
454  int32_t number_of_unread_elements;
455  int32_t circ_buf_capacity;
456  int32_t number_of_lost_packages;
457 
458  std::mutex circ_buffer_mutex;
459  std::mutex packets_mutex;
460 
461  bool is_initialized;
462 
463  std::promise<void> exit_signal;
464  std::future<void> future_object;
465 
466  std::atomic<bool> reading_active;
467  std::atomic<bool> pub_unreg;
468  std::atomic<bool> compute_checksum;
469 
470  int32_t max_data_sample_size;
471  int additional_space;
472 
473  std::list<DataPacketShared *> packets;
474 
475  DdtLogger *logger;
476 };
477 
478 } // namespace ddt
479 
480 #endif /* DDTMEMORYACCESSOR_H_ */
ddt::DataPacketShared::writer_index
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:147
ddt::DataSample::topic_id
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:177
ddt::DdtMemoryAccessor::ComputeChecksum
const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared)
Definition: ddtMemoryAccessor.cpp:84
signal_t
boost::signals2::signal< void()> signal_t
Definition: ddtMemoryAccessor.hpp:79
ddt::DataPacket::sample
DataSample sample
Definition: ddtMemoryAccessor.hpp:241
ddt::META_DATA_LENGTH
const int META_DATA_LENGTH
Definition: ddtConstants.hpp:65
ddt::DdtMemoryAccessor::NewData
void NewData()
Definition: ddtMemoryAccessor.cpp:349
ddt::DataPacket::DataPacket
DataPacket(const char *const ds_id, const int32_t check, const int vector_length)
Definition: ddtMemoryAccessor.hpp:246
ddt::DataPacketShared
Definition: ddtMemoryAccessor.hpp:128
ddt::DataSample::meta_data_length
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:182
ddt::DataSampleShared::DataSampleShared
DataSampleShared(const int32_t id, const int md_length, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:115
ddt::DataPacket::sample_length
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:226
ddt::DataSample::DataSample
DataSample(const int32_t id, const int md_length, const int vector_length)
Definition: ddtMemoryAccessor.hpp:202
ddt::DdtMemoryAccessor::Reattach
void Reattach()
Definition: ddtMemoryAccessor.cpp:341
ddt::DdtMemoryAccessor::get_data_packet
void get_data_packet(std::string *stream_identifier, uint32_t *checksum, int32_t *sample_length, int64_t *writer_idx, uint64_t *timestamp, DataSample **sample)
Definition: ddtMemoryAccessor.cpp:413
ddt::DataSampleShared
Definition: ddtMemoryAccessor.hpp:86
ddt::DdtLogger
Definition: ddtLogger.hpp:48
ddt::DdtMemoryAccessor
Definition: ddtMemoryAccessor.hpp:257
ddt::DdtMemoryAccessor::OpenSharedMemory
int32_t OpenSharedMemory()
Definition: ddtMemoryAccessor.cpp:121
ddt::DdtMemoryAccessor::StopReading
void StopReading()
Definition: ddtMemoryAccessor.cpp:321
ddt::DataSample::sample_id
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:192
ddtConstants.hpp
ddt
Definition: ddtClient.hpp:36
ddt::DataPacketShared::sample_length
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:142
ddt::DataSampleShared::meta_data_length
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:95
ddt::DdtMemoryAccessor::get_compute_checksum
bool get_compute_checksum() const
Definition: ddtMemoryAccessor.cpp:527
ddt::DataPacket::timestamp
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:236
char_allocator
ip::allocator< char, segment_manager_t > char_allocator
Definition: ddtMemoryAccessor.hpp:68
ddt::DdtMemoryAccessor::SetSizeConstraints
void SetSizeConstraints(const int32_t max_sample_size, const int32_t space)
Definition: ddtMemoryAccessor.cpp:335
ddt::DdtMemoryAccessor::WriteData
void WriteData(const int32_t writer_index, const int32_t topic_id, const int32_t sample_id, const uint8_t *datavec, const int32_t datavec_size, const uint8_t *metadata_vec, const int32_t metadatavec_size, const uint64_t timestamp)
Definition: ddtMemoryAccessor.cpp:187
ddt::DdtMemoryAccessor::~DdtMemoryAccessor
virtual ~DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:32
ddt::DdtMemoryAccessor::get_data_available
bool get_data_available()
Definition: ddtMemoryAccessor.cpp:398
ddt::DdtMemoryAccessor::CloseSharedMemory
void CloseSharedMemory()
Definition: ddtMemoryAccessor.cpp:77
ddt::DdtMemoryAccessor::StartReading
void StartReading()
Definition: ddtMemoryAccessor.cpp:306
ddt::DataPacketShared::DataPacketShared
DataPacketShared(const char *const ds_id, const int32_t check, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:162
ddt::DataSample::meta_data
std::vector< uint8_t > meta_data
Definition: ddtMemoryAccessor.hpp:187
void_allocator
ip::allocator< void, segment_manager_t > void_allocator
Definition: ddtMemoryAccessor.hpp:43
ddt::DataSample::data
std::vector< uint8_t > data
Definition: ddtMemoryAccessor.hpp:197
ddt::DataPacketShared::data_stream_identifier
char_string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:132
ddt::DataSample
Definition: ddtMemoryAccessor.hpp:173
ddt::DataPacket::data_stream_identifier
std::string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:216
ddt::DataPacket
Definition: ddtMemoryAccessor.hpp:212
char_string
ip::basic_string< char, std::char_traits< char >, char_allocator > char_string
Definition: ddtMemoryAccessor.hpp:74
ddt::DataSampleShared::sample_id
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:105
ddt::DdtMemoryAccessor::set_compute_checksum
void set_compute_checksum(const bool compute_crc)
Definition: ddtMemoryAccessor.cpp:514
ddt::DataPacket::writer_index
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:231
ddt::DataSampleShared::topic_id
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:90
ddt::DdtMemoryAccessor::Reset
void Reset()
Definition: ddtMemoryAccessor.cpp:499
ddt::DataSampleShared::meta_data
uint8_vector meta_data
Definition: ddtMemoryAccessor.hpp:100
ddt::DdtMemoryAccessor::get_number_of_unread_elements
int32_t get_number_of_unread_elements()
Definition: ddtMemoryAccessor.cpp:408
uint16_allocator
ip::allocator< uint16_t, segment_manager_t > uint16_allocator
Definition: ddtMemoryAccessor.hpp:58
uint16_vector
ip::vector< uint16_t, uint16_allocator > uint16_vector
Definition: ddtMemoryAccessor.hpp:63
uint8_allocator
ip::allocator< uint8_t, segment_manager_t > uint8_allocator
Definition: ddtMemoryAccessor.hpp:48
ddt::DataSampleShared::data
uint8_vector data
Definition: ddtMemoryAccessor.hpp:110
ddtLogger.hpp
ddt::DdtMemoryAccessor::set_pub_unreg
void set_pub_unreg(const bool STATE)
Definition: ddtMemoryAccessor.cpp:512
ddt::DdtMemoryAccessor::get_is_initialized
bool get_is_initialized() const
Definition: ddtMemoryAccessor.cpp:531
ddt::DataPacket::checksum
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:221
ddt::DataPacketShared::timestamp
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:152
ddt::DdtMemoryAccessor::DataAvailableSignal
signal_t * DataAvailableSignal()
Definition: ddtMemoryAccessor.cpp:495
ddt::DataPacketShared::checksum
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:137
ddt::DdtMemoryAccessor::DdtMemoryAccessor
DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:21
segment_manager_t
ip::managed_shared_memory::segment_manager segment_manager_t
Definition: ddtMemoryAccessor.hpp:38
uint8_vector
ip::vector< uint8_t, uint8_allocator > uint8_vector
Definition: ddtMemoryAccessor.hpp:53
ddtCrc32.hpp
ddt::DataPacketShared::sample
DataSampleShared sample
Definition: ddtMemoryAccessor.hpp:157