Go to the documentation of this file.
11 #ifndef RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
12 #define RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
22 #include <ipcq/reader.hpp>
23 #include <ipcq/adapter.hpp>
24 #include <boost/io/ios_state.hpp>
// Entry point of the tool: takes the command-line argument count and argument
// vector and returns an int.
// NOTE(review): the return value is presumably the process exit status; the
// implementation is not part of this listing, so confirm there.
38 int Run(
int argc,
char *argv[]);
110 return m_sample_counter;
// Declared to receive the same argc/argv pair as Run() and to report a bool.
// NOTE(review): by its name this parses the command line and presumably fills
// the m_* option members declared below, returning false on invalid usage —
// confirm against the implementation, which is not visible here.
129 bool ParseArguments(
int argc,
char *argv[]);
// Takes an untyped, read-only buffer pointer and a size and returns nothing.
// NOTE(review): `size` is presumably the buffer length in bytes and the target
// is presumably the file named by m_filename — confirm in the implementation.
130 void WriteBufferToFile(
const void* buffer,
size_t size);
// Takes no arguments and returns a bool.
// NOTE(review): presumably answers "should the read loop stop now?" (e.g. after
// a signal or once m_max_samples is reached) — confirm in the implementation.
131 bool TerminateProcess();
// Option and bookkeeping members. Their exact meaning is set by code that is
// not part of this listing; the notes below are hedged accordingly.
// Queue name; NOTE(review): presumably what GetQueueName() returns — confirm.
133 std::string m_queue_name;
// NOTE(review): presumably the output file used by WriteBufferToFile() — confirm.
134 std::string m_filename;
// NOTE(review): presumably the number of samples after which to stop — confirm.
135 int64_t m_max_samples;
// NOTE(review): presumably the number of samples to skip/decimate — confirm.
136 int64_t m_skip_samples;
// NOTE(review): presumably enables printing of samples to stdout — confirm.
137 bool m_print_samples;
// Running sample counter; its value is returned by an accessor in this class.
139 int64_t m_sample_counter;
151 template<
typename Topic,
152 class ConditionPolicy = ipcq::BoostConditionPolicy,
153 class ShmTraits = ipcq::detail::BoostInterprocessTraits>
// Hex dump of the raw bytes of `sample` (an object of type Topic) to std::cout.
// The enclosing function header and several lines of the body are missing from
// this listing, so the comments describe only the statements that are visible.

