RTC Toolkit  1.0.0
shmPub.hpp
Go to the documentation of this file.
1 
12 #ifndef RTCTK_STANDALONETOOLS_SHMPUB_H
13 #define RTCTK_STANDALONETOOLS_SHMPUB_H
14 
15 // arg parsing
16 #include <boost/program_options.hpp>
17 
18 // cfitsio
19 #include <cfitsio/fitsio.h>
20 
21 // include the numapp for threading
22 #include <numapp/mempolicy.hpp>
23 #include <numapp/numapolicies.hpp>
24 #include <numapp/thread.hpp>
25 
26 // include the ipcq for writer
27 #include <ipcq/writer.hpp>
28 
29 #include <iostream>
30 #include <vector>
31 #include <atomic>
32 #include <chrono>
33 #include <ctime>
34 #include <functional>
35 #include <numeric>
36 
/// Stop request flag: set by SignalHandler() and polled by the write loop.
/// It is atomic because it is written from a signal handler while the write loop
/// reads it; a plain bool gives no guarantee that the loop ever observes the store.
static std::atomic<bool> g_stop{false};

/**
 * @brief Handles basic signals to allow simple exiting of ShmPub.
 *
 * Raises the stop flag and prints a short notice to stdout.
 *
 * @param signal Number of the received signal (not used).
 */
