From 48a1c66c60a8a152f402dcb7e5f0cf510ac05d71 Mon Sep 17 00:00:00 2001 From: drbob Date: Thu, 21 Jun 2012 23:23:46 +0000 Subject: [PATCH] Added Bandwidth Monitoring service to libretroshare to help debug Lag. - p3bwctrl.h/.cc & rsbwctrlitems.h/.cc - New Interface in pqihandler to extract the data. - New Interface in rsconfig to display in GUI. - Added extra debugging in pqistreamer for catching big outqueues. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5241 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/libretroshare.pro | 6 +- libretroshare/src/pqi/pqi_base.h | 22 ++ libretroshare/src/pqi/pqihandler.cc | 31 ++ libretroshare/src/pqi/pqihandler.h | 25 +- libretroshare/src/pqi/pqistreamer.cc | 18 + libretroshare/src/retroshare/rsconfig.h | 34 +- libretroshare/src/rsserver/p3serverconfig.cc | 24 ++ libretroshare/src/rsserver/p3serverconfig.h | 7 +- libretroshare/src/rsserver/rsinit.cc | 6 + libretroshare/src/serialiser/itempriorities.h | 4 + libretroshare/src/serialiser/rsbwctrlitems.cc | 205 ++++++++++++ libretroshare/src/serialiser/rsbwctrlitems.h | 84 +++++ libretroshare/src/serialiser/rsserviceids.h | 3 + libretroshare/src/serialiser/rstlvbase.h | 1 + libretroshare/src/services/p3bwctrl.cc | 313 ++++++++++++++++++ libretroshare/src/services/p3bwctrl.h | 128 +++++++ 16 files changed, 880 insertions(+), 31 deletions(-) create mode 100644 libretroshare/src/serialiser/rsbwctrlitems.cc create mode 100644 libretroshare/src/serialiser/rsbwctrlitems.h create mode 100644 libretroshare/src/services/p3bwctrl.cc create mode 100644 libretroshare/src/services/p3bwctrl.h diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 9c367ad04..b8bed6b6e 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -438,6 +438,7 @@ HEADERS += serialiser/rsbaseitems.h \ serialiser/rsdsdvitems.h \ serialiser/rstlvbanlist.h \ serialiser/rsbanlistitems.h \ + serialiser/rsbwctrlitems.h \ serialiser/rstunnelitems.h HEADERS += services/p3channels.h \ @@ -452,6 +453,7 @@ HEADERS += services/p3channels.h \ services/p3statusservice.h \ services/p3dsdv.h \ services/p3banlist.h \ + services/p3bwctrl.h \ services/p3tunnel.h HEADERS += distrib/p3distrib.h \ @@ -579,6 +581,7 @@ SOURCES += serialiser/rsbaseitems.cc \ serialiser/rsdsdvitems.cc \ serialiser/rstlvbanlist.cc \ serialiser/rsbanlistitems.cc \ + serialiser/rsbwctrlitems.cc \ serialiser/rstunnelitems.cc SOURCES += services/p3channels.cc \ @@ -591,7 +594,8 @@ SOURCES += services/p3channels.cc \ services/p3service.cc \ services/p3statusservice.cc \ services/p3dsdv.cc \ - services/p3banlist.cc + services/p3banlist.cc \ + services/p3bwctrl.cc \ # removed because getPeer() doesn t exist services/p3tunnel.cc diff --git a/libretroshare/src/pqi/pqi_base.h b/libretroshare/src/pqi/pqi_base.h index 0f5342d03..53ea1e73d 100644 --- a/libretroshare/src/pqi/pqi_base.h +++ b/libretroshare/src/pqi/pqi_base.h @@ -53,6 +53,19 @@ int fixme(char *str, int n); * For controlling data rates. * #define DEBUG_RATECAP 1 */ + +class RsBwRates +{ + public: + RsBwRates() + :mRateIn(0), mRateOut(0), mMaxRateIn(0), mMaxRateOut(0) {return;} + float mRateIn; + float mRateOut; + float mMaxRateIn; + float mMaxRateOut; +}; + + class RateInterface { @@ -64,6 +77,15 @@ public: virtual ~RateInterface() { return; } +virtual void getRates(RsBwRates &rates) +{ + rates.mRateIn = bw_in; + rates.mRateOut = bw_out; + rates.mMaxRateIn = bwMax_in; + rates.mMaxRateOut = bwMax_out; + return; +} + virtual float getRate(bool in) { if (in) diff --git a/libretroshare/src/pqi/pqihandler.cc b/libretroshare/src/pqi/pqihandler.cc index afd70cbd8..f91926eda 100644 --- a/libretroshare/src/pqi/pqihandler.cc +++ b/libretroshare/src/pqi/pqihandler.cc @@ -735,6 +735,37 @@ RsRawItem *pqihandler::GetRsRawItem() static const float MIN_RATE = 0.01; // 10 B/s +// NEW extern fn to extract rates. +int pqihandler::ExtractRates(std::map &ratemap, RsBwRates &total) +{ + total.mMaxRateIn = getMaxRate(true); + total.mMaxRateOut = getMaxRate(false); + total.mRateIn = 0; + total.mRateOut = 0; + + /* Lock once rates have been retrieved */ + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + + std::map::iterator it; + for(it = mods.begin(); it != mods.end(); it++) + { + SearchModule *mod = (it -> second); + + RsBwRates peerRates; + mod -> pqi -> getRates(peerRates); + + total.mRateIn += peerRates.mRateIn; + total.mRateOut += peerRates.mRateOut; + + ratemap[it->first] = peerRates; + + } + + return 1; +} + + + // internal fn to send updates int pqihandler::UpdateRates() { diff --git a/libretroshare/src/pqi/pqihandler.h b/libretroshare/src/pqi/pqihandler.h index bf524eeff..e2367ba44 100644 --- a/libretroshare/src/pqi/pqihandler.h +++ b/libretroshare/src/pqi/pqihandler.h @@ -88,14 +88,14 @@ class pqihandler: public P3Interface, public pqiQoS virtual RsRawItem *GetRsRawItem(); // rate control. - //indiv rate is deprecated - //void setMaxIndivRate(bool in, float val); - //float getMaxIndivRate(bool in); void setMaxRate(bool in, float val); float getMaxRate(bool in); void getCurrentRates(float &in, float &out); + // TESTING INTERFACE. + int ExtractRates(std::map &ratemap, RsBwRates &totals); + bool drawFromQoS_queue() ; protected: @@ -137,25 +137,6 @@ class pqihandler: public P3Interface, public pqiQoS float ticks_per_sec ; }; -//inline void pqihandler::setMaxIndivRate(bool in, float val) -//{ -// RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ -// if (in) -// rateIndiv_in = val; -// else -// rateIndiv_out = val; -// return; -//} -// -//inline float pqihandler::getMaxIndivRate(bool in) -//{ -// RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ -// if (in) -// return rateIndiv_in; -// else -// return rateIndiv_out; -//} - inline void pqihandler::setMaxRate(bool in, float val) { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index a2274f920..3dd999ffd 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -849,6 +849,24 @@ void pqistreamer::outSentBytes(int outb) pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); } + /*** One theory for the massive delays - is that the queue here is filling up ****/ +//#define DEBUG_LAG 1 +#ifdef DEBUG_LAG + +#define MIN_PKTS_FOR_MSG 100 + if (out_pkt.size() > MIN_PKTS_FOR_MSG) + { + std::cerr << "pqistreamer::outSentBytes() for: " << PeerId(); + std::cerr << " End of Write and still " << out_pkt.size() << " pkts left"; + std::cerr << std::endl; + } + +#endif + + + + + totalSent += outb; currSent += outb; diff --git a/libretroshare/src/retroshare/rsconfig.h b/libretroshare/src/retroshare/rsconfig.h index 03a0a8fc0..70243b161 100644 --- a/libretroshare/src/retroshare/rsconfig.h +++ b/libretroshare/src/retroshare/rsconfig.h @@ -29,6 +29,7 @@ #include #include #include +#include /* The New Config Interface Class */ class RsServerConfig; @@ -134,14 +135,31 @@ class RsConfigDataRates public: RsConfigDataRates() { - maxDownloadDataRate = 0; - maxUploadDataRate = 0; - maxIndivDataRate = 0; + mRateIn = 0; + mRateMaxIn = 0; + mAllocIn = 0; + + mAllocTs = 0; + + mRateOut = 0; + mRateMaxOut = 0; + mAllowedOut = 0; + + mAllowedTs = 0; } - int maxDownloadDataRate; /* kb */ - int maxUploadDataRate; /* kb */ - int maxIndivDataRate; /* kb */ + /* all in kB/s */ + float mRateIn; + float mRateMaxIn; + float mAllocIn; + + time_t mAllocTs; + + float mRateOut; + float mRateMaxOut; + float mAllowedOut; + + time_t mAllowedTs; }; @@ -209,7 +227,9 @@ virtual int getConfigNetStatus(RsConfigNetStatus &status) = 0; // NOT IMPLEMENTED YET! //virtual int getConfigStartup(RsConfigStartup ¶ms) = 0; -//virtual int getConfigDataRates(RsConfigDataRates ¶ms) = 0; + +virtual int getTotalBandwidthRates(RsConfigDataRates &rates) = 0; +virtual int getAllBandwidthRates(std::map &ratemap) = 0; /* From RsInit */ diff --git a/libretroshare/src/rsserver/p3serverconfig.cc b/libretroshare/src/rsserver/p3serverconfig.cc index 651a6a754..3eee45fad 100644 --- a/libretroshare/src/rsserver/p3serverconfig.cc +++ b/libretroshare/src/rsserver/p3serverconfig.cc @@ -24,6 +24,7 @@ */ #include "rsserver/p3serverconfig.h" +#include "services/p3bwctrl.h" RsServerConfig *rsConfig = NULL; @@ -107,10 +108,33 @@ int p3ServerConfig::getConfigStartup(RsConfigStartup &/*params*/) return 0; } +#if 0 int p3ServerConfig::getConfigDataRates(RsConfigDataRates &/*params*/) { return 0; } +#endif + + /***** for RsConfig -> p3BandwidthControl ****/ + +int p3ServerConfig::getTotalBandwidthRates(RsConfigDataRates &rates) +{ + if (rsBandwidthControl) + { + return rsBandwidthControl->getTotalBandwidthRates(rates); + } + return 0; +} + + +int p3ServerConfig::getAllBandwidthRates(std::map &ratemap) +{ + if (rsBandwidthControl) + { + return rsBandwidthControl->getAllBandwidthRates(ratemap); + } + return 0; +} /* From RsInit */ diff --git a/libretroshare/src/rsserver/p3serverconfig.h b/libretroshare/src/rsserver/p3serverconfig.h index 996adcca4..70d6191e7 100644 --- a/libretroshare/src/rsserver/p3serverconfig.h +++ b/libretroshare/src/rsserver/p3serverconfig.h @@ -49,7 +49,12 @@ virtual ~p3ServerConfig(); virtual int getConfigNetStatus(RsConfigNetStatus &status); virtual int getConfigStartup(RsConfigStartup ¶ms); -virtual int getConfigDataRates(RsConfigDataRates ¶ms); +//virtual int getConfigDataRates(RsConfigDataRates ¶ms); + + /***** for RsConfig -> p3BandwidthControl ****/ + +virtual int getTotalBandwidthRates(RsConfigDataRates &rates); +virtual int getAllBandwidthRates(std::map &ratemap); /* From RsInit */ diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 21f7f56fc..1581df045 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1828,6 +1828,7 @@ RsTurtle *rsTurtle = NULL ; #define RS_RELEASE 1 #include "services/p3banlist.h" +#include "services/p3bwctrl.h" #include "services/p3dsdv.h" @@ -2241,6 +2242,8 @@ int RsServer::StartupRetroShare() pqih -> addService(mBanList); mBitDht->setupPeerSharer(mBanList); + p3BandwidthControl *mBwCtrl = new p3BandwidthControl(pqih); + #ifdef RS_DSDVTEST p3Dsdv *mDsdv = new p3Dsdv(mLinkMgr); pqih -> addService(mDsdv); @@ -2283,6 +2286,7 @@ int RsServer::StartupRetroShare() mLinkMgr->addMonitor(msgSrv); mLinkMgr->addMonitor(mStatusSrv); mLinkMgr->addMonitor(chatSrv); + mLinkMgr->addMonitor(mBwCtrl); /* must also add the controller as a Monitor... * a little hack to get it to work. @@ -2470,6 +2474,7 @@ int RsServer::StartupRetroShare() /* Setup GUI Interfaces. */ rsDisc = new p3Discovery(ad); + rsBandwidthControl = mBwCtrl; rsConfig = new p3ServerConfig(mPeerMgr, mLinkMgr, mNetMgr, mGeneralConfig); rsMsgs = new p3Msgs(msgSrv, chatSrv); @@ -2489,6 +2494,7 @@ int RsServer::StartupRetroShare() rsPhoto = NULL; #endif + /* put a welcome message in! */ if (RsInitConfig::firsttime_run) { diff --git a/libretroshare/src/serialiser/itempriorities.h b/libretroshare/src/serialiser/itempriorities.h index 49246788e..b86e635d4 100644 --- a/libretroshare/src/serialiser/itempriorities.h +++ b/libretroshare/src/serialiser/itempriorities.h @@ -81,6 +81,10 @@ const uint8_t QOS_PRIORITY_RS_VOIP_PING = 9 ; // const uint8_t QOS_PRIORITY_RS_BANLIST_ITEM = 2 ; +// Bandwidth Control. +// +const uint8_t QOS_PRIORITY_RS_BWCTRL_ALLOWED_ITEM = 9 ; + // Dsdv Routing // const uint8_t QOS_PRIORITY_RS_DSDV_ROUTE = 4 ; diff --git a/libretroshare/src/serialiser/rsbwctrlitems.cc b/libretroshare/src/serialiser/rsbwctrlitems.cc new file mode 100644 index 000000000..fbd314bb2 --- /dev/null +++ b/libretroshare/src/serialiser/rsbwctrlitems.cc @@ -0,0 +1,205 @@ +/* + * libretroshare/src/serialiser: rsbwctrlitems.cc + * + * RetroShare Serialiser. + * + * Copyright 2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "serialiser/rsbaseserial.h" +#include "serialiser/rsbwctrlitems.h" + +/*** +#define RSSERIAL_DEBUG 1 +***/ + +#include + +/*************************************************************************/ + +RsBwCtrlAllowedItem::~RsBwCtrlAllowedItem() +{ + return; +} + +void RsBwCtrlAllowedItem::clear() +{ + allowedBw = 0; +} + +std::ostream &RsBwCtrlAllowedItem::print(std::ostream &out, uint16_t indent) +{ + printRsItemBase(out, "RsBwCtrlAllowedItem", indent); + uint16_t int_Indent = indent + 2; + printIndent(out, int_Indent); + out << "AllowedBw: " << allowedBw; + out << std::endl; + + printRsItemEnd(out, "RsBwCtrlAllowedItem", indent); + return out; +} + + +uint32_t RsBwCtrlSerialiser::sizeAllowed(RsBwCtrlAllowedItem * /*item*/) +{ + uint32_t s = 8; /* header */ + s += GetTlvUInt32Size(); + + return s; +} + +/* serialise the data to the buffer */ +bool RsBwCtrlSerialiser::serialiseAllowed(RsBwCtrlAllowedItem *item, void *data, uint32_t *pktsize) +{ + uint32_t tlvsize = sizeAllowed(item); + uint32_t offset = 0; + + if (*pktsize < tlvsize) + return false; /* not enough space */ + + *pktsize = tlvsize; + + bool ok = true; + + ok &= setRsItemHeader(data, tlvsize, item->PacketId(), tlvsize); + +#ifdef RSSERIAL_DEBUG + std::cerr << "RsBwCtrlSerialiser::serialiseRoute() Header: " << ok << std::endl; + std::cerr << "RsBwCtrlSerialiser::serialiseRoute() Size: " << tlvsize << std::endl; +#endif + + /* skip the header */ + offset += 8; + + /* add mandatory parts first */ + ok &= SetTlvUInt32(data, tlvsize, &offset, TLV_TYPE_UINT32_BW, item->allowedBw); + + if (offset != tlvsize) + { + ok = false; +#ifdef RSSERIAL_DEBUG + std::cerr << "RsBwCtrlSerialiser::serialiseRoute() Size Error! " << std::endl; +#endif + } + + return ok; +} + +RsBwCtrlAllowedItem *RsBwCtrlSerialiser::deserialiseAllowed(void *data, uint32_t *pktsize) +{ + /* get the type and size */ + uint32_t rstype = getRsItemId(data); + uint32_t tlvsize = getRsItemSize(data); + + uint32_t offset = 0; + + + if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || + (RS_SERVICE_TYPE_BWCTRL != getRsItemService(rstype)) || + (RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM != getRsItemSubType(rstype))) + { + return NULL; /* wrong type */ + } + + if (*pktsize < tlvsize) /* check size */ + return NULL; /* not enough data */ + + /* set the packet length */ + *pktsize = tlvsize; + + bool ok = true; + + /* ready to load */ + RsBwCtrlAllowedItem *item = new RsBwCtrlAllowedItem(); + item->clear(); + + /* skip the header */ + offset += 8; + + /* get mandatory parts first */ + ok &= GetTlvUInt32(data, tlvsize, &offset, TLV_TYPE_UINT32_BW, &(item->allowedBw)); + + + if (offset != tlvsize) + { + /* error */ + delete item; + return NULL; + } + + if (!ok) + { + delete item; + return NULL; + } + + return item; +} + +/*************************************************************************/ + +uint32_t RsBwCtrlSerialiser::size(RsItem *i) +{ + RsBwCtrlAllowedItem *dri; + + if (NULL != (dri = dynamic_cast(i))) + { + return sizeAllowed(dri); + } + return 0; +} + +bool RsBwCtrlSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize) +{ + RsBwCtrlAllowedItem *dri; + + if (NULL != (dri = dynamic_cast(i))) + { + return serialiseAllowed(dri, data, pktsize); + } + return false; +} + +RsItem *RsBwCtrlSerialiser::deserialise(void *data, uint32_t *pktsize) +{ + /* get the type and size */ + uint32_t rstype = getRsItemId(data); + + if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || + (RS_SERVICE_TYPE_BWCTRL != getRsItemService(rstype))) + { + return NULL; /* wrong type */ + } + + switch(getRsItemSubType(rstype)) + { + case RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM: + return deserialiseAllowed(data, pktsize); + break; + default: + return NULL; + break; + } +} + +/*************************************************************************/ + + + diff --git a/libretroshare/src/serialiser/rsbwctrlitems.h b/libretroshare/src/serialiser/rsbwctrlitems.h new file mode 100644 index 000000000..c1679c9c9 --- /dev/null +++ b/libretroshare/src/serialiser/rsbwctrlitems.h @@ -0,0 +1,84 @@ +#ifndef RS_BANDWIDTH_CONTROL_ITEMS_H +#define RS_BANDWIDTH_CONTROL_ITEMS_H + +/* + * libretroshare/src/serialiser: rsbwctrlitems.h + * + * RetroShare Serialiser. + * + * Copyright 2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include + +#include "serialiser/rsserviceids.h" +#include "serialiser/rsserial.h" +#include "serialiser/rstlvbase.h" + +#define RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM 0x01 + +/**************************************************************************/ + +class RsBwCtrlAllowedItem: public RsItem +{ + public: + RsBwCtrlAllowedItem() + :RsItem(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_BWCTRL, + RS_PKT_SUBTYPE_BWCTRL_ALLOWED_ITEM) + { + setPriorityLevel(QOS_PRIORITY_RS_BWCTRL_ALLOWED_ITEM); + return; + } + +virtual ~RsBwCtrlAllowedItem(); +virtual void clear(); +std::ostream &print(std::ostream &out, uint16_t indent = 0); + + uint32_t allowedBw; // Units are bytes/sec => 4Gb/s; + +}; + + +class RsBwCtrlSerialiser: public RsSerialType +{ + public: + RsBwCtrlSerialiser() + :RsSerialType(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_BWCTRL) + { return; } +virtual ~RsBwCtrlSerialiser() + { return; } + +virtual uint32_t size(RsItem *); +virtual bool serialise (RsItem *item, void *data, uint32_t *size); +virtual RsItem * deserialise(void *data, uint32_t *size); + + private: + +virtual uint32_t sizeAllowed(RsBwCtrlAllowedItem *); +virtual bool serialiseAllowed (RsBwCtrlAllowedItem *item, void *data, uint32_t *size); +virtual RsBwCtrlAllowedItem *deserialiseAllowed(void *data, uint32_t *size); + + +}; + +/**************************************************************************/ + +#endif /* RS_BANDWIDTH_CONTROL_ITEMS_H */ + diff --git a/libretroshare/src/serialiser/rsserviceids.h b/libretroshare/src/serialiser/rsserviceids.h index 824096f5c..a04f3d647 100644 --- a/libretroshare/src/serialiser/rsserviceids.h +++ b/libretroshare/src/serialiser/rsserviceids.h @@ -94,6 +94,9 @@ const uint16_t RS_SERVICE_TYPE_PHOTO = 0xf040; /* DSDV Testing at the moment - Service Only */ const uint16_t RS_SERVICE_TYPE_DSDV = 0xf050; +/* Bandwidth Testing at the moment - Service Only */ +const uint16_t RS_SERVICE_TYPE_BWCTRL = 0xf060; + /* Games/External Apps - Service Only */ const uint16_t RS_SERVICE_TYPE_GAME_LAUNCHER = 0xf200; diff --git a/libretroshare/src/serialiser/rstlvbase.h b/libretroshare/src/serialiser/rstlvbase.h index 691495956..72e1b4a4b 100644 --- a/libretroshare/src/serialiser/rstlvbase.h +++ b/libretroshare/src/serialiser/rstlvbase.h @@ -115,6 +115,7 @@ const uint16_t TLV_TYPE_UINT32_POP = 0x0031; const uint16_t TLV_TYPE_UINT32_AGE = 0x0032; const uint16_t TLV_TYPE_UINT32_OFFSET = 0x0033; const uint16_t TLV_TYPE_UINT32_SERID = 0x0034; +const uint16_t TLV_TYPE_UINT32_BW = 0x0035; const uint16_t TLV_TYPE_UINT64_SIZE = 0x0040; const uint16_t TLV_TYPE_UINT64_OFFSET = 0x0041; diff --git a/libretroshare/src/services/p3bwctrl.cc b/libretroshare/src/services/p3bwctrl.cc new file mode 100644 index 000000000..30e6b373a --- /dev/null +++ b/libretroshare/src/services/p3bwctrl.cc @@ -0,0 +1,313 @@ +/* + * libretroshare/src/services p3bwctrl.cc + * + * Bandwidth Control Service for RetroShare. + * + * Copyright 2011-2011 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "pqi/p3linkmgr.h" +#include "pqi/p3netmgr.h" + +#include "util/rsnet.h" + +#include "services/p3bwctrl.h" +#include "serialiser/rsbwctrlitems.h" + +#include + +/**** + * #define DEBUG_BWCTRL 1 + ****/ + + +/************ IMPLEMENTATION NOTES ********************************* + * + */ + +p3BandwidthControl *rsBandwidthControl; + + +p3BandwidthControl::p3BandwidthControl(pqipersongrp *pg) + :p3Service(RS_SERVICE_TYPE_BWCTRL), mPg(pg), mBwMtx("p3BwCtrl") +{ + addSerialType(new RsBwCtrlSerialiser()); + + mLastCheck = 0; +} + + +int p3BandwidthControl::tick() +{ + processIncoming(); + + bool doCheck = false; + { + RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/ + +#define CHECK_PERIOD 5 + + time_t now = time(NULL); + if (now - mLastCheck > CHECK_PERIOD) + { + doCheck = true; + mLastCheck = now; + } + } + + if (doCheck) + { + checkAvailableBandwidth(); + } + + return 0; +} + +int p3BandwidthControl::status() +{ + return 1; +} + + +/***** Implementation ******/ + + +bool p3BandwidthControl::checkAvailableBandwidth() +{ + /* check each connection status */ + std::map rateMap; + RsBwRates total; + + mPg->ExtractRates(rateMap, total); + std::map::iterator it; + std::map::iterator bit; + + /* have to merge with existing list, + * erasing as we go ... then any left have to deal with + */ + RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/ + + mTotalRates = total; + + time_t now = time(NULL); + std::list oldIds; // unused for now! + + for(bit = mBwMap.begin(); bit != mBwMap.end(); bit++) + { + /* check alloc rate */ + //time_t age = now - bit->second.mLastSend; + + /* find a matching entry */ + it = rateMap.find(bit->first); + if (it == rateMap.end()) + { + oldIds.push_back(bit->first); + continue; + } + + //float delta = bit->second.mAllocated - it->second.mMaxRateIn; + /* if delta < 0 ... then need update (or else we get a queue) */ + /* if delta > 0 ... then need update (to allow more data) */ + + /* for the moment - always send an update */ + bool updatePeer = true; + +#if 0 + /* if changed significantly */ + if (sig) + { + updatePeer = true; + } + + /* if changed small but old */ + if ((any change) && (timeperiod)) + { + updatePeer = true; + } +#endif + + /* update rates info */ + bit->second.mRates = it->second; + bit->second.mRateUpdateTs = now; + + if (updatePeer) + { +#define ALLOC_FACTOR (0.9) + // save value sent, + bit->second.mAllocated = ALLOC_FACTOR * 1000.0 * it->second.mMaxRateIn; + bit->second.mLastSend = now; + + RsBwCtrlAllowedItem *item = new RsBwCtrlAllowedItem(); + item->PeerId(bit->first); + item->allowedBw = bit->second.mAllocated; + + sendItem(item); + } + + /* now cleanup */ + rateMap.erase(it); + } + + /* any left over rateMaps ... are bad! (or not active - more likely) */ + return true; +} + + + +bool p3BandwidthControl::processIncoming() +{ + RsItem *item = NULL; + time_t now = time(NULL); + + while(NULL != (item = recvItem())) + { + RsBwCtrlAllowedItem *bci = dynamic_cast(item); + if (!bci) + { + delete item; + continue; + } + + /* For each packet */ + RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/ + std::map::iterator bit; + + bit = mBwMap.find(bci->PeerId()); + if (bit == mBwMap.end()) + { + // ERROR. + delete item; + continue; + } + + /* update allowed bandwidth */ + bit->second.mAllowedOut = bci->allowedBw; + bit->second.mLastRecvd = now; + delete item; + + /* store info in data */ + //mPg->setAllowedRate(bit->first, bit->second.mAllowedOut / 1000.0); + } + return true; +} + +int p3BandwidthControl::getTotalBandwidthRates(RsConfigDataRates &rates) +{ + RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/ + + rates.mRateIn = mTotalRates.mRateIn; + rates.mRateMaxIn = mTotalRates.mMaxRateIn; + rates.mRateOut = mTotalRates.mRateOut; + rates.mRateMaxOut = mTotalRates.mMaxRateOut; + + rates.mAllocIn = 0; + rates.mAllocTs = 0; + + rates.mAllowedOut = 0; + rates.mAllowedTs = 0; + + return 1; +} + +int p3BandwidthControl::getAllBandwidthRates(std::map &ratemap) +{ + RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/ + + std::map::iterator bit; + for(bit = mBwMap.begin(); bit != mBwMap.end(); bit++) + { + RsConfigDataRates rates; + + rates.mRateIn = bit->second.mRates.mRateIn; + rates.mRateMaxIn = bit->second.mRates.mMaxRateIn; + rates.mRateOut = bit->second.mRates.mRateOut; + rates.mRateMaxOut = bit->second.mRates.mMaxRateOut; + + rates.mAllocIn = bit->second.mAllocated / 1000.0; + rates.mAllocTs = bit->second.mLastSend; + + rates.mAllowedOut = bit->second.mAllowedOut / 1000.0; + rates.mAllowedTs = bit->second.mLastRecvd; + + ratemap[bit->first] = rates; + } + return true ; + + +} + + + + + + +int p3BandwidthControl::printRateInfo_locked(std::ostream &out) +{ + out << "p3BandwidthControl::printRateInfo_locked()"; + out << std::endl; + + //time_t now = time(NULL); + + std::map::iterator bit; + for(bit = mBwMap.begin(); bit != mBwMap.end(); bit++) + { + //out << " Age: " << now - it->second.mTs; + //out << std::endl; + } + return true ; +} + + /*************** pqiMonitor callback ***********************/ +void p3BandwidthControl::statusChange(const std::list &plist) +{ + std::list::const_iterator it; + for (it = plist.begin(); it != plist.end(); it++) + { + if (it->state & RS_PEER_S_FRIEND) + { + if (it->actions & RS_PEER_DISCONNECTED) + { + /* remove from map */ + RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/ + std::map::iterator bit; + bit = mBwMap.find(it->id); + if (bit == mBwMap.end()) + { + std::cerr << "p3BandwidthControl::statusChange() ERROR"; + std::cerr << " Entry not in map"; + std::cerr << std::endl; + } + else + { + mBwMap.erase(bit); + } + } + else if (it->actions & RS_PEER_CONNECTED) + { + /* stuff */ + BwCtrlData data; + mBwMap[it->id] = data; + } + } + } + return; +} + + diff --git a/libretroshare/src/services/p3bwctrl.h b/libretroshare/src/services/p3bwctrl.h new file mode 100644 index 000000000..8f8812bfa --- /dev/null +++ b/libretroshare/src/services/p3bwctrl.h @@ -0,0 +1,128 @@ +/* + * libretroshare/src/services/p3bwctrl.h + * + * Bandwidth Control. + * + * Copyright 2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + + +#ifndef SERVICE_RSBANDWIDTH_CONTROL_HEADER +#define SERVICE_RSBANDWIDTH_CONTROL_HEADER + +#include +#include +#include + +#include "serialiser/rsbwctrlitems.h" +#include "services/p3service.h" +#include "pqi/pqipersongrp.h" +#include "retroshare/rsconfig.h" // for datatypes. + +// Extern is defined here - as this is bundled with rsconfig.h +class p3BandwidthControl; +extern p3BandwidthControl *rsBandwidthControl; + + +class BwCtrlData +{ + public: + BwCtrlData() + :mRateUpdateTs(0), mAllocated(0), mLastSend(0), mAllowedOut(0), mLastRecvd(0) + { return; } + + /* Rates are floats in KB/s */ + RsBwRates mRates; + time_t mRateUpdateTs; + + /* these are integers (B/s) */ + uint32_t mAllocated; + time_t mLastSend; + + uint32_t mAllowedOut; + time_t mLastRecvd; +}; + + +//!The RS bandwidth Control Service. + /** + * + * Exchange packets to regulate p2p bandwidth. + * + * Sadly this has to be strongly integrated into pqi, with ref to pqipersongrp. + */ + +class p3BandwidthControl: public p3Service, public pqiMonitor +{ + public: + p3BandwidthControl(pqipersongrp *pg); + + /***** overloaded from RsBanList *****/ + + + /***** overloaded from p3Service *****/ + /*! + * This retrieves all BwCtrl items + */ + virtual int tick(); + virtual int status(); + + + /***** for RsConfig (not directly overloaded) ****/ + + virtual int getTotalBandwidthRates(RsConfigDataRates &rates); + virtual int getAllBandwidthRates(std::map &ratemap); + + + + /*! + * Interface stuff. + */ + + /*************** pqiMonitor callback ***********************/ + virtual void statusChange(const std::list &plist); + + + /************* from p3Config *******************/ + //virtual RsSerialiser *setupSerialiser() ; + //virtual bool saveList(bool& cleanup, std::list&) ; + //virtual void saveDone(); + //virtual bool loadList(std::list& load) ; + + + private: + + bool checkAvailableBandwidth(); + bool processIncoming(); + + pqipersongrp *mPg; + + RsMutex mBwMtx; + + int printRateInfo_locked(std::ostream &out); + + time_t mLastCheck; + + RsBwRates mTotalRates; + std::map mBwMap; + +}; + +#endif // SERVICE_RSBANDWIDTH_CONTROL_HEADER