From 1078126a38ad7b7b3c4af52f6aba00c54916f8d2 Mon Sep 17 00:00:00 2001 From: csoler Date: Sun, 2 Nov 2014 09:23:37 +0000 Subject: [PATCH] added tempering system for GXS, so as to not send more in the outqueues than what the current bandwidth can handle. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7654 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/gxs/rsgxsnetservice.cc | 145 +++++++++++++++++++---- 1 file changed, 124 insertions(+), 21 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 38c35c5a5..ea3a531da 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -25,9 +25,11 @@ */ #include +#include #include #include "rsgxsnetservice.h" +#include "retroshare/rsconfig.h" #include "retroshare/rsgxsflags.h" #include "retroshare/rsgxscircles.h" #include "pgp/pgpauxutils.h" @@ -38,8 +40,8 @@ #define GIXS_CUT_OFF 0 -#define SYNC_PERIOD 60 // every 60 seconds (1 second for testing) -#define TRANSAC_TIMEOUT 30 // 30 seconds. Has been increased to avoid epidemic transaction cancelling. +#define SYNC_PERIOD 60 // every 60 seconds (12 seconds for testing) +#define TRANSAC_TIMEOUT 30 // 30 seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues. const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000; @@ -79,7 +81,7 @@ int RsGxsNetService::tick() if((elapsed) < now) { - syncWithPeers(); + syncWithPeers(); mSyncTs = now; } @@ -92,6 +94,91 @@ int RsGxsNetService::tick() return 1; } +// This class collects outgoing items due to the broadcast of Nxs messages. It computes +// a probability that can be used to temper the broadcast of items so as to match the +// residual bandwidth (difference between max allowed bandwidth and current outgoing rate. + +class NxsBandwidthRecorder +{ +public: + static void recordEvent(uint16_t service_type, RsItem *item) + { + RsStackMutex m(mtx) ; + + uint32_t bw = RsNxsSerialiser(service_type).size(item) ; // this is used to estimate bandwidth. + timeval tv ; + gettimeofday(&tv,NULL) ; + + // compute time(NULL) in msecs, for a more accurate bw estimate. + + uint64_t now = tv.tv_sec * 1000 + tv.tv_usec/1000 ; + + total_record += bw ; + ++total_events ; + + std::cerr << "bandwidthRecorder::recordEvent() Recording event time=" << now << ". bw=" << bw << std::endl; + + // Every 20 seconds at min, compute a new estimate of the required bandwidth. + static const int BANDWIDTH_ESTIMATE_DELAY = 20 ; + + if(now > last_event_record + BANDWIDTH_ESTIMATE_DELAY*1000) + { + // Compute the bandwidth using recorded times, in msecs + float speed = total_record/1024.0f/(now - last_event_record)*1000.0f ; + + // Apply a small temporal convolution. + estimated_required_bandwidth = 0.75*estimated_required_bandwidth + 0.25 * speed ; + +#ifdef NXS_NET_DEBUG + std::cerr << std::dec << " " << total_record << " Bytes (" << total_events << " items)" + << " received in " << now - last_event_record << " seconds. Speed: " << speed << " KBytes/sec" << std::endl; + std::cerr << " instantaneous speed = " << speed << " KB/s" << std::endl; + std::cerr << " cumulated estimated = " << estimated_required_bandwidth << " KB/s" << std::endl; +#endif + + last_event_record = now ; + total_record = 0 ; + total_events = 0 ; + } + } + + // Estimate the probability of sending an item so that the expected bandwidth matches the residual bandwidth + + static float computeCurrentSendingProbability() + { + int maxIn,maxOut; + float currIn,currOut ; + + rsConfig->GetMaxDataRates(maxIn,maxOut) ; + rsConfig->GetCurrentDataRates(currIn,currOut) ; + + float accepted_bandwidth = std::max(0.0f,maxOut - currOut) ; + float sending_probability = std::min( accepted_bandwidth / estimated_required_bandwidth,1.0f ) ; + +#ifdef NXS_NET_DEBUG + std::cerr << "bandwidthRecorder::computeCurrentSendingProbability()" << std::endl; + std::cerr << " current required bandwidth: " << estimated_required_bandwidth << " KB/s" << std::endl; + std::cerr << " max out = " << maxOut << ", currOut=" << currOut << std::endl; + std::cerr << " computed probability: " << sending_probability << std::endl; +#endif + + return sending_probability ; + } + +private: + static RsMutex mtx; + static uint64_t last_event_record ; + static float estimated_required_bandwidth ; + static uint32_t total_events ; + static uint64_t total_record ; +}; + +uint32_t NxsBandwidthRecorder::total_events =0 ; // total number of events. Not used. +uint64_t NxsBandwidthRecorder::last_event_record = time(NULL) * 1000;// starting time of bw estimate period (in msec) +uint64_t NxsBandwidthRecorder::total_record =0 ; // total bytes recorded in the current time frame +float NxsBandwidthRecorder::estimated_required_bandwidth = 10.0f ;// Estimated BW for sending sync data. Set to 10KB/s, to avoid 0. +RsMutex NxsBandwidthRecorder::mtx("Bandwidth recorder") ; // Protects the recorder since bw events are collected from multiple GXS Net services + void RsGxsNetService::syncWithPeers() { #ifdef NXS_NET_DEBUG @@ -99,6 +186,8 @@ void RsGxsNetService::syncWithPeers() std::cerr << std::endl; #endif + static RsNxsSerialiser ser(mServType) ; // this is used to estimate bandwidth. + RsStackMutex stack(mNxsMutex); std::set peers; @@ -110,24 +199,27 @@ void RsGxsNetService::syncWithPeers() { // for now just grps for(; sit != peers.end(); ++sit) - { + { - const RsPeerId peerId = *sit; + const RsPeerId peerId = *sit; - ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId); - uint32_t updateTS = 0; - if(cit != mClientGrpUpdateMap.end()) - { - const RsGxsGrpUpdateItem *gui = cit->second; - updateTS = gui->grpUpdateTS; - } - RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType); - grp->clear(); - grp->PeerId(*sit); - grp->updateTS = updateTS; - sendItem(grp); - } - } + ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId); + uint32_t updateTS = 0; + if(cit != mClientGrpUpdateMap.end()) + { + const RsGxsGrpUpdateItem *gui = cit->second; + updateTS = gui->grpUpdateTS; + } + RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType); + grp->clear(); + grp->PeerId(*sit); + grp->updateTS = updateTS; + + NxsBandwidthRecorder::recordEvent(mServType,grp) ; + + sendItem(grp); + } + } #ifndef GXS_DISABLE_SYNC_MSGS @@ -156,6 +248,11 @@ void RsGxsNetService::syncWithPeers() sit = peers.begin(); + float sending_probability = NxsBandwidthRecorder::computeCurrentSendingProbability() ; +#ifdef NXS_NET_DEBUG + std::cerr << "syncWithPeers(): Sending probability = " << sending_probability << std::endl; +#endif + // synchronise group msg for groups which we're subscribed to for(; sit != peers.end(); ++sit) { @@ -199,8 +296,14 @@ void RsGxsNetService::syncWithPeers() msg->PeerId(peerId); msg->grpId = grpId; msg->updateTS = updateTS; - sendItem(msg); - } + + NxsBandwidthRecorder::recordEvent(mServType,msg) ; + + if(RSRandom::random_f32() < sending_probability) + sendItem(msg); + else + delete msg ; + } } GrpMetaMap::iterator mmit = toRequest.begin();