RTC Toolkit  2.0.0
shmSubscriber.hpp
Go to the documentation of this file.
1 
11 #ifndef RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
12 #define RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
13 
14 #include <memory>
15 #include <deque>
16 #include <string>
17 #include <chrono>
18 #include <iostream>
19 #include <iomanip>
20 #include <limits>
21 #include <cassert>
22 #include <ipcq/reader.hpp>
23 #include <ipcq/adapter.hpp>
24 #include <boost/io/ios_state.hpp>
25 
26 namespace rtctk::standaloneTools {
27 
34 public:
35 
36  ShmSubscriberBase() = default;
37  virtual ~ShmSubscriberBase() = default;
38  int Run(int argc, char *argv[]);
39 
40 protected:
41 
49  virtual void Initialise() = 0;
50 
56  virtual void Finalise() = 0;
57 
71  virtual bool ReadSample() = 0;
72 
79  virtual void PrintSample() = 0;
80 
84  virtual const void* GetSampleData() const = 0;
85 
89  virtual size_t GetSampleSize() const = 0;
90 
94  inline const std::string& GetQueueName() const {
95  return m_queue_name;
96  }
97 
102  inline const std::string& GetFilename() const {
103  return m_filename;
104  }
105 
109  inline const int64_t GetSampleNumber() const {
110  return m_sample_counter;
111  }
112 
117  inline const int64_t PrintWihtLongFormat() const {
118  return m_print_long;
119  }
120 
121 private:
122 
123  // Do not allow copying or moving of this object.
124  ShmSubscriberBase(const ShmSubscriberBase& rhs) = delete;
125  ShmSubscriberBase& operator=(const ShmSubscriberBase& rhs) = delete;
126  ShmSubscriberBase(ShmSubscriberBase&& rhs) = default;
127  ShmSubscriberBase& operator=(ShmSubscriberBase&& rhs) = default;
128 
// Internal helpers; their definitions are not part of this header.

/// Processes the argument vector handed to Run().
/// NOTE(review): presumably fills in the option members of this class and returns
/// whether parsing succeeded -- confirm against the implementation.
bool ParseArguments(int argc, char* argv[]);

/// Writes @p size bytes starting at @p buffer to a file.
/// NOTE(review): presumably the file returned by GetFilename() -- confirm.
void WriteBufferToFile(const void* buffer, size_t size);

/// NOTE(review): presumably reports whether the program should stop -- confirm.
bool TerminateProcess();
132 
// All scalar members carry an in-class initialiser: the constructor is defaulted, so
// without these a default-initialised object would hold indeterminate values until they
// are assigned (presumably by ParseArguments() -- confirm).
std::string m_queue_name;       // Returned by GetQueueName().
std::string m_filename;         // Returned by GetFilename().
int64_t m_max_samples{0};       // NOTE(review): presumably a limit on processed samples.
int64_t m_skip_samples{0};      // NOTE(review): presumably a number of samples to skip.
bool m_print_samples{false};    // NOTE(review): presumably enables console printing.
bool m_print_long{false};       // Returned by PrintWihtLongFormat().
int64_t m_sample_counter{0};    // Returned by GetSampleNumber().
140 };
141 
151 template<typename Topic,
152  class ConditionPolicy = ipcq::BoostConditionPolicy,
153  class ShmTraits = ipcq::detail::BoostInterprocessTraits>
155 public:
156  ShmSubscriber() = default;
157  virtual ~ShmSubscriber() = default;
158 
159 protected:
160 
171  virtual void PrintSample(const Topic& sample) {
172  boost::io::ios_flags_saver saved_state(std::cout);
173  std::cout << "Sample " << GetSampleNumber() << ":" << std::endl;
174  auto buffer = reinterpret_cast<const uint8_t*>(&sample);
175  size_t max_bytes_to_print = 64;
176  if (PrintWihtLongFormat()) {
177  max_bytes_to_print = std::numeric_limits<size_t>::max();
178  }
179  bool last_was_endl = false;
180  for (size_t n = 0; n < sizeof(Topic) and n < max_bytes_to_print; ++n) {
181  std::cout << "0x" << std::setfill('0') << std::setw(2) << std::right << std::noshowbase
182  << std::hex << (unsigned int)(buffer[n]);
183  if ((n+1) % 16 == 0) {
184  std::cout << std::endl;
185  last_was_endl = true;
186  } else {
187  std::cout << " ";
188  last_was_endl = false;
189  }
190  }
191  if (not last_was_endl) {
192  std::cout << std::endl;
193  }
194  if (sizeof(Topic) > max_bytes_to_print) {
195  std::cout << "... (data continues) ..." << std::endl;
196  }
197  }
198 
199 private:
200 
201  using Reader = ipcq::BasicReader<Topic, ConditionPolicy, ShmTraits>;
202 
203  // Do not allow copying or moving of this object.
204  ShmSubscriber(const ShmSubscriber& rhs) = delete;
205  ShmSubscriber& operator=(const ShmSubscriber& rhs) = delete;
206  ShmSubscriber(ShmSubscriber&& rhs) = default;
207  ShmSubscriber& operator=(ShmSubscriber&& rhs) = default;
208 
209 
213  void Initialise() override {
214  try {
215  m_reader = std::make_unique<Reader>(GetQueueName().c_str());
216  } catch (const std::exception& error) {
217  std::string msg = "Failed to create the shared memory reader for queue '"
218  + GetQueueName() + "': " + error.what();
219  throw std::runtime_error(msg);
220  }
221  }
222 
226  void Finalise() override {
227  try {
228  m_reader.reset(nullptr);
229  } catch (const std::exception& error) {
230  std::string msg = "Failed to destroy the shared memory reader for queue '"
231  + GetQueueName() + "': " + error.what();
232  throw std::runtime_error(msg);
233  }
234  m_samples.clear();
235  }
236 
244  bool ReadSample() override {
245  if (not m_samples.empty()) {
246  m_samples.pop_front();
247  }
248  if (not m_samples.empty()) {
249  return true;
250  }
251  using namespace std::chrono_literals;
252  auto count = m_reader->NumAvailable();
253  auto [error, num_elements] = m_reader->Read(ipcq::BackInserter(m_samples), count, 100ms);
254  if (error) {
255  if (error == ipcq::make_error_code(ipcq::Error::Timeout)) {
256  return false;
257  } else if (error == ipcq::make_error_code(ipcq::Error::InconsistentState)) {
258  // Use case for this tool is not conserned about missed samples. So when we get
259  // InconsistentState because of e.g. late joining we simply reset.
260  // Reset may fail if queue is Closed or if it is empty (nothing to reset to). We
261  // ignore that as well. Eventually there will be data in the queue and Reset() will
262  // succeed, or queue will be closed and Reset won't be attempted again.
263  if (!m_reader->Reset()) {
264  // Only log in the successful case as attempts to Reset will possibly otherwise
265  // flood the console with attempts if theres no data in the queue.
266  std::cerr << "Note: SHM reader state reset.\n";
267  }
268  return false;
269  } else {
270  std::string msg = "Failed to read from shared memory: " + error.message();
271  throw std::runtime_error(msg);
272  }
273  }
274  return num_elements > 0;
275  }
276 
280  void PrintSample() override {
281  assert(not m_samples.empty());
282  PrintSample(m_samples.front());
283  }
284 
288  const void* GetSampleData() const override {
289  assert(not m_samples.empty());
290  return reinterpret_cast<const void*>(&m_samples.front());
291  }
292 
296  size_t GetSampleSize() const override {
297  return sizeof(Topic);
298  }
299 
300  std::deque<Topic> m_samples;
301  std::unique_ptr<Reader> m_reader;
302 };
303 
304 } // namespace rtctk::standaloneTools
305 
306 #endif // RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
rtctk::standaloneTools::ShmSubscriberBase::Initialise
virtual void Initialise()=0
Should perform any needed initialisation steps for the program.
rtctk::standaloneTools::ShmSubscriberBase::GetQueueName
const std::string & GetQueueName() const
Definition: shmSubscriber.hpp:94
rtctk::standaloneTools::ShmSubscriber::ShmSubscriber
ShmSubscriber()=default
rtctk::standaloneTools::ShmSubscriber::~ShmSubscriber
virtual ~ShmSubscriber()=default
rtctk::standaloneTools::ShmSubscriberBase::Run
int Run(int argc, char *argv[])
Executes the shared memory subscriber program.
Definition: shmSubscriber.cpp:46
rtctk::standaloneTools::ShmSubscriberBase::GetSampleData
virtual const void * GetSampleData() const =0
rtctk::standaloneTools::ShmSubscriberBase::GetSampleSize
virtual size_t GetSampleSize() const =0
rtctk::standaloneTools::ShmSubscriberBase::ShmSubscriberBase
ShmSubscriberBase()=default
rtctk::standaloneTools::ShmSubscriber
Implements basic features for a simple shared memory subscriber program.
Definition: shmSubscriber.hpp:154
rtctk::standaloneTools::ShmSubscriberBase::GetSampleNumber
const int64_t GetSampleNumber() const
Definition: shmSubscriber.hpp:109
rtctk::standaloneTools::ShmSubscriberBase::GetFilename
const std::string & GetFilename() const
Definition: shmSubscriber.hpp:102
rtctk::standaloneTools::ShmSubscriberBase::~ShmSubscriberBase
virtual ~ShmSubscriberBase()=default
rtctk::standaloneTools::ShmSubscriberBase::PrintWihtLongFormat
const int64_t PrintWihtLongFormat() const
Definition: shmSubscriber.hpp:117
rtctk::standaloneTools::ShmSubscriber::PrintSample
virtual void PrintSample(const Topic &sample)
Prints a hex dump of the sample.
Definition: shmSubscriber.hpp:171
rtctk::telRepub::make_error_code
std::error_code make_error_code(MudpiProcessingError e)
Create std::error_code from ProcessingError.
Definition: mudpiProcessingError.hpp:113
rtctk::standaloneTools
Definition: rtctkGenDdsPublisher.h:18
rtctk::standaloneTools::ShmSubscriberBase::ReadSample
virtual bool ReadSample()=0
Should read a sample into internal buffers from the shared memory.
rtctk::standaloneTools::ShmSubscriberBase::Finalise
virtual void Finalise()=0
Must clean up any objects created in Initialise.
rtctk::standaloneTools::ShmSubscriberBase::PrintSample
virtual void PrintSample()=0
Should print the contents of the read sample to console in a human-readable format.
rtctk::standaloneTools::ShmSubscriberBase
Base class to implement all non-template methods that can be pre-compiled for the ShmSubscriber.
Definition: shmSubscriber.hpp:33