Added Bandwidth Monitoring service to libretroshare to help debug Lag.

- p3bwctrl.h/.cc & rsbwctrlitems.h/.cc
	- New Interface in pqihandler to extract the data.
	- New Interface in rsconfig to display in GUI.
	- Added extra debugging in pqistreamer for catching big outqueues.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5241 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-06-21 23:23:46 +00:00
parent 0d3d1ebc18
commit 48a1c66c60
16 changed files with 880 additions and 31 deletions

View File

@ -438,6 +438,7 @@ HEADERS += serialiser/rsbaseitems.h \
serialiser/rsdsdvitems.h \ serialiser/rsdsdvitems.h \
serialiser/rstlvbanlist.h \ serialiser/rstlvbanlist.h \
serialiser/rsbanlistitems.h \ serialiser/rsbanlistitems.h \
serialiser/rsbwctrlitems.h \
serialiser/rstunnelitems.h serialiser/rstunnelitems.h
HEADERS += services/p3channels.h \ HEADERS += services/p3channels.h \
@ -452,6 +453,7 @@ HEADERS += services/p3channels.h \
services/p3statusservice.h \ services/p3statusservice.h \
services/p3dsdv.h \ services/p3dsdv.h \
services/p3banlist.h \ services/p3banlist.h \
services/p3bwctrl.h \
services/p3tunnel.h services/p3tunnel.h
HEADERS += distrib/p3distrib.h \ HEADERS += distrib/p3distrib.h \
@ -579,6 +581,7 @@ SOURCES += serialiser/rsbaseitems.cc \
serialiser/rsdsdvitems.cc \ serialiser/rsdsdvitems.cc \
serialiser/rstlvbanlist.cc \ serialiser/rstlvbanlist.cc \
serialiser/rsbanlistitems.cc \ serialiser/rsbanlistitems.cc \
serialiser/rsbwctrlitems.cc \
serialiser/rstunnelitems.cc serialiser/rstunnelitems.cc
SOURCES += services/p3channels.cc \ SOURCES += services/p3channels.cc \
@ -591,7 +594,8 @@ SOURCES += services/p3channels.cc \
services/p3service.cc \ services/p3service.cc \
services/p3statusservice.cc \ services/p3statusservice.cc \
services/p3dsdv.cc \ services/p3dsdv.cc \
services/p3banlist.cc services/p3banlist.cc \
services/p3bwctrl.cc \
# removed because getPeer() doesn t exist services/p3tunnel.cc # removed because getPeer() doesn t exist services/p3tunnel.cc

View File

