12 #ifndef RTCTK_STANDALONETOOLS_SHMPUB_H
13 #define RTCTK_STANDALONETOOLS_SHMPUB_H
16 #include <boost/program_options.hpp>
19 #include <cfitsio/fitsio.h>
22 #include <numapp/mempolicy.hpp>
23 #include <numapp/numapolicies.hpp>
24 #include <numapp/thread.hpp>
27 #include <ipcq/writer.hpp>
43 static bool g_stop =
false;
53 std::cout << std::endl <<
"Signal to exit received " << std::endl;
82 template<
class TopicType,
class WriterType = ipcq::Writer<TopicType>>
85 ShmPub(
int argc,
char* argv[]) : m_help_only(false)
87 using namespace boost::program_options;
90 options_description desc(
"Allowed options");
92 (
"help,h",
"produce help message")
93 (
"fits-file,f", value<std::string>(&m_filename)->default_value(
""),
"fits input file: if not provided the app will generate data")
94 (
"queue-name,q", value<std::string>(&m_queue_name)->default_value(
"default_shm_queue"),
"shm queue name")
95 (
"queue-size,s", value<size_t>(&m_queue_size)->default_value(1000),
"size of the queue")
96 (
"sample-delay,d", value<int>(&m_sample_delay)->default_value(10),
"inter-sample delay in ms")
97 (
"numa-node,n", value<int>(&m_numa),
"numa node for shm queue")
98 (
"print-every,p", value<int>(&m_print_every)->default_value(0),
"when to print to screen the number of sample written")
99 (
"gen-frames,g",value<int>(&m_gen_frames)->default_value(100),
"Number of frames to generate")
100 (
"repeat-mode,r",bool_switch(&m_repeat_mode),
"Repeat output when all samples are written");
103 store(command_line_parser(argc, argv).options(desc).run(), vm);
106 if (vm.count(
"help")) {
108 std::cout << desc << std::endl;
111 std::cout <<
"fits-file: " << m_filename << std::endl;
112 std::cout <<
"queue-name: " << m_queue_name << std::endl;
113 std::cout <<
"queue-size: " << m_queue_size << std::endl;
114 std::cout <<
"sample-delay: " << m_sample_delay << std::endl;
115 if(vm.count(
"numa-node")) {
116 std::cout <<
"numa-node: " << m_numa << std::endl;
118 std::cout <<
"print-every: " << m_print_every << std::endl;
119 std::cout <<
"gen-frames: " << m_gen_frames << std::endl;
120 std::cout <<
"repeat-mode: " << m_repeat_mode << std::endl;
122 if(vm.count(
"numa-node")) {
123 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
125 numapp::MemPolicy::MakeBindNode(m_numa));
127 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
131 }
catch(
const std::exception &e) {
132 std::cerr <<
"Exception:" << e.what()<< std::endl;
150 if(m_help_only) {
return 0; }
158 std::vector<TopicType> data;
163 if(not m_filename.empty()) {
164 std::cout <<
"Reading data from FITS file: " << m_filename << std::endl;
167 std::cout <<
"Generating data" << std::endl;
173 throw std::runtime_error(
"Data vector is not populated so will exit");
177 std::cout <<
"Writing data to shared memory queue" << std::endl;
180 }
catch(std::exception
const& e) {
182 std::cout << e.what() << std::endl;
189 std::this_thread::sleep_for(std::chrono::seconds(2));
211 virtual std::vector<TopicType>
GenData(
int num_frames) = 0;
224 void WriteToShm(std::vector<TopicType>& data)
226 using namespace std::chrono;
228 size_t n_written = 0;
229 auto t_sent = system_clock::now();
230 auto t_last = t_sent;
232 for(
auto& sample : data) {
233 if(g_stop) {
return; }
234 sample.sample_id = n_written;
235 t_sent = system_clock::now();
236 std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
238 throw std::runtime_error(
"Error writing to shm: " + err.message());
242 if(n_written && m_print_every && (n_written % m_print_every == 0)) {
243 auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
244 std::cout <<
"Samples written: " << n_written << std::endl;
245 std::cout <<
"Total time to write " << m_print_every <<
" : " << dur <<
" ms" << std::endl;
246 std::cout <<
"Average frame time: " << (float)dur/m_print_every <<
" ms" << std::endl;
250 std::this_thread::sleep_until(t_sent + milliseconds(m_sample_delay));
253 }
while(m_repeat_mode);
256 std::string m_queue_name;
258 std::string m_filename;
267 std::unique_ptr<WriterType> m_writer;
288 int col, typecode, anynul;
292 fits_get_colnum(fptr, CASESEN,
name, &col, &status);
294 fits_report_error(stderr, status);
295 throw std::runtime_error(
"Error getting column: " + std::string(
name));
298 fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
300 fits_report_error(stderr, status);
301 throw std::runtime_error(
"Error getting coltype of:" + std::string(
name));
305 std::cout <<
"name: " <<
name << std::endl;
306 std::cout <<
"col: " << col << std::endl;
307 std::cout <<
"typecode: " << typecode << std::endl;
308 std::cout <<
"repeat: " << repeat << std::endl;
309 std::cout <<
"width: " << width << std::endl;
314 data.resize(repeat*nrows);
316 fits_read_col(fptr, typecode, col, 1, 1, repeat*nrows, &nulval, d, &anynul, &status);
318 fits_report_error(stderr, status);
319 throw std::runtime_error(
"Error reading column: " + std::string(
name));