Go to the documentation of this file.
17 #ifndef DDTMEMORYACCESSOR_H_
18 #define DDTMEMORYACCESSOR_H_
20 #include <boost/circular_buffer.hpp>
21 #include <boost/interprocess/containers/string.hpp>
22 #include <boost/interprocess/containers/vector.hpp>
23 #include <boost/interprocess/managed_shared_memory.hpp>
24 #include <boost/signals2/signal.hpp>
33 namespace ip = boost::interprocess;
73 typedef ip::basic_string<char, std::char_traits<char>,
char_allocator>
79 typedef boost::signals2::signal<void()>
signal_t;
118 meta_data(md_length, uint8_t(), void_alloc),
120 data(vector_length, uint8_t(), void_alloc) {}
202 DataSample(
const int32_t
id,
const int md_length,
const int vector_length)
206 data(vector_length) {}
247 const int vector_length)
268 const std::string &data_stream_identifier,
269 DdtLogger *logger,
const uint64_t time_window = 0,
270 const int32_t reading_interval = 10);
300 void WriteData(
const int32_t writer_index,
const int32_t topic_id,
301 const int32_t sample_id,
const uint8_t *datavec,
302 const int32_t datavec_size,
const uint8_t *metadata_vec,
303 const int32_t metadatavec_size,
const uint64_t timestamp);
337 void get_data_packet(std::string *stream_identifier, uint32_t *checksum,
338 int32_t *sample_length, int64_t *writer_idx,
403 void Init(
const std::string &mem_id,
const std::string &stream_id,
404 DdtLogger *ddt_logger,
const uint64_t time_win,
405 const int32_t interval);
416 int32_t CreateNewShm();
422 int32_t SearchCircBuffer();
428 int32_t SearchWriterIndex();
430 ip::managed_shared_memory *managed_shm;
437 ip::managed_shared_memory::segment_manager>
443 typedef boost::circular_buffer<DataPacketShared, cb_alloc> cb;
446 std::string data_stream_identifier;
447 uint64_t time_window;
448 int32_t reading_interval;
450 std::atomic<int64_t> *writer_index;
452 int64_t reader_index;
454 int32_t number_of_unread_elements;
455 int32_t circ_buf_capacity;
456 int32_t number_of_lost_packages;
458 std::mutex circ_buffer_mutex;
459 std::mutex packets_mutex;
463 std::promise<void> exit_signal;
464 std::future<void> future_object;
466 std::atomic<bool> reading_active;
467 std::atomic<bool> pub_unreg;
468 std::atomic<bool> compute_checksum;
470 int32_t max_data_sample_size;
471 int additional_space;
473 std::list<DataPacketShared *> packets;
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:147
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:177
const uint32_t ComputeChecksum(DataSampleShared *const data_sample_shared)
Definition: ddtMemoryAccessor.cpp:84
boost::signals2::signal< void()> signal_t
Definition: ddtMemoryAccessor.hpp:79
DataSample sample
Definition: ddtMemoryAccessor.hpp:241
const int META_DATA_LENGTH
Definition: ddtConstants.hpp:65
void NewData()
Definition: ddtMemoryAccessor.cpp:349
DataPacket(const char *const ds_id, const int32_t check, const int vector_length)
Definition: ddtMemoryAccessor.hpp:246
Definition: ddtMemoryAccessor.hpp:128
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:182
DataSampleShared(const int32_t id, const int md_length, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:115
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:226
DataSample(const int32_t id, const int md_length, const int vector_length)
Definition: ddtMemoryAccessor.hpp:202
void Reattach()
Definition: ddtMemoryAccessor.cpp:341
void get_data_packet(std::string *stream_identifier, uint32_t *checksum, int32_t *sample_length, int64_t *writer_idx, uint64_t *timestamp, DataSample **sample)
Definition: ddtMemoryAccessor.cpp:413
Definition: ddtMemoryAccessor.hpp:86
Definition: ddtLogger.hpp:48
Definition: ddtMemoryAccessor.hpp:257
int32_t OpenSharedMemory()
Definition: ddtMemoryAccessor.cpp:121
void StopReading()
Definition: ddtMemoryAccessor.cpp:321
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:192
Definition: ddtClient.hpp:36
int32_t sample_length
Definition: ddtMemoryAccessor.hpp:142
int32_t meta_data_length
Definition: ddtMemoryAccessor.hpp:95
bool get_compute_checksum() const
Definition: ddtMemoryAccessor.cpp:527
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:236
ip::allocator< char, segment_manager_t > char_allocator
Definition: ddtMemoryAccessor.hpp:68
void SetSizeConstraints(const int32_t max_sample_size, const int32_t space)
Definition: ddtMemoryAccessor.cpp:335
void WriteData(const int32_t writer_index, const int32_t topic_id, const int32_t sample_id, const uint8_t *datavec, const int32_t datavec_size, const uint8_t *metadata_vec, const int32_t metadatavec_size, const uint64_t timestamp)
Definition: ddtMemoryAccessor.cpp:187
virtual ~DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:32
bool get_data_available()
Definition: ddtMemoryAccessor.cpp:398
void CloseSharedMemory()
Definition: ddtMemoryAccessor.cpp:77
void StartReading()
Definition: ddtMemoryAccessor.cpp:306
DataPacketShared(const char *const ds_id, const int32_t check, const int vector_length, const void_allocator &void_alloc)
Definition: ddtMemoryAccessor.hpp:162
std::vector< uint8_t > meta_data
Definition: ddtMemoryAccessor.hpp:187
ip::allocator< void, segment_manager_t > void_allocator
Definition: ddtMemoryAccessor.hpp:43
std::vector< uint8_t > data
Definition: ddtMemoryAccessor.hpp:197
char_string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:132
Definition: ddtMemoryAccessor.hpp:173
std::string data_stream_identifier
Definition: ddtMemoryAccessor.hpp:216
Definition: ddtMemoryAccessor.hpp:212
ip::basic_string< char, std::char_traits< char >, char_allocator > char_string
Definition: ddtMemoryAccessor.hpp:74
int32_t sample_id
Definition: ddtMemoryAccessor.hpp:105
void set_compute_checksum(const bool compute_crc)
Definition: ddtMemoryAccessor.cpp:514
int64_t writer_index
Definition: ddtMemoryAccessor.hpp:231
int32_t topic_id
Definition: ddtMemoryAccessor.hpp:90
void Reset()
Definition: ddtMemoryAccessor.cpp:499
uint8_vector meta_data
Definition: ddtMemoryAccessor.hpp:100
int32_t get_number_of_unread_elements()
Definition: ddtMemoryAccessor.cpp:408
ip::allocator< uint16_t, segment_manager_t > uint16_allocator
Definition: ddtMemoryAccessor.hpp:58
ip::vector< uint16_t, uint16_allocator > uint16_vector
Definition: ddtMemoryAccessor.hpp:63
ip::allocator< uint8_t, segment_manager_t > uint8_allocator
Definition: ddtMemoryAccessor.hpp:48
uint8_vector data
Definition: ddtMemoryAccessor.hpp:110
void set_pub_unreg(const bool STATE)
Definition: ddtMemoryAccessor.cpp:512
bool get_is_initialized() const
Definition: ddtMemoryAccessor.cpp:531
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:221
uint64_t timestamp
Definition: ddtMemoryAccessor.hpp:152
signal_t * DataAvailableSignal()
Definition: ddtMemoryAccessor.cpp:495
uint32_t checksum
Definition: ddtMemoryAccessor.hpp:137
DdtMemoryAccessor()
Definition: ddtMemoryAccessor.cpp:21
ip::managed_shared_memory::segment_manager segment_manager_t
Definition: ddtMemoryAccessor.hpp:38
ip::vector< uint8_t, uint8_allocator > uint8_vector
Definition: ddtMemoryAccessor.hpp:53
DataSampleShared sample
Definition: ddtMemoryAccessor.hpp:157