@ -53,6 +53,19 @@ int fixme(char *str, int n);
* For controlling data rates. * For controlling data rates.
* #define DEBUG_RATECAP 1 * #define DEBUG_RATECAP 1
*/ */
class RsBwRates
{
public:
RsBwRates()
:mRateIn(0), mRateOut(0), mMaxRateIn(0), mMaxRateOut(0) {return;}
float mRateIn;
float mRateOut;
float mMaxRateIn;
float mMaxRateOut;
};
class RateInterface class RateInterface
{ {
@ -64,6 +77,15 @@ public:
virtual ~RateInterface() { return; } virtual ~RateInterface() { return; }
virtual void getRates(RsBwRates &rates)
{
rates.mRateIn = bw_in;
rates.mRateOut = bw_out;
rates.mMaxRateIn = bwMax_in;
rates.mMaxRateOut = bwMax_out;
return;
}
virtual float getRate(bool in) virtual float getRate(bool in)
{ {
if (in) if (in)

View File

@ -735,6 +735,37 @@ RsRawItem *pqihandler::GetRsRawItem()
static const float MIN_RATE = 0.01; // 10 B/s static const float MIN_RATE = 0.01; // 10 B/s
// NEW extern fn to extract rates.
int pqihandler::ExtractRates(std::map<std::string, RsBwRates> &ratemap, RsBwRates &total)
{
total.mMaxRateIn = getMaxRate(true);
total.mMaxRateOut = getMaxRate(false);
total.mRateIn = 0;
total.mRateOut = 0;
/* Lock once rates have been retrieved */
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<std::string, SearchModule *>::iterator it;
for(it = mods.begin(); it != mods.end(); it++)
{
SearchModule *mod = (it -> second);
RsBwRates peerRates;
mod -> pqi -> getRates(peerRates);
total.mRateIn += peerRates.mRateIn;
total.mRateOut += peerRates.mRateOut;
ratemap[it->first] = peerRates;
}
return 1;
}
// internal fn to send updates // internal fn to send updates
int pqihandler::UpdateRates() int pqihandler::UpdateRates()
{ {

View File

@ -88,14 +88,14 @@ class pqihandler: public P3Interface, public pqiQoS
virtual RsRawItem *GetRsRawItem(); virtual RsRawItem *GetRsRawItem();
// rate control. // rate control.
//indiv rate is deprecated
//void setMaxIndivRate(bool in, float val);
//float getMaxIndivRate(bool in);
void setMaxRate(bool in, float val); void setMaxRate(bool in, float val);
float getMaxRate(bool in); float getMaxRate(bool in);
void getCurrentRates(float &in, float &out); void getCurrentRates(float &in, float &out);
// TESTING INTERFACE.
int ExtractRates(std::map<std::string, RsBwRates> &ratemap, RsBwRates &totals);
bool drawFromQoS_queue() ; bool drawFromQoS_queue() ;
protected: protected:
@ -137,25 +137,6 @@ class pqihandler: public P3Interface, public pqiQoS
float ticks_per_sec ; float ticks_per_sec ;
}; };
//inline void pqihandler::setMaxIndivRate(bool in, float val)
//{
// RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
// if (in)
// rateIndiv_in = val;
// else
// rateIndiv_out = val;
// return;
//}
//
//inline float pqihandler::getMaxIndivRate(bool in)
//{
// RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
// if (in)
// return rateIndiv_in;
// else
// return rateIndiv_out;
//}
inline void pqihandler::setMaxRate(bool in, float val) inline void pqihandler::setMaxRate(bool in, float val)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/

View File

@ -849,6 +849,24 @@ void pqistreamer::outSentBytes(int outb)
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
} }
/*** One theory for the massive delays - is that the queue here is filling up ****/
//#define DEBUG_LAG 1
#ifdef DEBUG_LAG
#define MIN_PKTS_FOR_MSG 100
if (out_pkt.size() > MIN_PKTS_FOR_MSG)
{
std::cerr << "pqistreamer::outSentBytes() for: " << PeerId();
std::cerr << " End of Write and still " << out_pkt.size() << " pkts left";
std::cerr << std::endl;
}
#endif
totalSent += outb; totalSent += outb;
currSent += outb; currSent += outb;

View File

@ -29,6 +29,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <list> #include <list>
#include <map>
/* The New Config Interface Class */ /* The New Config Interface Class */
class RsServerConfig; class RsServerConfig;
@ -134,14 +135,31 @@ class RsConfigDataRates
public: public:
RsConfigDataRates() RsConfigDataRates()
{ {
maxDownloadDataRate = 0; mRateIn = 0;
maxUploadDataRate = 0; mRateMaxIn = 0;
maxIndivDataRate = 0; mAllocIn = 0;
mAllocTs = 0;
mRateOut = 0;
mRateMaxOut = 0;
mAllowedOut = 0;
mAllowedTs = 0;
} }
int maxDownloadDataRate; /* kb */ /* all in kB/s */
int maxUploadDataRate; /* kb */ float mRateIn;
int maxIndivDataRate; /* kb */ float mRateMaxIn;
float mAllocIn;
time_t mAllocTs;
float mRateOut;
float mRateMaxOut;
float mAllowedOut;
time_t mAllowedTs;
}; };
@ -209,7 +227,9 @@ virtual int getConfigNetStatus(RsConfigNetStatus &status) = 0;
// NOT IMPLEMENTED YET! // NOT IMPLEMENTED YET!
//virtual int getConfigStartup(RsConfigStartup &params) = 0; //virtual int getConfigStartup(RsConfigStartup &params) = 0;
//virtual int getConfigDataRates(RsConfigDataRates &params) = 0;
virtual int getTotalBandwidthRates(RsConfigDataRates &rates) = 0;
virtual int getAllBandwidthRates(std::map<std::string, RsConfigDataRates> &ratemap) = 0;
/* From RsInit */ /* From RsInit */