// Restores std::cout's format flags (std::hex, std::right, ...) when it goes
// out of scope.
// NOTE(review): ios_flags_saver covers format flags only; the fill character
// changed by std::setfill('0') below is not restored by it — confirm whether
// boost::io::ios_fill_saver / ios_all_saver is wanted here.
172 boost::io::ios_flags_saver saved_state(std::cout);
// View the sample as a plain byte array.
174 auto buffer =
reinterpret_cast<const uint8_t*
>(&sample);
// By default at most 64 bytes are printed.
175 size_t max_bytes_to_print = 64;
// Lifts the limit entirely.
// NOTE(review): the guarding condition (original line 176) is missing from this
// listing; presumably a "print everything" option — confirm.
177 max_bytes_to_print = std::numeric_limits<size_t>::max();
// Tracks whether the last thing written was a line break, so that the dump is
// terminated by exactly one newline.
179 bool last_was_endl =
false;
180 for (
size_t n = 0; n <
sizeof(Topic) and n < max_bytes_to_print; ++n) {
// Each byte is written as "0x" followed by two zero-padded hex digits.
181 std::cout <<
"0x" << std::setfill(
'0') << std::setw(2) << std::right << std::noshowbase
182 << std::hex << (
unsigned int)(buffer[n]);
// Break the line after every 16th byte.
183 if ((n+1) % 16 == 0) {
184 std::cout << std::endl;
185 last_was_endl =
true;
// NOTE(review): original lines 186-187 are missing; presumably the else branch
// that writes a separator between bytes on the same line — confirm.
188 last_was_endl =
false;
// Finish a partially filled last line.
191 if (not last_was_endl) {
192 std::cout << std::endl;
// Tell the reader that the sample is larger than what was printed.
194 if (
sizeof(Topic) > max_bytes_to_print) {
195 std::cout <<
"... (data continues) ..." << std::endl;
// Shorthand for the ipcq reader type instantiated with this class's template
// parameters; used for m_reader and when constructing it in Initialise().
201 using Reader = ipcq::BasicReader<Topic, ConditionPolicy, ShmTraits>;
// Creates the shared memory reader for the queue named by GetQueueName() and
// stores it in m_reader.
// Any std::exception raised while doing so is caught and re-thrown as a
// std::runtime_error whose message starts with
// "Failed to create the shared memory reader for queue '".
// NOTE(review): the `try {` line and the rest of the message construction
// (original lines 214 and 218) are missing from this listing.
213 void Initialise()
override {
215 m_reader = std::make_unique<Reader>(
GetQueueName().c_str());
216 }
catch (
const std::exception& error) {
217 std::string msg =
"Failed to create the shared memory reader for queue '"
219 throw std::runtime_error(msg);
// Releases the shared memory reader by resetting m_reader to nullptr, which
// destroys the Reader object it owned (if any).
// Any std::exception raised while doing so is caught and re-thrown as a
// std::runtime_error whose message starts with
// "Failed to destroy the shared memory reader for queue '".
// NOTE(review): the `try {` line and the rest of the message construction
// (original lines 227 and 231) are missing from this listing.
226 void Finalise()
override {
228 m_reader.reset(
nullptr);
229 }
catch (
const std::exception& error) {
230 std::string msg =
"Failed to destroy the shared memory reader for queue '"
232 throw std::runtime_error(msg);
// Advances to the next sample.
// m_samples acts as a local buffer whose front element is the current sample:
// the previously current sample is dropped first, and only when the buffer has
// run empty is the shared memory queue read again.
// Throws std::runtime_error ("Failed to read from shared memory: ...") on a
// read error; otherwise the visible return reports whether the read delivered
// at least one element.
// NOTE(review): several lines of this function are missing from this listing
// (e.g. original lines 247, 249-250, 254-262, 264-265, 267-269), so the exact
// branch structure must be confirmed against the real source.
244 bool ReadSample()
override {
// Discard the sample that was current until now.
245 if (not m_samples.empty()) {
246 m_samples.pop_front();
// Buffered samples remain.
// NOTE(review): the body of this branch is missing; presumably it returns true
// without touching the queue — confirm.
248 if (not m_samples.empty()) {
251 using namespace std::chrono_literals;
// Ask for as many elements as the reader reports available and append them to
// m_samples.
// NOTE(review): the 100ms argument is presumably the wait timeout of Read() —
// confirm against the ipcq API.
252 auto count = m_reader->NumAvailable();
253 auto [error, num_elements] = m_reader->Read(ipcq::BackInserter(m_samples), count, 100ms);
// NOTE(review): the condition selecting this path is missing; presumably a
// specific error code for which the reader state is reset — confirm.
263 if (!m_reader->Reset()) {
266 std::cerr <<
"Note: SHM reader state reset.\n";
// Read errors are reported to the caller as an exception carrying the error's
// message text.
270 std::string msg =
"Failed to read from shared memory: " + error.message();
271 throw std::runtime_error(msg);
274 return num_elements > 0;
281 assert(not m_samples.empty());
// Returns an untyped pointer to the current sample, i.e. the front element of
// m_samples. The buffer must not be empty; this is checked with assert only,
// so the check disappears in builds that define NDEBUG.
// The pointer refers to storage owned by m_samples; ReadSample() pops that
// front element on its next call.
288 const void* GetSampleData()
const override {
289 assert(not m_samples.empty());
290 return reinterpret_cast<const void*
>(&m_samples.front());
// Returns the size in bytes of one sample, which is sizeof(Topic) and therefore
// the same for every sample.
296 size_t GetSampleSize()
const override {
297 return sizeof(Topic);
// Local buffer of samples read from the queue; the front element is the current
// sample handed out by GetSampleData().
300 std::deque<Topic> m_samples;
// Owning pointer to the reader; set in Initialise() and reset in Finalise().
301 std::unique_ptr<Reader> m_reader;
306 #endif // RTCTK_STANDALONETOOLS_SHMSUBSCRIBER_H
std::error_code make_error_code(MudpiProcessingError e)
Create a std::error_code from a MudpiProcessingError.
Definition: mudpiProcessingError.hpp:113