void SignalHandler(int signal)
{
    (void)signal;  // the same action is taken for every signal this handler is installed for

    // Raise the flag first: this is the part the write loop depends on.
    g_stop.store(true);

    // NOTE(review): iostream output is not async-signal-safe. It is kept so that the user
    // still sees the message; consider moving the print to the code that observes g_stop.
    std::cout << std::endl << "Signal to exit received " << std::endl;
}
56 
57 
82 template<class TopicType, class WriterType = ipcq::Writer<TopicType>>
83 class ShmPub {
84  public:
85  ShmPub(int argc, char* argv[]) : m_help_only(false)
86  {
87  using namespace boost::program_options;
88 
89  try {
90  options_description desc("Allowed options");
91  desc.add_options()
92  ("help,h", "produce help message")
93  ("fits-file,f", value<std::string>(&m_filename)->default_value(""), "fits input file: if not provided the app will generate data")
94  ("queue-name,q", value<std::string>(&m_queue_name)->default_value("default_shm_queue"), "shm queue name")
95  ("queue-size,s", value<size_t>(&m_queue_size)->default_value(1000), "size of the queue")
96  ("sample-delay,d", value<int>(&m_sample_delay)->default_value(10), "inter-sample delay in ms")
97  ("numa-node,n", value<int>(&m_numa), "numa node for shm queue")
98  ("print-every,p", value<int>(&m_print_every)->default_value(0), "when to print to screen the number of sample written")
99  ("gen-frames,g",value<int>(&m_gen_frames)->default_value(100), "Number of frames to generate")
100  ("repeat-mode,r",bool_switch(&m_repeat_mode), "Repeat output when all samples are written");
101 
102  variables_map vm;
103  store(command_line_parser(argc, argv).options(desc).run(), vm);
104  notify(vm);
105 
106  if (vm.count("help")) {
107  m_help_only = true;
108  std::cout << desc << std::endl;
109  }else {
110  m_help_only = false;
111  std::cout << "fits-file: " << m_filename << std::endl;
112  std::cout << "queue-name: " << m_queue_name << std::endl;
113  std::cout << "queue-size: " << m_queue_size << std::endl;
114  std::cout << "sample-delay: " << m_sample_delay << std::endl;
115  if(vm.count("numa-node")) {
116  std::cout << "numa-node: " << m_numa << std::endl;
117  }
118  std::cout << "print-every: " << m_print_every << std::endl;
119  std::cout << "gen-frames: " << m_gen_frames << std::endl;
120  std::cout << "repeat-mode: " << m_repeat_mode << std::endl;
121 
122  if(vm.count("numa-node")) {
123  m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
124  m_queue_size,
125  numapp::MemPolicy::MakeBindNode(m_numa));
126  }else {
127  m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
128  m_queue_size);
129  }
130  }
131  } catch(const std::exception &e) {
132  std::cerr << "Exception:" << e.what()<< std::endl;
133  }
134  }
135 
136  virtual ~ShmPub() = default;
137 
148  int Run()
149  {
150  if(m_help_only) { return 0; }
151 
152  int ret_val = 0;
153 
154  try {
155 
156  signal(SIGINT,SignalHandler);
157 
158  std::vector<TopicType> data;
159 
160  // checks if filename has been indicated if it has loads data by calling the user
161  // overloaded function ReadFits if not provided calls the user overloaded function
162  // GenData
163  if(not m_filename.empty()) {
164  std::cout << "Reading data from FITS file: " << m_filename << std::endl;
165  data = ReadFits(m_filename);
166  }else {
167  std::cout << "Generating data" << std::endl;
168  data = GenData(m_gen_frames);
169  }
170 
171  // check to make sure m_data is populated
172  if(data.empty()) {
173  throw std::runtime_error("Data vector is not populated so will exit");
174  }
175 
176  // calls main loop
177  std::cout << "Writing data to shared memory queue" << std::endl;
178  WriteToShm(data);
179 
180  }catch(std::exception const& e) {
181 
182  std::cout << e.what() << std::endl;
183  ret_val = -1;
184  }
185 
186  // Close queue to signal and give readers time detach from queue
187  m_writer->Close();
188 #ifndef UNIT_TEST
189  std::this_thread::sleep_for(std::chrono::seconds(2));
190 #endif
191 
192  return ret_val;
193  }
194 
202  virtual std::vector<TopicType> ReadFits(std::string filename) = 0;
203 
211  virtual std::vector<TopicType> GenData(int num_frames) = 0;
212 
213  private:
224  void WriteToShm(std::vector<TopicType>& data)
225  {
226  using namespace std::chrono;
227 
228  size_t n_written = 0;
229  auto t_sent = system_clock::now();
230  auto t_last = t_sent;
231  do {
232  for(auto& sample : data) {
233  if(g_stop) { return; }
234  sample.sample_id = n_written;
235  t_sent = system_clock::now();
236  std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
237  if(err) {
238  throw std::runtime_error("Error writing to shm: " + err.message());
239  }
240 
241  n_written++;
242  if(n_written && m_print_every && (n_written % m_print_every == 0)) {
243  auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
244  std::cout << "Samples written: " << n_written << std::endl;
245  std::cout << "Total time to write " << m_print_every << " : " << dur << " ms" << std::endl;
246  std::cout << "Average frame time: " << (float)dur/m_print_every << " ms" << std::endl;
247  t_last = t_sent;
248  }
249 
250  std::this_thread::sleep_until(t_sent + milliseconds(m_sample_delay));
251  }
252 
253  }while(m_repeat_mode);
254  }
255 
256  std::string m_queue_name; //< Queue name to be used by the writer
257  size_t m_queue_size; //< number of position in shm queue
258  std::string m_filename; //< path to fits file being read
259  int m_sample_delay; //< delay between samples being writter (ms)
260  int m_numa; //< which numa node to provide writer
261 
262  int m_print_every; //< print status every N samples
263  int m_gen_frames; //< if generation data how many sample to be generated
264  bool m_repeat_mode; //< data will loop forever with an ever increasing sample_id
265  bool m_help_only; //< if help only will not enter writing loop
266 
267  std::unique_ptr<WriterType> m_writer; //< the ipcq writer
268 };
269 
284  template<class T>
285  std::vector<T> read_col_from_fits(fitsfile* fptr, char* name, long nrows, bool output=false)
286  {
287  int status = 0;
288  int col, typecode, anynul;
289  long repeat, width;
290  float nulval;
291 
292  fits_get_colnum(fptr, CASESEN, name, &col, &status);
293  if (status) {
294  fits_report_error(stderr, status);
295  throw std::runtime_error("Error getting column: " + std::string(name));
296  }
297 
298  fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
299  if (status) {
300  fits_report_error(stderr, status);
301  throw std::runtime_error("Error getting coltype of:" + std::string(name));
302  }
303 
304  if(output) {
305  std::cout << "name: " << name << std::endl;
306  std::cout << "col: " << col << std::endl;
307  std::cout << "typecode: " << typecode << std::endl;
308  std::cout << "repeat: " << repeat << std::endl;
309  std::cout << "width: " << width << std::endl;
310  }
311 
312  // load in required data
313  std::vector<T> data;
314  data.resize(repeat*nrows); // we are assuming the vector to be matrix with row major.
315  T* d = data.data();
316  fits_read_col(fptr, typecode, col, 1, 1, repeat*nrows, &nulval, d, &anynul, &status);
317  if (status) {
318  fits_report_error(stderr, status);
319  throw std::runtime_error("Error reading column: " + std::string(name));
320  }
321  return data;
322  }
323 
324 }
325 #endif
rtctk::standaloneTools::ShmPub::ReadFits
virtual std::vector< TopicType > ReadFits(std::string filename)=0
Pure virtual function to be overridden by the child class.
wscript.name
name
Definition: wscript:15
rtctk::standaloneTools::ShmPub::Run
int Run()
Entry point for running the ShmPub.
Definition: shmPub.hpp:148
rtctk::standaloneTools::ShmPub
ShmPub parent class.
Definition: shmPub.hpp:83
rtctk::standaloneTools::ShmPub::ShmPub
ShmPub(int argc, char *argv[])
Definition: shmPub.hpp:85
rtctk::standaloneTools::read_col_from_fits
std::vector< T > read_col_from_fits(fitsfile *fptr, char *name, long nrows, bool output=false)
helper function for reading columns of fits table
Definition: shmPub.hpp:285
rtctk::standaloneTools::ShmPub::GenData
virtual std::vector< TopicType > GenData(int num_frames)=0
Pure virtual function to be overridden by the child class.
rtctk::standaloneTools::ShmPub::~ShmPub
virtual ~ShmPub()=default
rtctk::standaloneTools
Definition: shmPub.hpp:37
rtctk::standaloneTools::SignalHandler
void SignalHandler(int signal)
Handles basic signals to allow simple exiting of ShmPub.
Definition: shmPub.hpp:51
rtctkExampleDataTaskGenFitsData.filename
filename
Definition: rtctkExampleDataTaskGenFitsData.py:11