View File

@ -24,6 +24,7 @@
*/ */
#include "rsserver/p3serverconfig.h" #include "rsserver/p3serverconfig.h"
#include "services/p3bwctrl.h"
RsServerConfig *rsConfig = NULL; RsServerConfig *rsConfig = NULL;
@ -107,10 +108,33 @@ int p3ServerConfig::getConfigStartup(RsConfigStartup &/*params*/)
return 0; return 0;
} }
#if 0
int p3ServerConfig::getConfigDataRates(RsConfigDataRates &/*params*/) int p3ServerConfig::getConfigDataRates(RsConfigDataRates &/*params*/)
{ {
return 0; return 0;
} }
#endif
/***** for RsConfig -> p3BandwidthControl ****/
int p3ServerConfig::getTotalBandwidthRates(RsConfigDataRates &rates)
{
if (rsBandwidthControl)
{
return rsBandwidthControl->getTotalBandwidthRates(rates);
}
return 0;
}
int p3ServerConfig::getAllBandwidthRates(std::map<std::string, RsConfigDataRates> &ratemap)
{
if (rsBandwidthControl)
{
return rsBandwidthControl->getAllBandwidthRates(ratemap);
}
return 0;
}
/* From RsInit */ /* From RsInit */

View File

@ -49,7 +49,12 @@ virtual ~p3ServerConfig();
virtual int getConfigNetStatus(RsConfigNetStatus &status); virtual int getConfigNetStatus(RsConfigNetStatus &status);
virtual int getConfigStartup(RsConfigStartup &params); virtual int getConfigStartup(RsConfigStartup &params);
virtual int getConfigDataRates(RsConfigDataRates &params); //virtual int getConfigDataRates(RsConfigDataRates &params);
/***** for RsConfig -> p3BandwidthControl ****/
virtual int getTotalBandwidthRates(RsConfigDataRates &rates);
virtual int getAllBandwidthRates(std::map<std::string, RsConfigDataRates> &ratemap);
/* From RsInit */ /* From RsInit */

View File

