00001 #ifndef _BULKDATA_DISTRIBUTER_H
00002 #define _BULKDATA_DISTRIBUTER_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #include "bulkDataSender.h"
00033 #include "bulkDataReceiver.h"
00034
00035 #include <Pair_T.h>
00036
00037 #include <acsQoS.h>
00038
00042 namespace AcsBulkdata
00043 {
00062
// Forward declaration only: BulkDataDistributer (declared next) keeps a
// pointer to this callback servant, whose full definition comes later in
// this header.
template<typename TReceiverCallback, typename TSenderCallback>
class BulkDataDistributerNotifCb;
00065
00066 template<class TReceiverCallback, class TSenderCallback>
00067 class BulkDataDistributer
00068 {
00069
00070 enum Flow_Status
00071 {
00072 FLOW_AVAILABLE,
00073 FLOW_NOT_AVAILABLE
00074 };
00075
00076 typedef ACE_Pair< bulkdata::BulkDataReceiver_ptr, BulkDataSender<TSenderCallback> *> Sender_Map_Pair;
00077
00078
00079
00080
00081
00082 typedef ACE_Hash_Map_Manager <ACE_CString, Sender_Map_Pair, ACE_Null_Mutex> Sender_Map;
00083 typedef ACE_Hash_Map_Entry <ACE_CString, Sender_Map_Pair > Sender_Map_Entry;
00084 typedef ACE_Hash_Map_Iterator <ACE_CString, Sender_Map_Pair ,ACE_Null_Mutex> Sender_Map_Iterator;
00085
00086 typedef ACE_Hash_Map_Manager <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map;
00087 typedef ACE_Hash_Map_Entry <CORBA::ULong, Flow_Status> Flows_Status_Map_Entry;
00088 typedef ACE_Hash_Map_Iterator <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map_Iterator;
00089
00090 typedef ACE_Hash_Map_Manager <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map;
00091 typedef ACE_Hash_Map_Entry <ACE_CString, CORBA::ULong> Recv_Status_Map_Entry;
00092 typedef ACE_Hash_Map_Iterator <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map_Iterator;
00093
00094
00095
00096 public:
00097
00101 BulkDataDistributer();
00102
00106 virtual ~BulkDataDistributer();
00107
00111 virtual void multiConnect(bulkdata::BulkDataReceiverConfig *recvConfig_p, const char *fepsConfig, const ACE_CString& receiverName);
00112
00116 virtual void multiDisconnect(const ACE_CString& receiverName);
00117
00118 virtual BulkDataReceiver<TReceiverCallback> *getReceiver()
00119 {
00120 return &receiver_m;
00121 }
00122
00123 virtual Sender_Map *getSenderMap()
00124 {
00125 return &senderMap_m;
00126 }
00127
00128 virtual bool isRecvConnected (const ACE_CString& receiverName);
00129
00130 virtual bool isSenderConnected (const ACE_CString& receiverName);
00131
00132 virtual bool isReceiverConnected (const ACE_CString& receiverName);
00133
00134 virtual void distSendStart (ACE_CString& flowName, CORBA::ULong flowNumber);
00135
00136 virtual int distSendDataHsk (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00137
00138 virtual int distSendData (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00139
00140 virtual CORBA::Boolean distSendStopTimeout (ACE_CString& flowName, CORBA::ULong flowNumber);
00141
00142 virtual void distSendStop (ACE_CString& flowName, CORBA::ULong flowNumber);
00143
00144 void setTimeout (CORBA::ULong user_timeout)
00145 { timeout_m = user_timeout; }
00146
00147 void setContSvc (maci::ContainerServices * services_p)
00148 { contSvc_p = services_p; }
00149
00153 void subscribeNotification(ACS::CBvoid_ptr notifCb);
00154
00158 void notifySender(const ACSErr::Completion& comp);
00159
00160 bulkdata::Connection getSenderConnectionState()
00161 {
00162 return getReceiver()->getSenderConnectionState();
00163 }
00164
00165 private:
00166
00167 CORBA::Boolean getFlowReceiverStatus(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00168
00169 CORBA::Boolean isFlowReceiverAvailable(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00170
00171 BulkDataSender<TSenderCallback> *sender_p;
00172
00173 BulkDataReceiver<TReceiverCallback> receiver_m;
00174
00175 Sender_Map senderMap_m;
00176
00177 Recv_Status_Map recvStatusMap_m;
00178 Flows_Status_Map flowsStatusMap_m;
00179
00180 CORBA::ULong timeout_m;
00181 CORBA::ULong numberOfFlows;
00182 CORBA::ULong offset;
00183
00184 maci::ContainerServices *contSvc_p;
00185
00186 BulkDataDistributerNotifCb<TReceiverCallback, TSenderCallback> *distributerNotifCb_p;
00187
00188 ACS::CBvoid_ptr locNotifCb_p;
00189 };
00190
00191
00192
00193 template<class TReceiverCallback, class TSenderCallback = BulkDataSenderDefaultCallback>
00194 class BulkDataDistributerNotifCb: public virtual POA_ACS::CBvoid
00195 {
00196
00197 public:
00198
00199 BulkDataDistributerNotifCb(BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr)
00200 {
00201 ACS_TRACE("BulkDataDistributerNotifCb<>::BulkDataDistributerNotifCb");
00202
00203 distr_p = distr;
00204 }
00205
00206 ~BulkDataDistributerNotifCb()
00207 {
00208 ACS_TRACE("BulkDataDistributerNotifCb<>::~BulkDataDistributerNotifCb");
00209 }
00210
00211 void working(const Completion &comp, const ACS::CBDescOut &desc)
00212 {
00213 }
00214
00215 void done(const Completion &comp, const ACS::CBDescOut &desc)
00216 {
00217 try
00218 {
00219 distr_p->notifySender(comp);
00220 }
00221 catch(ACSErr::ACSbaseExImpl &ex)
00222 {
00223 ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done error"));
00224 ex.log();
00225 }
00226 catch(...)
00227 {
00228 ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done unknown error"));
00229 }
00230 }
00231
00232 CORBA::Boolean negotiate (ACS::TimeInterval timeToTransmit, const ACS::CBDescOut &desc)
00233 {
00234 return true;
00235 }
00236
00237 private:
00238
00239 BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr_p;
00240 };
00241
00242
00243 }
00244
00245
00246 #include "bulkDataDistributer.i"
00247
00248 #endif