00001 #ifndef _BULKDATA_RECEIVER_H
00002 #define _BULKDATA_RECEIVER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <vector>
00032
00033 #include "orbsvcs/AV/AVStreams_i.h"
00034 #include "orbsvcs/AV/Endpoint_Strategy.h"
00035 #include "orbsvcs/AV/Protocol_Factory.h"
00036 #include "orbsvcs/AV/Flows_T.h"
00037 #include "orbsvcs/AV/Transport.h"
00038 #include "orbsvcs/AV/Policy.h"
00039
00040 #include "ACSBulkDataError.h"
00041 #include "bulkDataFlowConsumer.h"
00042
00043 #include "bulkDataC.h"
00044
00048 namespace AcsBulkdata
00049 {
00068 template<class TReceiverCallback>
00069 class BulkDataReceiver
00070 {
00071 public:
00072
00076 BulkDataReceiver();
00077
00081 virtual ~BulkDataReceiver();
00082
00090 void initialize();
00091
00100 void createSingleFlow();
00101
00112 void createMultipleFlows(const char *fepsConfig);
00113
00121 bulkdata::BulkDataReceiverConfig * getReceiverConfig();
00122
00132 void getFlowCallback(ACE_CString &flowName, TReceiverCallback *&cb_p);
00133
00144 void getFlowCallback(CORBA::ULong flowNumber, TReceiverCallback *&cb_p);
00145
00153 void closeReceiver();
00154
00161 std::vector<std::string> getFlowNames();
00162
00170 void setReceiverName(ACE_CString recvName);
00171
00180 void subscribeNotification(ACS::CBvoid_ptr notifCb);
00181
00182
00183
00184 void notifySender(const ACSErr::Completion& comp);
00185
00186
00187
00188
00189 void addHandle(ACE_CString flowName, ACE_HANDLE handle)
00190 {
00191 handleMap_m.rebind(flowName,handle);
00192 }
00193
00194 void setCbTimeout(const char * cbTimeout);
00195
00196
00197 private:
00198
00199 typedef ACE_Hash_Map_Manager<ACE_CString, BulkDataFlowConsumer<TReceiverCallback> *, ACE_Null_Mutex> FepObjects;
00200 typedef ACE_Hash_Map_Iterator<ACE_CString, BulkDataFlowConsumer<TReceiverCallback> *, ACE_Null_Mutex> FepObjectsIterator;
00201
00202 typedef ACE_Hash_Map_Manager<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMap;
00203 typedef ACE_Hash_Map_Iterator<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMapIterator;
00204
00211 void initPartB();
00212
00219 AVStreams::StreamEndPoint_B_ptr createSepB();
00220
00230 AVStreams::FlowConsumer_ptr createFepConsumerB(ACE_CString &flowName, AVStreams::protocolSpec protocols, ACE_CString &format);
00231
00232
00241 void addFepToSep(AVStreams::StreamEndPoint_B_ptr locSepB_p,AVStreams::FlowConsumer_ptr locFepB_p);
00242
00249 AVStreams::StreamEndPoint_B_ptr getStreamEndPointB();
00250
00257 AVStreams::flowSpec * getFepsConfig();
00258
00265 void deleteFepsB();
00266
00267
00274 void deleteSepB();
00275
00276 void deleteAcceptor();
00277
00278 void deleteHandler();
00279
00280 void closeSocket();
00281
00290 const char * createFlowSpec(ACE_CString &flowName,
00291 ACE_CString &fepProtocol);
00292
00293
00294 public:
00298 bulkdata::Connection checkFlowCallbacks();
00299
00300 bulkdata::Connection getSenderConnectionState()
00301 {
00302 return recvConfig_p->connectionState;
00303 }
00304
00305 private:
00306
00307 FepObjects fepMap_m;
00308
00309 HandleMap handleMap_m;
00310
00314 TAO_AV_Endpoint_Reactive_Strategy_B <TAO_StreamEndPoint_B,TAO_VDev,AV_Null_MediaCtrl> reactiveStrategy_m;
00315
00316 AVStreams::StreamEndPoint_B_var sepB_p;
00317
00318 struct FepsCfgB
00319 {
00320 ACE_CString fepsFlowname;
00321 ACE_CString fepsFormat;
00322 ACE_CString fepsProtocol;
00323 };
00324
00325 AVStreams::flowSpec fepsData;
00326
00327
00328 bulkdata::BulkDataReceiverConfig * recvConfig_p;
00329
00330 TAO_StreamEndPoint_B *sepRefCount_p;
00331
00332 CORBA::Boolean closeReceiverFlag;
00333
00334 ACS::CBvoid_ptr locNotifCb_p;
00335
00336 ACE_Time_Value cbTimeout_m;
00337
00341
00342 };
00343 }
00344
00345 #include "bulkDataReceiver.i"
00346
00347 #endif