@ -1828,6 +1828,7 @@ RsTurtle *rsTurtle = NULL ;
#define RS_RELEASE 1 #define RS_RELEASE 1
#include "services/p3banlist.h" #include "services/p3banlist.h"
#include "services/p3bwctrl.h"
#include "services/p3dsdv.h" #include "services/p3dsdv.h"
@ -2241,6 +2242,8 @@ int RsServer::StartupRetroShare()
pqih -> addService(mBanList); pqih -> addService(mBanList);
mBitDht->setupPeerSharer(mBanList); mBitDht->setupPeerSharer(mBanList);
p3BandwidthControl *mBwCtrl = new p3BandwidthControl(pqih);
#ifdef RS_DSDVTEST #ifdef RS_DSDVTEST
p3Dsdv *mDsdv = new p3Dsdv(mLinkMgr); p3Dsdv *mDsdv = new p3Dsdv(mLinkMgr);
pqih -> addService(mDsdv); pqih -> addService(mDsdv);
@ -2283,6 +2286,7 @@ int RsServer::StartupRetroShare()
mLinkMgr->addMonitor(msgSrv); mLinkMgr->addMonitor(msgSrv);
mLinkMgr->addMonitor(mStatusSrv); mLinkMgr->addMonitor(mStatusSrv);
mLinkMgr->addMonitor(chatSrv); mLinkMgr->addMonitor(chatSrv);
mLinkMgr->addMonitor(mBwCtrl);
/* must also add the controller as a Monitor... /* must also add the controller as a Monitor...
* a little hack to get it to work. * a little hack to get it to work.
@ -2470,6 +2474,7 @@ int RsServer::StartupRetroShare()
/* Setup GUI Interfaces. */ /* Setup GUI Interfaces. */
rsDisc = new p3Discovery(ad); rsDisc = new p3Discovery(ad);
rsBandwidthControl = mBwCtrl;
rsConfig = new p3ServerConfig(mPeerMgr, mLinkMgr, mNetMgr, mGeneralConfig); rsConfig = new p3ServerConfig(mPeerMgr, mLinkMgr, mNetMgr, mGeneralConfig);
rsMsgs = new p3Msgs(msgSrv, chatSrv); rsMsgs = new p3Msgs(msgSrv, chatSrv);
@ -2489,6 +2494,7 @@ int RsServer::StartupRetroShare()
rsPhoto = NULL; rsPhoto = NULL;
#endif #endif
/* put a welcome message in! */ /* put a welcome message in! */
if (RsInitConfig::firsttime_run) if (RsInitConfig::firsttime_run)
{ {

View File

@ -81,6 +81,10 @@ const uint8_t QOS_PRIORITY_RS_VOIP_PING = 9 ;
// //
const uint8_t QOS_PRIORITY_RS_BANLIST_ITEM = 2 ; const uint8_t QOS_PRIORITY_RS_BANLIST_ITEM = 2 ;
// Bandwidth Control.
//
const uint8_t QOS_PRIORITY_RS_BWCTRL_ALLOWED_ITEM = 9 ;
// Dsdv Routing // Dsdv Routing
// //
const uint8_t QOS_PRIORITY_RS_DSDV_ROUTE = 4 ; const uint8_t QOS_PRIORITY_RS_DSDV_ROUTE = 4 ;

View File

@ -0,0 +1,205 @@
/*
* libretroshare/src/serialiser: rsbwctrlitems.cc
*
* RetroShare Serialiser.
*
* Copyright 2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "serialiser/rsbaseserial.h"
#include "serialiser/rsbwctrlitems.h"
/***
#define RSSERIAL_DEBUG 1
***/
#include <iostream>
/*************************************************************************/
RsBwCtrlAllowedItem::~RsBwCtrlAllowedItem()
{
return;
}
void RsBwCtrlAllowedItem::clear()
{
allowedBw = 0;
}
std::ostream &RsBwCtrlAllowedItem::print(std::ostream &out, uint16_t indent)
{
printRsItemBase(out, "RsBwCtrlAllowedItem", indent);
uint16_t int_Indent = indent + 2;
printIndent(out, int_Indent);
out << "AllowedBw: " << allowedBw;
out << std::endl;
printRsItemEnd(out, "RsBwCtrlAllowedItem", indent);
return out;
}
uint32_t RsBwCtrlSerialiser::sizeAllowed(RsBwCtrlAllowedItem * /*item*/)
{
uint32_t s = 8; /* header */
s += GetTlvUInt32Size();
return s;
}
/* serialise the data to the buffer */
bool RsBwCtrlSerialiser::serialiseAllowed(RsBwCtrlAllowedItem *item, void *data, uint32_t *pktsize)
{
uint32_t tlvsize = sizeAllowed(item);
uint32_t offset = 0;
if (*pktsize < tlvsize)
return false; /* not enough space */
*pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize);
#ifdef RSSERIAL_DEBUG
std::cerr << "RsBwCtrlSerialiser::serialiseRoute() Header: " << ok << std::endl;
std::cerr << "RsBwCtrlSerialiser::serialiseRoute() Size: " << tlvsize << std::endl;
#endif
/* skip the header */
offset += 8;
/* add mandatory parts first */
ok &= SetTlvUInt32(data, tlvsize, &offset, TLV_TYPE_UINT32_BW, item->allowedBw);
if (offset != tlvsize)
{
ok = false;
#ifdef RSSERIAL_DEBUG
std::cerr << "RsBwCtrlSerialiser::serialiseRoute() Size Error! " << std::endl;
#endif
}
return ok;
}
RsBwCtrlAllowedItem *RsBwCtrlSerialiser::deserialiseAllowed(void *data, uint32_t *pktsize)
{
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t tlvsize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(RS_SERVICE_TYPE_BWCTRL != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM != getRsItemSubType(rstype)))
{
return NULL; /* wrong type */
}
if (*pktsize < tlvsize) /* check size */
return NULL; /* not enough data */
/* set the packet length */
*pktsize = tlvsize;
bool ok = true;
/* ready to load */
RsBwCtrlAllowedItem *item = new RsBwCtrlAllowedItem();
item->clear();
/* skip the header */
offset += 8;
/* get mandatory parts first */
ok &= GetTlvUInt32(data, tlvsize, &offset, TLV_TYPE_UINT32_BW, &(item->allowedBw));
if (offset != tlvsize)
{
/* error */
delete item;
return NULL;
}
if (!ok)
{
delete item;
return NULL;
}
return item;
}
/*************************************************************************/
uint32_t RsBwCtrlSerialiser::size(RsItem *i)
{
RsBwCtrlAllowedItem *dri;
if (NULL != (dri = dynamic_cast<RsBwCtrlAllowedItem *>(i)))
{
return sizeAllowed(dri);
}
return 0;
}
bool RsBwCtrlSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize)
{
RsBwCtrlAllowedItem *dri;
if (NULL != (dri = dynamic_cast<RsBwCtrlAllowedItem *>(i)))
{
return serialiseAllowed(dri, data, pktsize);
}
return false;
}
RsItem *RsBwCtrlSerialiser::deserialise(void *data, uint32_t *pktsize)
{
/* get the type and size */
uint32_t rstype = getRsItemId(data);
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(RS_SERVICE_TYPE_BWCTRL != getRsItemService(rstype)))
{
return NULL; /* wrong type */
}
switch(getRsItemSubType(rstype))
{
case RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM:
return deserialiseAllowed(data, pktsize);
break;
default:
return NULL;
break;
}
}
/*************************************************************************/

