00001 #ifndef _BULKDATA_SENDER_H
00002 #define _BULKDATA_SENDER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include <iostream>
00032
00033 #include "orbsvcs/AV/AVStreams_i.h"
00034 #include "orbsvcs/AV/Endpoint_Strategy.h"
00035 #include "orbsvcs/AV/Protocol_Factory.h"
00036 #include "orbsvcs/AV/Flows_T.h"
00037 #include "orbsvcs/AV/Transport.h"
00038 #include "orbsvcs/AV/Policy.h"
00039
00040 #include "ACSBulkDataError.h"
00041 #include "bulkDataSenderDefaultCb.h"
00042 #include "bulkDataFlowProducer.h"
00043
00044 #include "bulkDataC.h"
00045
00046
00047
00048
00049
00053 namespace AcsBulkdata
00054 {
00074 template<class TSenderCallback>
00075 class BulkDataSender
00076 {
00077 public:
00078
00082 BulkDataSender();
00083
00087 virtual ~BulkDataSender();
00088
00096 void initialize();
00097
00106 void createSingleFlow();
00107
00118 void createMultipleFlows(const char *fepsConfig);
00119
00128 void connectToPeer(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00129
00140 void getFlowProtocol(ACE_CString &flowname, TAO_AV_Protocol_Object *¤tProtocol_p);
00141
00152 void startSend(CORBA::ULong flownumber, ACE_Message_Block *param = 0);
00153
00164 void startSend(CORBA::ULong flownumber, const char *param, size_t len);
00165
00176 void sendData(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00177
00190 void sendData(CORBA::ULong flownumber, const char *buffer, size_t len);
00191
00202 void stopSend(CORBA::ULong flownumber);
00203
00211 void disconnectPeer();
00212
00213 TAO_StreamCtrl * getStreamCtrl()
00214 {
00215 return streamctrl_p;
00216 }
00217
00218 const char *getFlowSpec(const ACE_CString & flowName);
00219
00226 std::vector<std::string> getFlowNames();
00227
00228
00229
00230
00231 void startStream(CORBA::ULong flownumber);
00232
00233 void sendStream(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00234
00235 void stopStream(CORBA::ULong flownumber);
00236
00237
00238
00239
00240 private:
00241
00242 typedef ACE_Hash_Map_Manager<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex> FepObjects;
00243 typedef ACE_Hash_Map_Iterator<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex> FepObjectsIterator;
00244
00245 typedef ACE_Hash_Map_Manager<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMap;
00246 typedef ACE_Hash_Map_Iterator<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMapIterator;
00247
00254 void initPartA();
00255
00262 AVStreams::StreamEndPoint_A_ptr createSepA();
00263
00273 AVStreams::FlowProducer_ptr createFepProducerA(ACE_CString &flowname,
00274 AVStreams::protocolSpec protocols,
00275 ACE_CString &format,
00276 TAO_StreamCtrl *strctrl_p);
00277
00278
00287 void addFepToSep(AVStreams::StreamEndPoint_A_ptr locSepA_p, AVStreams::FlowProducer_ptr locFepA_p);
00288
00289
00296 TAO_StreamCtrl *createStreamCtrl();
00297
00298
00305
00306
00307
00321 const char * createFwdFlowSpec(ACE_CString &flowname,
00322 ACE_CString &direction,
00323 ACE_CString &formatName,
00324 ACE_CString &flowProtocol,
00325 ACE_CString &carrierProtocol,
00326 ACE_CString &localAddress,
00327 ACE_CString &remoteAddress);
00328
00329
00337 void setReceiverConfig(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00338
00339
00340
00347 AVStreams::StreamEndPoint_A_ptr getStreamEndPointA();
00348
00349
00356 void deleteStreamCtrl();
00357
00358
00359
00366 void deleteFepsA();
00367
00374 void deleteSepA();
00375
00376 void deleteConnector();
00377
00378 void deleteHandler();
00379
00388 const char * createFlowSpec(ACE_CString &flowname,
00389 ACE_CString &fepProtocol);
00390
00391 void mergeFlowSpecs();
00392
00396 TAO_AV_Endpoint_Reactive_Strategy_A<TAO_StreamEndPoint_A,TAO_VDev,AV_Null_MediaCtrl> endpointStrategy_m;
00397
00398 AVStreams::StreamEndPoint_A_var sepA_p;
00399
00400 AVStreams::StreamEndPoint_B_var sepB_p;
00401
00402
00403
00404 struct FepsCfgA
00405 {
00406 ACE_CString fepsFlowname;
00407 ACE_CString fepsFormat;
00408 ACE_CString fepsProtocol;
00409 };
00410
00411 FepObjects fepMap_m;
00412
00413 HandleMap handleMap_m;
00414
00415 AVStreams::flowSpec_var recvFeps_p;
00416
00417 AVStreams::flowSpec senderFeps_m;
00418
00419 TAO_StreamEndPoint_A * sepRefCount_p;
00420
00421 CORBA::Boolean disconnectPeerFlag;
00422
00423 AVStreams::flowSpec flowSpec_m;
00424
00425 TAO_StreamCtrl *streamctrl_p;
00426
00430
00431
00432
00433
00434
00435
00436
00437
00438
00439 };
00440 }
00441
00442 #include "bulkDataSender.i"
00443
00444 #endif