mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
added tempering system for GXS, so as to not send more in the outqueues than what the current bandwidth can handle.
git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7654 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
42b8ea6602
commit
1078126a38
@ -25,9 +25,11 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
#include <sys/time.h>
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
|
|
||||||
#include "rsgxsnetservice.h"
|
#include "rsgxsnetservice.h"
|
||||||
|
#include "retroshare/rsconfig.h"
|
||||||
#include "retroshare/rsgxsflags.h"
|
#include "retroshare/rsgxsflags.h"
|
||||||
#include "retroshare/rsgxscircles.h"
|
#include "retroshare/rsgxscircles.h"
|
||||||
#include "pgp/pgpauxutils.h"
|
#include "pgp/pgpauxutils.h"
|
||||||
@ -38,8 +40,8 @@
|
|||||||
|
|
||||||
#define GIXS_CUT_OFF 0
|
#define GIXS_CUT_OFF 0
|
||||||
|
|
||||||
#define SYNC_PERIOD 60 // every 60 seconds (1 second for testing)
|
#define SYNC_PERIOD 60 // every 60 seconds (12 seconds for testing)
|
||||||
#define TRANSAC_TIMEOUT 30 // 30 seconds. Has been increased to avoid epidemic transaction cancelling.
|
#define TRANSAC_TIMEOUT 30 // 30 seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
|
||||||
|
|
||||||
const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000;
|
const uint32_t RsGxsNetService::FRAGMENT_SIZE = 150000;
|
||||||
|
|
||||||
@ -79,7 +81,7 @@ int RsGxsNetService::tick()
|
|||||||
|
|
||||||
if((elapsed) < now)
|
if((elapsed) < now)
|
||||||
{
|
{
|
||||||
syncWithPeers();
|
syncWithPeers();
|
||||||
mSyncTs = now;
|
mSyncTs = now;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,6 +94,91 @@ int RsGxsNetService::tick()
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This class collects outgoing items due to the broadcast of Nxs messages. It computes
|
||||||
|
// a probability that can be used to temper the broadcast of items so as to match the
|
||||||
|
// residual bandwidth (difference between max allowed bandwidth and current outgoing rate.
|
||||||
|
|
||||||
|
class NxsBandwidthRecorder
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
static void recordEvent(uint16_t service_type, RsItem *item)
|
||||||
|
{
|
||||||
|
RsStackMutex m(mtx) ;
|
||||||
|
|
||||||
|
uint32_t bw = RsNxsSerialiser(service_type).size(item) ; // this is used to estimate bandwidth.
|
||||||
|
timeval tv ;
|
||||||
|
gettimeofday(&tv,NULL) ;
|
||||||
|
|
||||||
|
// compute time(NULL) in msecs, for a more accurate bw estimate.
|
||||||
|
|
||||||
|
uint64_t now = tv.tv_sec * 1000 + tv.tv_usec/1000 ;
|
||||||
|
|
||||||
|
total_record += bw ;
|
||||||
|
++total_events ;
|
||||||
|
|
||||||
|
std::cerr << "bandwidthRecorder::recordEvent() Recording event time=" << now << ". bw=" << bw << std::endl;
|
||||||
|
|
||||||
|
// Every 20 seconds at min, compute a new estimate of the required bandwidth.
|
||||||
|
static const int BANDWIDTH_ESTIMATE_DELAY = 20 ;
|
||||||
|
|
||||||
|
if(now > last_event_record + BANDWIDTH_ESTIMATE_DELAY*1000)
|
||||||
|
{
|
||||||
|
// Compute the bandwidth using recorded times, in msecs
|
||||||
|
float speed = total_record/1024.0f/(now - last_event_record)*1000.0f ;
|
||||||
|
|
||||||
|
// Apply a small temporal convolution.
|
||||||
|
estimated_required_bandwidth = 0.75*estimated_required_bandwidth + 0.25 * speed ;
|
||||||
|
|
||||||
|
#ifdef NXS_NET_DEBUG
|
||||||
|
std::cerr << std::dec << " " << total_record << " Bytes (" << total_events << " items)"
|
||||||
|
<< " received in " << now - last_event_record << " seconds. Speed: " << speed << " KBytes/sec" << std::endl;
|
||||||
|
std::cerr << " instantaneous speed = " << speed << " KB/s" << std::endl;
|
||||||
|
std::cerr << " cumulated estimated = " << estimated_required_bandwidth << " KB/s" << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
last_event_record = now ;
|
||||||
|
total_record = 0 ;
|
||||||
|
total_events = 0 ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Estimate the probability of sending an item so that the expected bandwidth matches the residual bandwidth
|
||||||
|
|
||||||
|
static float computeCurrentSendingProbability()
|
||||||
|
{
|
||||||
|
int maxIn,maxOut;
|
||||||
|
float currIn,currOut ;
|
||||||
|
|
||||||
|
rsConfig->GetMaxDataRates(maxIn,maxOut) ;
|
||||||
|
rsConfig->GetCurrentDataRates(currIn,currOut) ;
|
||||||
|
|
||||||
|
float accepted_bandwidth = std::max(0.0f,maxOut - currOut) ;
|
||||||
|
float sending_probability = std::min( accepted_bandwidth / estimated_required_bandwidth,1.0f ) ;
|
||||||
|
|
||||||
|
#ifdef NXS_NET_DEBUG
|
||||||
|
std::cerr << "bandwidthRecorder::computeCurrentSendingProbability()" << std::endl;
|
||||||
|
std::cerr << " current required bandwidth: " << estimated_required_bandwidth << " KB/s" << std::endl;
|
||||||
|
std::cerr << " max out = " << maxOut << ", currOut=" << currOut << std::endl;
|
||||||
|
std::cerr << " computed probability: " << sending_probability << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return sending_probability ;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
static RsMutex mtx;
|
||||||
|
static uint64_t last_event_record ;
|
||||||
|
static float estimated_required_bandwidth ;
|
||||||
|
static uint32_t total_events ;
|
||||||
|
static uint64_t total_record ;
|
||||||
|
};
|
||||||
|
|
||||||
|
uint32_t NxsBandwidthRecorder::total_events =0 ; // total number of events. Not used.
|
||||||
|
uint64_t NxsBandwidthRecorder::last_event_record = time(NULL) * 1000;// starting time of bw estimate period (in msec)
|
||||||
|
uint64_t NxsBandwidthRecorder::total_record =0 ; // total bytes recorded in the current time frame
|
||||||
|
float NxsBandwidthRecorder::estimated_required_bandwidth = 10.0f ;// Estimated BW for sending sync data. Set to 10KB/s, to avoid 0.
|
||||||
|
RsMutex NxsBandwidthRecorder::mtx("Bandwidth recorder") ; // Protects the recorder since bw events are collected from multiple GXS Net services
|
||||||
|
|
||||||
void RsGxsNetService::syncWithPeers()
|
void RsGxsNetService::syncWithPeers()
|
||||||
{
|
{
|
||||||
#ifdef NXS_NET_DEBUG
|
#ifdef NXS_NET_DEBUG
|
||||||
@ -99,6 +186,8 @@ void RsGxsNetService::syncWithPeers()
|
|||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
static RsNxsSerialiser ser(mServType) ; // this is used to estimate bandwidth.
|
||||||
|
|
||||||
RsStackMutex stack(mNxsMutex);
|
RsStackMutex stack(mNxsMutex);
|
||||||
|
|
||||||
std::set<RsPeerId> peers;
|
std::set<RsPeerId> peers;
|
||||||
@ -110,24 +199,27 @@ void RsGxsNetService::syncWithPeers()
|
|||||||
{
|
{
|
||||||
// for now just grps
|
// for now just grps
|
||||||
for(; sit != peers.end(); ++sit)
|
for(; sit != peers.end(); ++sit)
|
||||||
{
|
{
|
||||||
|
|
||||||
const RsPeerId peerId = *sit;
|
const RsPeerId peerId = *sit;
|
||||||
|
|
||||||
ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId);
|
ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId);
|
||||||
uint32_t updateTS = 0;
|
uint32_t updateTS = 0;
|
||||||
if(cit != mClientGrpUpdateMap.end())
|
if(cit != mClientGrpUpdateMap.end())
|
||||||
{
|
{
|
||||||
const RsGxsGrpUpdateItem *gui = cit->second;
|
const RsGxsGrpUpdateItem *gui = cit->second;
|
||||||
updateTS = gui->grpUpdateTS;
|
updateTS = gui->grpUpdateTS;
|
||||||
}
|
}
|
||||||
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
|
RsNxsSyncGrp *grp = new RsNxsSyncGrp(mServType);
|
||||||
grp->clear();
|
grp->clear();
|
||||||
grp->PeerId(*sit);
|
grp->PeerId(*sit);
|
||||||
grp->updateTS = updateTS;
|
grp->updateTS = updateTS;
|
||||||
sendItem(grp);
|
|
||||||
}
|
NxsBandwidthRecorder::recordEvent(mServType,grp) ;
|
||||||
}
|
|
||||||
|
sendItem(grp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#ifndef GXS_DISABLE_SYNC_MSGS
|
#ifndef GXS_DISABLE_SYNC_MSGS
|
||||||
|
|
||||||
@ -156,6 +248,11 @@ void RsGxsNetService::syncWithPeers()
|
|||||||
|
|
||||||
sit = peers.begin();
|
sit = peers.begin();
|
||||||
|
|
||||||
|
float sending_probability = NxsBandwidthRecorder::computeCurrentSendingProbability() ;
|
||||||
|
#ifdef NXS_NET_DEBUG
|
||||||
|
std::cerr << "syncWithPeers(): Sending probability = " << sending_probability << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
// synchronise group msg for groups which we're subscribed to
|
// synchronise group msg for groups which we're subscribed to
|
||||||
for(; sit != peers.end(); ++sit)
|
for(; sit != peers.end(); ++sit)
|
||||||
{
|
{
|
||||||
@ -199,8 +296,14 @@ void RsGxsNetService::syncWithPeers()
|
|||||||
msg->PeerId(peerId);
|
msg->PeerId(peerId);
|
||||||
msg->grpId = grpId;
|
msg->grpId = grpId;
|
||||||
msg->updateTS = updateTS;
|
msg->updateTS = updateTS;
|
||||||
sendItem(msg);
|
|
||||||
}
|
NxsBandwidthRecorder::recordEvent(mServType,msg) ;
|
||||||
|
|
||||||
|
if(RSRandom::random_f32() < sending_probability)
|
||||||
|
sendItem(msg);
|
||||||
|
else
|
||||||
|
delete msg ;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
GrpMetaMap::iterator mmit = toRequest.begin();
|
GrpMetaMap::iterator mmit = toRequest.begin();
|
||||||
|
Loading…
Reference in New Issue
Block a user