RTC Toolkit  2.0.0
shmPub.hpp
Go to the documentation of this file.
1 
12 #ifndef RTCTK_STANDALONETOOLS_SHMPUB_H
13 #define RTCTK_STANDALONETOOLS_SHMPUB_H
14 
// arg parsing
#include <boost/program_options.hpp>

// cfitsio
#include <cfitsio/fitsio.h>

// include the numapp for threading
#include <numapp/mempolicy.hpp>
#include <numapp/numapolicies.hpp>
#include <numapp/thread.hpp>

// include the ipcq for writer
#include <ipcq/writer.hpp>

// POSIX
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <csignal>
#include <cstddef>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <numeric>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <vector>
36 
37 namespace rtctk::standaloneTools {
/// Stop request flag: set by SignalHandler() and polled by the publishing loop.
///
/// It is a lock-free atomic so that the store performed from inside the signal handler is
/// well defined, and `inline` so that every translation unit including this header shares
/// one single instance.
inline std::atomic<bool> g_stop{false};
static_assert(std::atomic<bool>::is_always_lock_free,
              "g_stop is written from a signal handler and must be lock-free");

/**
 * Handles basic signals to allow simple exiting from a ShmPub process.
 *
 * Raises g_stop and reports the request on standard output. Only async-signal-safe
 * operations are used: a lock-free atomic store and a raw write(2); stream I/O such as
 * std::cout is not safe to call from a signal handler.
 *
 * The function is `inline` because it is defined in a header.
 *
 * The signal number parameter is not used.
 */
inline void SignalHandler(int /*signal*/)
{
    // Raise the flag first so the stop request never depends on the output succeeding.
    g_stop = true;

    // Same bytes the stream based version emitted: newline, message, newline.
    static constexpr char message[] = "\nSignal to exit received \n";
    [[maybe_unused]] const auto written = ::write(STDOUT_FILENO, message, sizeof(message) - 1);
}
55 
56 
81 template<class TopicType, class WriterType = ipcq::Writer<TopicType>>
82 class ShmPub {
83  public:
84  ShmPub(int argc, char* argv[]) : m_help_only(false)
85  {
86  using namespace boost::program_options;
87 
88  try {
89  options_description desc("Allowed options");
90  desc.add_options()
91  ("help,h", "produce help message")
92  ("fits-file,f", value<std::string>(&m_filename)->default_value(""), "fits input file: if not provided the app will generate data")
93  ("queue-name,q", value<std::string>(&m_queue_name)->default_value("default_shm_queue"), "shm queue name")
94  ("queue-size,s", value<size_t>(&m_queue_size)->default_value(1000), "size of the queue")
95  ("sample-delay,d", value<int>(&m_sample_delay)->default_value(10), "inter-sample delay in ms")
96  ("numa-node,n", value<int>(&m_numa), "numa node for shm queue")
97  ("print-every,p", value<int>(&m_print_every)->default_value(0), "when to print to screen the number of sample written")
98  ("gen-frames,g",value<int>(&m_gen_frames)->default_value(100), "Number of frames to generate")
99  ("repeat-mode,r",bool_switch(&m_repeat_mode), "Repeat output when all samples are written");
100 
101  variables_map vm;
102  store(command_line_parser(argc, argv).options(desc).run(), vm);
103  notify(vm);
104 
105  if (vm.count("help")) {
106  m_help_only = true;
107  std::cout << desc << std::endl;
108  }else {
109  m_help_only = false;
110  std::cout << "fits-file: " << m_filename << std::endl;
111  std::cout << "queue-name: " << m_queue_name << std::endl;
112  std::cout << "queue-size: " << m_queue_size << std::endl;
113  std::cout << "sample-delay: " << m_sample_delay << std::endl;
114  if(vm.count("numa-node")) {
115  std::cout << "numa-node: " << m_numa << std::endl;
116  }
117  std::cout << "print-every: " << m_print_every << std::endl;
118  std::cout << "gen-frames: " << m_gen_frames << std::endl;
119  std::cout << "repeat-mode: " << m_repeat_mode << std::endl;
120 
121  if(vm.count("numa-node")) {
122  m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
123  m_queue_size,
124  numapp::MemPolicy::MakeBindNode(m_numa));
125  }else {
126  m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
127  m_queue_size);
128  }
129  }
130  } catch(const std::exception &e) {
131  std::cerr << "Exception:" << e.what()<< std::endl;
132  }
133  }
134 
135  virtual ~ShmPub() = default;
136 
147  int Run()
148  {
149  if(m_help_only) { return 0; }
150 
151  int ret_val = 0;
152 
153  try {
154 
155  signal(SIGINT,SignalHandler);
156 
157  std::vector<TopicType> data;
158 
159  // checks if filename has been indicated if it has loads data by calling the user
160  // overloaded function ReadFits if not provided calls the user overloaded function
161  // GenData
162  if(not m_filename.empty()) {
163  std::cout << "Reading data from FITS file: " << m_filename << std::endl;
164  data = ReadFits(m_filename);
165  }else {
166  std::cout << "Generating data" << std::endl;
167  data = GenData(m_gen_frames);
168  }
169 
170  // check to make sure m_data is populated
171  if(data.empty()) {
172  throw std::runtime_error("Data vector is not populated so will exit");
173  }
174 
175  // calls main loop
176  std::cout << "Writing data to shared memory queue" << std::endl;
177  WriteToShm(data);
178 
179  }catch(std::exception const& e) {
180 
181  std::cout << e.what() << std::endl;
182  ret_val = -1;
183  }
184 
185  // Close queue to signal and give readers time detach from queue
186  m_writer->Close();
187 #ifndef UNIT_TEST
188  std::this_thread::sleep_for(std::chrono::seconds(2));
189 #endif
190 
191  return ret_val;
192  }
193 
208  virtual std::vector<TopicType> ReadFits(std::string filename) = 0;
209 
224  virtual std::vector<TopicType> GenData(int num_frames) = 0;
225 
226  private:
238  void WriteToShm(std::vector<TopicType>& data)
239  {
240  using namespace std::chrono;
241 
242  size_t n_written = 0;
243  auto t_sent = system_clock::now();
244  auto t_last = t_sent;
245  do {
246  for(auto& sample : data) {
247  if(g_stop) { return; }
248  sample.sample_id = n_written;
249  t_sent = system_clock::now();
250  std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
251  if(err) {
252  throw std::runtime_error("Error writing to shm: " + err.message());
253  }
254 
255  n_written++;
256  if(n_written && m_print_every && (n_written % m_print_every == 0)) {
257  auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
258  std::cout << "Samples written: " << n_written << std::endl;
259  std::cout << "Total time to write " << m_print_every << " : " << dur << " ms" << std::endl;
260  std::cout << "Average frame time: " << (float)dur/m_print_every << " ms" << std::endl;
261  t_last = t_sent;
262  }
263 
264  std::this_thread::sleep_until(t_sent + milliseconds(m_sample_delay));
265  }
266 
267  }while(m_repeat_mode);
268  }
269 
270  std::string m_queue_name; //< Queue name to be used by the writer
271  size_t m_queue_size; //< number of position in shm queue
272  std::string m_filename; //< path to fits file being read
273  int m_sample_delay; //< delay between samples being writter (ms)
274  int m_numa; //< which numa node to provide writer
275 
276  int m_print_every; //< print status every N samples
277  int m_gen_frames; //< if generation data how many sample to be generated
278  bool m_repeat_mode; //< data will loop forever with an ever increasing sample_id
279  bool m_help_only; //< if help only will not enter writing loop
280 
281  std::unique_ptr<WriterType> m_writer; //< the ipcq writer
282 };
283 
298  template<class T>
299  std::vector<T> read_col_from_fits(fitsfile* fptr, char* name, long nrows, bool output=false)
300  {
301  int status = 0;
302  int col, typecode, anynul;
303  long repeat, width;
304  float nulval;
305 
306  fits_get_colnum(fptr, CASESEN, name, &col, &status);
307  if (status) {
308  fits_report_error(stderr, status);
309  throw std::runtime_error("Error getting column: " + std::string(name));
310  }
311 
312  fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
313  if (status) {
314  fits_report_error(stderr, status);
315  throw std::runtime_error("Error getting coltype of:" + std::string(name));
316  }
317 
318  if(output) {
319  std::cout << "name: " << name << std::endl;
320  std::cout << "col: " << col << std::endl;
321  std::cout << "typecode: " << typecode << std::endl;
322  std::cout << "repeat: " << repeat << std::endl;
323  std::cout << "width: " << width << std::endl;
324  }
325 
326  // load in required data
327  std::vector<T> data;
328  data.resize(repeat*nrows); // we are assuming the vector to be matrix with row major.
329  T* d = data.data();
330  fits_read_col(fptr, typecode, col, 1, 1, repeat*nrows, &nulval, d, &anynul, &status);
331  if (status) {
332  fits_report_error(stderr, status);
333  throw std::runtime_error("Error reading column: " + std::string(name));
334  }
335  return data;
336  }
337 
338 }
339 #endif
rtctk::standaloneTools::ShmPub::ReadFits
virtual std::vector< TopicType > ReadFits(std::string filename)=0
Reads in data from a FITS file.
wscript.name
name
Definition: wscript:15
rtctk::standaloneTools::ShmPub::Run
int Run()
Entry point for running the ShmPub.
Definition: shmPub.hpp:147
rtctk::standaloneTools::ShmPub
ShmPub parent class.
Definition: shmPub.hpp:82
rtctk::standaloneTools::ShmPub::ShmPub
ShmPub(int argc, char *argv[])
Definition: shmPub.hpp:84
rtctk::standaloneTools::read_col_from_fits
std::vector< T > read_col_from_fits(fitsfile *fptr, char *name, long nrows, bool output=false)
helper function for reading columns of fits table
Definition: shmPub.hpp:299
rtctk::standaloneTools::ShmPub::GenData
virtual std::vector< TopicType > GenData(int num_frames)=0
Generates data to be circulated.
rtctk::standaloneTools::ShmPub::~ShmPub
virtual ~ShmPub()=default
rtctk::standaloneTools
Definition: rtctkGenDdsPublisher.h:18
rtctk::standaloneTools::SignalHandler
void SignalHandler(int signal)
Handles basic signals to allow simple exiting from a ShmPub process.
Definition: shmPub.hpp:50
rtctkExampleDataTaskGenFitsData.filename
filename
Definition: rtctkExampleDataTaskGenFitsData.py:11