View File

@ -0,0 +1,84 @@
#ifndef RS_BANDWIDTH_CONTROL_ITEMS_H
#define RS_BANDWIDTH_CONTROL_ITEMS_H
/*
* libretroshare/src/serialiser: rsbwctrlitems.h
*
* RetroShare Serialiser.
*
* Copyright 2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include <map>
#include "serialiser/rsserviceids.h"
#include "serialiser/rsserial.h"
#include "serialiser/rstlvbase.h"
#define RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM 0x01
/**************************************************************************/
class RsBwCtrlAllowedItem: public RsItem
{
public:
RsBwCtrlAllowedItem()
:RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_BWCTRL,
RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM)
{
setPriorityLevel(QOS_PRIORITY_RS_BWCTRL_ALLOWED_ITEM);
return;
}
virtual ~RsBwCtrlAllowedItem();
virtual void clear();
std::ostream &print(std::ostream &out, uint16_t indent = 0);
uint32_t allowedBw; // Units are bytes/sec => 4Gb/s;
};
class RsBwCtrlSerialiser: public RsSerialType
{
public:
RsBwCtrlSerialiser()
:RsSerialType(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_BWCTRL)
{ return; }
virtual ~RsBwCtrlSerialiser()
{ return; }
virtual uint32_t size(RsItem *);
virtual bool serialise (RsItem *item, void *data, uint32_t *size);
virtual RsItem * deserialise(void *data, uint32_t *size);
private:
virtual uint32_t sizeAllowed(RsBwCtrlAllowedItem *);
virtual bool serialiseAllowed (RsBwCtrlAllowedItem *item, void *data, uint32_t *size);
virtual RsBwCtrlAllowedItem *deserialiseAllowed(void *data, uint32_t *size);
};
/**************************************************************************/
#endif /* RS_BANDWIDTH_CONTROL_ITEMS_H */

