00001 #ifndef _DDS_SUBSCRIBER_H
00002 #define _DDS_SUBSCRIBER_H
00003
00004 #include <DDSHelper.h>
00005 #include <acsddsncDataReaderListener.h>
00006 #include <dds/DCPS/SubscriberImpl.h>
00007
00008 namespace ddsnc{
00009
00016 class DDSSubscriber : public ddsnc::DDSHelper{
00017 private:
00018 DDS::Subscriber_var sub;
00019 OpenDDS::DCPS::SubscriberImpl *sub_impl;
00020 DDS::DataReaderListener_var *listener;
00021
00022 int attachToTransport();
00023
00029 int createSubscriber();
00030
00031 protected:
00032 DDS::SubscriberQos subQos;
00033
00034 public:
00035 DDS::DataReaderQos drQos;
00050 DDSSubscriber(const char *channelName) :
00051 ddsnc::DDSHelper(channelName)
00052 {
00053 }
00054
00055 ~DDSSubscriber()
00056 {
00057 delete listener;
00058 }
00059
00066 void consumerReady();
00067
00084 template <class DRV, class DR, class D>
00085 void addSubscription(
00086 void (*templateFunction)(D, void *), void *handlerParam=0)
00087 {
00088 std::cerr << "DDSSubscriber::addSubscription" << std::endl;
00089
00090 listener = new DDS::DataReaderListener_var
00091 (new ddsnc::ACSDDSNCDataReaderListener
00092 <DRV,DR,D>(templateFunction, handlerParam));
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102 }
00103
00104
00124 template <class D, class TSV, class TSI>
00125 void initialize()
00126 {
00127 std::cerr<< "DDSSubscriber::initialize()" << std::endl;
00128 createParticipant();
00129 if (CORBA::is_nil (participant.in()))
00130 std::cerr << "Participant is nil" << std::endl;
00131
00132 if(partitionName!=NULL){
00133 participant->get_default_subscriber_qos(subQos);
00134 subQos.partition.name.length(1);
00135 subQos.partition.name[0]=CORBA::string_dup(partitionName);
00136 }
00137 initializeTransport();
00138
00139 createSubscriber();
00140
00141
00142 TSV ts;
00143 ts = new TSI();
00144 if (DDS::RETCODE_OK != ts->register_type(participant.in(),"")){
00145 std::cerr << "register_type failed" << std::endl;
00146 }
00147
00148
00149 initializeTopic(ts->get_type_name());
00150 if(CORBA::is_nil(topic.in()))
00151 std::cerr<< "Topic is nil" << std::endl;
00152
00153 sub->get_default_datareader_qos (drQos);
00154
00155 drQos.reliability.kind = ::DDS::RELIABLE_RELIABILITY_QOS;
00156 drQos.reliability.max_blocking_time.sec = 1;
00157
00158 drQos.history.kind = ::DDS::KEEP_LAST_HISTORY_QOS;
00159 drQos.history.depth = 100;
00160 }
00161 };
00162 }
00163
00174 #define ACS_NEW_DDS_SUBSCRIBER(subscriber_p, idlStruct, channelName, handlerFunc, handlerParam) \
00175 { \
00176 subscriber_p= new ddsnc::DDSSubscriber(channelName); \
00177 subscriber_p->initialize<idlStruct, idlStruct##TypeSupport_var, idlStruct##TypeSupportImpl>(); \
00178 subscriber_p->addSubscription<idlStruct##DataReader_var, idlStruct##DataReader, idlStruct>(handlerFunc, handlerParam); \
00179 }
00180
00181 #endif