12 #ifndef RTCTK_STANDALONETOOLS_SHMPUB_H
13 #define RTCTK_STANDALONETOOLS_SHMPUB_H
16 #include <boost/program_options.hpp>
19 #include <cfitsio/fitsio.h>
22 #include <numapp/mempolicy.hpp>
23 #include <numapp/numapolicies.hpp>
24 #include <numapp/thread.hpp>
27 #include <ipcq/writer.hpp>
43 static bool g_stop =
false;
52 std::cout << std::endl <<
"Signal to exit received " << std::endl;
81 template<
class TopicType,
class WriterType = ipcq::Writer<TopicType>>
84 ShmPub(
int argc,
char* argv[]) : m_help_only(false)
86 using namespace boost::program_options;
89 options_description desc(
"Allowed options");
91 (
"help,h",
"produce help message")
92 (
"fits-file,f", value<std::string>(&m_filename)->default_value(
""),
"fits input file: if not provided the app will generate data")
93 (
"queue-name,q", value<std::string>(&m_queue_name)->default_value(
"default_shm_queue"),
"shm queue name")
94 (
"queue-size,s", value<size_t>(&m_queue_size)->default_value(1000),
"size of the queue")
95 (
"sample-delay,d", value<int>(&m_sample_delay)->default_value(10),
"inter-sample delay in ms")
96 (
"numa-node,n", value<int>(&m_numa),
"numa node for shm queue")
97 (
"print-every,p", value<int>(&m_print_every)->default_value(0),
"when to print to screen the number of sample written")
98 (
"gen-frames,g",value<int>(&m_gen_frames)->default_value(100),
"Number of frames to generate")
99 (
"repeat-mode,r",bool_switch(&m_repeat_mode),
"Repeat output when all samples are written");
102 store(command_line_parser(argc, argv).options(desc).run(), vm);
105 if (vm.count(
"help")) {
107 std::cout << desc << std::endl;
110 std::cout <<
"fits-file: " << m_filename << std::endl;
111 std::cout <<
"queue-name: " << m_queue_name << std::endl;
112 std::cout <<
"queue-size: " << m_queue_size << std::endl;
113 std::cout <<
"sample-delay: " << m_sample_delay << std::endl;
114 if(vm.count(
"numa-node")) {
115 std::cout <<
"numa-node: " << m_numa << std::endl;
117 std::cout <<
"print-every: " << m_print_every << std::endl;
118 std::cout <<
"gen-frames: " << m_gen_frames << std::endl;
119 std::cout <<
"repeat-mode: " << m_repeat_mode << std::endl;
121 if(vm.count(
"numa-node")) {
122 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
124 numapp::MemPolicy::MakeBindNode(m_numa));
126 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
130 }
catch(
const std::exception &e) {
131 std::cerr <<
"Exception:" << e.what()<< std::endl;
149 if(m_help_only) {
return 0; }
157 std::vector<TopicType> data;
162 if(not m_filename.empty()) {
163 std::cout <<
"Reading data from FITS file: " << m_filename << std::endl;
166 std::cout <<
"Generating data" << std::endl;
172 throw std::runtime_error(
"Data vector is not populated so will exit");
176 std::cout <<
"Writing data to shared memory queue" << std::endl;
179 }
catch(std::exception
const& e) {
181 std::cout << e.what() << std::endl;
188 std::this_thread::sleep_for(std::chrono::seconds(2));
224 virtual std::vector<TopicType>
GenData(
int num_frames) = 0;
238 void WriteToShm(std::vector<TopicType>& data)
240 using namespace std::chrono;
242 size_t n_written = 0;
243 auto t_sent = system_clock::now();
244 auto t_last = t_sent;
246 for(
auto& sample : data) {
247 if(g_stop) {
return; }
248 sample.sample_id = n_written;
249 t_sent = system_clock::now();
250 std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
252 throw std::runtime_error(
"Error writing to shm: " + err.message());
256 if(n_written && m_print_every && (n_written % m_print_every == 0)) {
257 auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
258 std::cout <<
"Samples written: " << n_written << std::endl;
259 std::cout <<
"Total time to write " << m_print_every <<
" : " << dur <<
" ms" << std::endl;
260 std::cout <<
"Average frame time: " << (float)dur/m_print_every <<
" ms" << std::endl;
264 std::this_thread::sleep_until(t_sent + milliseconds(m_sample_delay));
267 }
while(m_repeat_mode);
270 std::string m_queue_name;
272 std::string m_filename;
281 std::unique_ptr<WriterType> m_writer;
302 int col, typecode, anynul;
306 fits_get_colnum(fptr, CASESEN,
name, &col, &status);
308 fits_report_error(stderr, status);
309 throw std::runtime_error(
"Error getting column: " + std::string(
name));
312 fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
314 fits_report_error(stderr, status);
315 throw std::runtime_error(
"Error getting coltype of:" + std::string(
name));
319 std::cout <<
"name: " <<
name << std::endl;
320 std::cout <<
"col: " << col << std::endl;
321 std::cout <<
"typecode: " << typecode << std::endl;
322 std::cout <<
"repeat: " << repeat << std::endl;
323 std::cout <<
"width: " << width << std::endl;
328 data.resize(repeat*nrows);
330 fits_read_col(fptr, typecode, col, 1, 1, repeat*nrows, &nulval, d, &anynul, &status);
332 fits_report_error(stderr, status);
333 throw std::runtime_error(
"Error reading column: " + std::string(
name));