View File

@ -94,6 +94,9 @@ const uint16_t RS_SERVICE_TYPE_PHOTO = 0xf040;
/* DSDV Testing at the moment - Service Only */ /* DSDV Testing at the moment - Service Only */
const uint16_t RS_SERVICE_TYPE_DSDV = 0xf050; const uint16_t RS_SERVICE_TYPE_DSDV = 0xf050;
/* Bandwidth Testing at the moment - Service Only */
const uint16_t RS_SERVICE_TYPE_BWCTRL = 0xf060;
/* Games/External Apps - Service Only */ /* Games/External Apps - Service Only */
const uint16_t RS_SERVICE_TYPE_GAME_LAUNCHER = 0xf200; const uint16_t RS_SERVICE_TYPE_GAME_LAUNCHER = 0xf200;

View File

@ -115,6 +115,7 @@ const uint16_t TLV_TYPE_UINT32_POP = 0x0031;
const uint16_t TLV_TYPE_UINT32_AGE = 0x0032; const uint16_t TLV_TYPE_UINT32_AGE = 0x0032;
const uint16_t TLV_TYPE_UINT32_OFFSET = 0x0033; const uint16_t TLV_TYPE_UINT32_OFFSET = 0x0033;
const uint16_t TLV_TYPE_UINT32_SERID = 0x0034; const uint16_t TLV_TYPE_UINT32_SERID = 0x0034;
const uint16_t TLV_TYPE_UINT32_BW = 0x0035;
const uint16_t TLV_TYPE_UINT64_SIZE = 0x0040; const uint16_t TLV_TYPE_UINT64_SIZE = 0x0040;
const uint16_t TLV_TYPE_UINT64_OFFSET = 0x0041; const uint16_t TLV_TYPE_UINT64_OFFSET = 0x0041;

View File

@ -0,0 +1,313 @@
/*
* libretroshare/src/services p3bwctrl.cc
*
* Bandwidth Control Service for RetroShare.
*
* Copyright 2011-2011 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "pqi/p3linkmgr.h"
#include "pqi/p3netmgr.h"
#include "util/rsnet.h"
#include "services/p3bwctrl.h"
#include "serialiser/rsbwctrlitems.h"
#include <sys/time.h>
/****
* #define DEBUG_BWCTRL 1
****/
/************ IMPLEMENTATION NOTES *********************************
*
*/
p3BandwidthControl *rsBandwidthControl;
p3BandwidthControl::p3BandwidthControl(pqipersongrp *pg)
:p3Service(RS_SERVICE_TYPE_BWCTRL), mPg(pg), mBwMtx("p3BwCtrl")
{
addSerialType(new RsBwCtrlSerialiser());
mLastCheck = 0;
}
int p3BandwidthControl::tick()
{
processIncoming();
bool doCheck = false;
{
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
#define CHECK_PERIOD 5
time_t now = time(NULL);
if (now - mLastCheck > CHECK_PERIOD)
{
doCheck = true;
mLastCheck = now;
}
}
if (doCheck)
{
checkAvailableBandwidth();
}
return 0;
}
int p3BandwidthControl::status()
{
return 1;
}
/***** Implementation ******/
bool p3BandwidthControl::checkAvailableBandwidth()
{
/* check each connection status */
std::map<std::string, RsBwRates> rateMap;
RsBwRates total;
mPg->ExtractRates(rateMap, total);
std::map<std::string, RsBwRates>::iterator it;
std::map<std::string, BwCtrlData>::iterator bit;
/* have to merge with existing list,
* erasing as we go ... then any left have to deal with
*/
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
mTotalRates = total;
time_t now = time(NULL);
std::list<std::string> oldIds; // unused for now!
for(bit = mBwMap.begin(); bit != mBwMap.end(); bit++)
{
/* check alloc rate */
//time_t age = now - bit->second.mLastSend;
/* find a matching entry */
it = rateMap.find(bit->first);
if (it == rateMap.end())
{
oldIds.push_back(bit->first);
continue;
}
//float delta = bit->second.mAllocated - it->second.mMaxRateIn;
/* if delta < 0 ... then need update (or else we get a queue) */
/* if delta > 0 ... then need update (to allow more data) */
/* for the moment - always send an update */
bool updatePeer = true;
#if 0
/* if changed significantly */
if (sig)
{
updatePeer = true;
}
/* if changed small but old */
if ((any change) && (timeperiod))
{
updatePeer = true;
}
#endif
/* update rates info */
bit->second.mRates = it->second;
bit->second.mRateUpdateTs = now;
if (updatePeer)
{
#define ALLOC_FACTOR (0.9)
// save value sent,
bit->second.mAllocated = ALLOC_FACTOR * 1000.0 * it->second.mMaxRateIn;
bit->second.mLastSend = now;
RsBwCtrlAllowedItem *item = new RsBwCtrlAllowedItem();
item->PeerId(bit->first);
item->allowedBw = bit->second.mAllocated;
sendItem(item);
}
/* now cleanup */
rateMap.erase(it);
}
/* any left over rateMaps ... are bad! (or not active - more likely) */
return true;
}
bool p3BandwidthControl::processIncoming()
{
RsItem *item = NULL;
time_t now = time(NULL);
while(NULL != (item = recvItem()))
{
RsBwCtrlAllowedItem *bci = dynamic_cast<RsBwCtrlAllowedItem *>(item);
if (!bci)
{
delete item;
continue;
}
/* For each packet */
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
std::map<std::string, BwCtrlData>::iterator bit;
bit = mBwMap.find(bci->PeerId());
if (bit == mBwMap.end())
{
// ERROR.
delete item;
continue;
}
/* update allowed bandwidth */
bit->second.mAllowedOut = bci->allowedBw;
bit->second.mLastRecvd = now;
delete item;
/* store info in data */
//mPg->setAllowedRate(bit->first, bit->second.mAllowedOut / 1000.0);
}
return true;
}
int p3BandwidthControl::getTotalBandwidthRates(RsConfigDataRates &rates)
{
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
rates.mRateIn = mTotalRates.mRateIn;
rates.mRateMaxIn = mTotalRates.mMaxRateIn;
rates.mRateOut = mTotalRates.mRateOut;
rates.mRateMaxOut = mTotalRates.mMaxRateOut;
rates.mAllocIn = 0;
rates.mAllocTs = 0;
rates.mAllowedOut = 0;
rates.mAllowedTs = 0;
return 1;
}
int p3BandwidthControl::getAllBandwidthRates(std::map<std::string, RsConfigDataRates> &ratemap)
{
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
std::map<std::string, BwCtrlData>::iterator bit;
for(bit = mBwMap.begin(); bit != mBwMap.end(); bit++)
{
RsConfigDataRates rates;
rates.mRateIn = bit->second.mRates.mRateIn;
rates.mRateMaxIn = bit->second.mRates.mMaxRateIn;
rates.mRateOut = bit->second.mRates.mRateOut;
rates.mRateMaxOut = bit->second.mRates.mMaxRateOut;
rates.mAllocIn = bit->second.mAllocated / 1000.0;
rates.mAllocTs = bit->second.mLastSend;
rates.mAllowedOut = bit->second.mAllowedOut / 1000.0;
rates.mAllowedTs = bit->second.mLastRecvd;
ratemap[bit->first] = rates;
}
return true ;
}
int p3BandwidthControl::printRateInfo_locked(std::ostream &out)
{
out << "p3BandwidthControl::printRateInfo_locked()";
out << std::endl;
//time_t now = time(NULL);
std::map<std::string, BwCtrlData>::iterator bit;
for(bit = mBwMap.begin(); bit != mBwMap.end(); bit++)
{
//out << " Age: " << now - it->second.mTs;
//out << std::endl;
}
return true ;
}
/*************** pqiMonitor callback ***********************/
void p3BandwidthControl::statusChange(const std::list<pqipeer> &plist)
{
std::list<pqipeer>::const_iterator it;
for (it = plist.begin(); it != plist.end(); it++)
{
if (it->state & RS_PEER_S_FRIEND)
{
if (it->actions & RS_PEER_DISCONNECTED)
{
/* remove from map */
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
std::map<std::string, BwCtrlData>::iterator bit;
bit = mBwMap.find(it->id);
if (bit == mBwMap.end())
{
std::cerr << "p3BandwidthControl::statusChange() ERROR";
std::cerr << " Entry not in map";
std::cerr << std::endl;
}
else
{
mBwMap.erase(bit);
}
}
else if (it->actions & RS_PEER_CONNECTED)
{
/* stuff */
BwCtrlData data;
mBwMap[it->id] = data;
}
}
}
return;
}

View File

@ -0,0 +1,128 @@
/*
* libretroshare/src/services/p3bwctrl.h
*
* Bandwidth Control.
*
* Copyright 2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef SERVICE_RSBANDWIDTH_CONTROL_HEADER
#define SERVICE_RSBANDWIDTH_CONTROL_HEADER
#include <string>
#include <list>
#include <map>
#include "serialiser/rsbwctrlitems.h"
#include "services/p3service.h"
#include "pqi/pqipersongrp.h"
#include "retroshare/rsconfig.h" // for datatypes.
// Extern is defined here - as this is bundled with rsconfig.h
class p3BandwidthControl;
extern p3BandwidthControl *rsBandwidthControl;
class BwCtrlData
{
public:
BwCtrlData()
:mRateUpdateTs(0), mAllocated(0), mLastSend(0), mAllowedOut(0), mLastRecvd(0)
{ return; }
/* Rates are floats in KB/s */
RsBwRates mRates;
time_t mRateUpdateTs;
/* these are integers (B/s) */
uint32_t mAllocated;
time_t mLastSend;
uint32_t mAllowedOut;
time_t mLastRecvd;
};
//!The RS bandwidth Control Service.
/**
*
* Exchange packets to regulate p2p bandwidth.
*
* Sadly this has to be strongly integrated into pqi, with ref to pqipersongrp.
*/
class p3BandwidthControl: public p3Service, public pqiMonitor
{
public:
p3BandwidthControl(pqipersongrp *pg);
/***** overloaded from RsBanList *****/
/***** overloaded from p3Service *****/
/*!
* This retrieves all BwCtrl items
*/
virtual int tick();
virtual int status();
/***** for RsConfig (not directly overloaded) ****/
virtual int getTotalBandwidthRates(RsConfigDataRates &rates);
virtual int getAllBandwidthRates(std::map<std::string, RsConfigDataRates> &ratemap);
/*!
* Interface stuff.
*/
/*************** pqiMonitor callback ***********************/
virtual void statusChange(const std::list<pqipeer> &plist);
/************* from p3Config *******************/
//virtual RsSerialiser *setupSerialiser() ;
//virtual bool saveList(bool& cleanup, std::list<RsItem*>&) ;
//virtual void saveDone();
//virtual bool loadList(std::list<RsItem*>& load) ;
private:
bool checkAvailableBandwidth();
bool processIncoming();
pqipersongrp *mPg;
RsMutex mBwMtx;
int printRateInfo_locked(std::ostream &out);
time_t mLastCheck;
RsBwRates mTotalRates;
std::map<std::string, BwCtrlData> mBwMap;
};
#endif // SERVICE_RSBANDWIDTH_CONTROL_HEADER