moved QoS down to pqistreamer pipes. This removes one out queue, and removes lags due to many packets being sent at ones.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5249 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2012-06-23 12:10:41 +00:00
parent fd34bbfdaf
commit 8d070bb030
10 changed files with 242 additions and 127 deletions

View File

@ -400,6 +400,7 @@ HEADERS += pqi/authssl.h \
pqi/pqissludp.h \
pqi/pqistore.h \
pqi/pqistreamer.h \
pqi/pqiqosstreamer.h \
pqi/sslfns.h \
pqi/pqinetstatebox.h
@ -533,6 +534,7 @@ SOURCES += pqi/authgpg.cc \
pqi/pqissludp.cc \
pqi/pqistore.cc \
pqi/pqistreamer.cc \
pqi/pqiqosstreamer.cc \
pqi/sslfns.cc \
pqi/pqinetstatebox.cc

View File

@ -37,9 +37,8 @@ static const float PQI_HANDLER_NB_PRIORITY_RATIO = 2 ;
#define DEBUG_TICK 1
#define RSITEM_DEBUG 1
****/
//#define DEBUG_QOS 1
pqihandler::pqihandler(SecurityPolicy *Global) : pqiQoS(PQI_HANDLER_NB_PRIORITY_LEVELS,PQI_HANDLER_NB_PRIORITY_RATIO),coreMtx("pqihandler")
pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler")
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
@ -90,62 +89,17 @@ int pqihandler::tick()
}
}
// send items from QoS queue
moreToTick |= drawFromQoS_queue() ;
UpdateRates();
return moreToTick;
}
bool pqihandler::drawFromQoS_queue()
{
float avail_out = getMaxRate(false) * 1024 / ticks_per_sec ;
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
++nb_ticks ;
time_t now = time(NULL) ;
if(last_m + 3 < now)
{
ticks_per_sec = nb_ticks / (float)(now - last_m) ;
nb_ticks = 0 ;
last_m = now ;
}
#ifdef DEBUG_QOS
std::cerr << "ticks per sec: " << ticks_per_sec << ", max rate in bytes/s = " << avail_out*ticks_per_sec << ", avail out per tick= " << avail_out << std::endl;
#endif
uint64_t total_bytes_sent = 0 ;
RsItem *item ;
while( total_bytes_sent < avail_out && (item = out_rsItem()) != NULL)
{
//
uint32_t size ;
locked_HandleRsItem(item, 0, size);
total_bytes_sent += size ;
#ifdef DEBUG_QOS
std::cerr << "treating item " << (void*)item << ", priority " << (int)item->priority_level() << ", size=" << size << ", total = " << total_bytes_sent << ", queue size = " << qos_queue_size() << std::endl;
#endif
}
#ifdef DEBUG_QOS
assert(total_bytes_sent >= avail_out || qos_queue_size() == 0) ;
std::cerr << "total bytes sent = " << total_bytes_sent << ", " ;
if(qos_queue_size() > 0)
std::cerr << "Queue still has " << qos_queue_size() << " elements." << std::endl;
else
std::cerr << "Queue is empty." << std::endl;
#endif
return (qos_queue_size() > 0) ;
}
bool pqihandler::queueOutRsItem(RsItem *item)
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
in_rsItem(item) ;
uint32_t size ;
locked_HandleRsItem(item, 0, size);
#ifdef DEBUG_QOS
if(item->priority_level() == QOS_PRIORITY_UNKNOWN)
@ -380,7 +334,7 @@ int pqihandler::SendRsRawItem(RsRawItem *ns)
{
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
// queue the item into the QoS
// directly send item to streamers
return queueOutRsItem(ns) ;
}

View File

@ -46,7 +46,7 @@ class SearchModule
// Presents a P3 Face to the world!
// and funnels data through to a PQInterface.
//
class pqihandler: public P3Interface, public pqiQoS
class pqihandler: public P3Interface
{
public:
pqihandler(SecurityPolicy *Global);
@ -96,8 +96,6 @@ class pqihandler: public P3Interface, public pqiQoS
// TESTING INTERFACE.
int ExtractRates(std::map<std::string, RsBwRates> &ratemap, RsBwRates &totals);
bool drawFromQoS_queue() ;
protected:
/* check to be overloaded by those that can
* generates warnings otherwise

View File

@ -43,15 +43,15 @@ static const int CONNECT_FAILED = 5;
static const int HEARTBEAT_REPEAT_TIME = 5;
#include "pqi/pqistreamer.h"
#include "pqi/pqiqosstreamer.h"
class pqiconnect: public pqistreamer, public NetInterface
class pqiconnect: public pqiQoSstreamer, public NetInterface
{
public:
pqiconnect(RsSerialiser *rss, NetBinInterface *ni_in)
:pqistreamer(rss, ni_in->PeerId(), ni_in, 0), // pqistreamer will cleanup NetInterface.
NetInterface(NULL, ni_in->PeerId()), // No need for callback
ni(ni_in)
:pqiQoSstreamer(rss, ni_in->PeerId(), ni_in, 0), // pqistreamer will cleanup NetInterface.
NetInterface(NULL, ni_in->PeerId()), // No need for callback
ni(ni_in)
{
if (!ni_in)
{

View File

@ -22,6 +22,17 @@ for(int i=((int)nb_levels)-1;i>=0;--i,c *= alpha)
}
}
void pqiQoS::clear()
{
void *item ;
for(int i=0;i<_item_queues.size();++i)
while( (item = _item_queues[i].pop()) != NULL)
free(item) ;
_nb_items = 0 ;
}
void pqiQoS::print() const
{
std::cerr << "pqiQoS: " << _item_queues.size() << " levels, alpha=" << _alpha ;
@ -32,19 +43,19 @@ void pqiQoS::print() const
std::cerr << std::endl;
}
void pqiQoS::in_rsItem(RsItem *item)
void pqiQoS::in_rsItem(void *ptr,int priority)
{
if(item->priority_level() >= _item_queues.size())
if(priority >= _item_queues.size())
{
std::cerr << "pqiQoS::in_rsRawItem() ****Warning****: priority " << item->priority_level() << " out of scope [0," << _item_queues.size()-1 << "]. Priority will be clamped to maximum value." << std::endl;
item->setPriorityLevel(_item_queues.size()-1) ;
std::cerr << "pqiQoS::in_rsRawItem() ****Warning****: priority " << priority << " out of scope [0," << _item_queues.size()-1 << "]. Priority will be clamped to maximum value." << std::endl;
priority = _item_queues.size()-1 ;
}
_item_queues[item->priority_level()].push(item) ;
_item_queues[priority].push(ptr) ;
++_nb_items ;
}
RsItem *pqiQoS::out_rsItem()
void *pqiQoS::out_rsItem()
{
// Go through the queues. Increment counters.

View File

@ -32,11 +32,12 @@
// \alpha is a constant that is not necessarily an integer, but strictly > 1.
// - the set of possible priority levels is finite, and pre-determined.
//
#pragma once
#include <stdint.h>
#include <vector>
#include <list>
class RsItem ;
class pqiQoS
{
public:
@ -45,18 +46,18 @@ class pqiQoS
class ItemQueue
{
public:
RsItem *pop()
void *pop()
{
if(_items.empty())
return NULL ;
RsItem *item = _items.front() ;
void *item = _items.front() ;
_items.pop_front() ;
return item ;
}
void push(RsItem *item)
void push(void *item)
{
_items.push_back(item) ;
}
@ -64,20 +65,25 @@ class pqiQoS
float _threshold ;
float _counter ;
float _inc ;
std::list<RsItem*> _items ;
std::list<void*> _items ;
};
// This function pops items from the queue, y order of priority
//
RsItem *out_rsItem() ;
void *out_rsItem() ;
// This function is used to queue items.
//
void in_rsItem(RsItem *item) ;
void in_rsItem(void *item,int priority) ;
void print() const ;
uint64_t qos_queue_size() const { return _nb_items ; }
// kills all waiting items.
void clear() ;
void computeTotalItemSize() const ;
int debug_computeTotalItemSize() const ;
private:
// This vector stores the lists of items with equal priorities.
//

View File

@ -0,0 +1,70 @@
/*
* libretroshare/src/pqi pqistreamer.h
*
* 3P/PQI network interface for RetroShare.
*
* Copyright 2012-2012 by Cyril Soler
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "pqiqosstreamer.h"
pqiQoSstreamer::pqiQoSstreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin)
: pqistreamer(rss,peerid,bio_in,bio_flagsin), pqiQoS(PQI_QOS_STREAMER_MAX_LEVELS, PQI_QOS_STREAMER_ALPHA)
{
_total_item_size = 0 ;
_total_item_count = 0 ;
}
int pqiQoSstreamer::getQueueSize(bool in)
{
if(in)
return pqistreamer::getQueueSize(in) ;
else
return qos_queue_size() ;
}
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority)
{
_total_item_size += getRsItemSize(ptr) ;
++_total_item_count ;
pqiQoS::in_rsItem(ptr,priority) ;
}
void pqiQoSstreamer::locked_clear_out_queue()
{
pqiQoS::clear() ;
_total_item_size = 0 ;
_total_item_count = 0 ;
}
void *pqiQoSstreamer::locked_pop_out_data()
{
void *out = pqiQoS::out_rsItem() ;
if(out != NULL)
{
_total_item_size -= getRsItemSize(out) ;
--_total_item_count ;
}
return out ;
}

View File

@ -0,0 +1,51 @@
/*
* libretroshare/src/pqi pqistreamer.h
*
* 3P/PQI network interface for RetroShare.
*
* Copyright 2012-2012 by Cyril Soler
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#pragma once
#include "pqiqos.h"
#include "pqistreamer.h"
class pqiQoSstreamer: public pqistreamer, public pqiQoS
{
public:
pqiQoSstreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin);
static const uint32_t PQI_QOS_STREAMER_MAX_LEVELS = 10 ;
static const float PQI_QOS_STREAMER_ALPHA = 2.0 ;
virtual void locked_storeInOutputQueue(void *ptr,int priority) ;
virtual int out_queue_size() const { return _total_item_count ; }
virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
virtual void *locked_pop_out_data() ;
virtual int getQueueSize(bool in) ;
private:
uint32_t _total_item_size ;
uint32_t _total_item_count ;
};

View File

@ -111,12 +111,7 @@ pqistreamer::~pqistreamer()
delete rsSerialiser;
// clean up outgoing. (cntrl packets)
while(out_pkt.size() > 0)
{
void *pkt = out_pkt.front();
out_pkt.pop_front();
free(pkt);
}
locked_clear_out_queue() ;
if (pkt_wpending)
{
@ -126,7 +121,7 @@ pqistreamer::~pqistreamer()
free(pkt_rpending);
// clean up outgoing.
// clean up incoming.
while(incoming.size() > 0)
{
RsItem *i = incoming.front();
@ -212,23 +207,9 @@ int pqistreamer::tick()
{
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
int total = 0;
for(it = out_pkt.begin(); it != out_pkt.end(); it++)
{
total += getRsItemSize(*it);
}
rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", out_pkt.size(), total);
total = 0;
for(it = out_pkt.begin(); it != out_pkt.end(); it++)
{
total += getRsItemSize(*it);
}
rs_sprintf_append(out, "\t Out Data [%d] => %d bytes\n", out_pkt.size(), total);
int total = compute_out_pkt_size() ;
rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", out_queue_size(), total);
rs_sprintf_append(out, "\t Incoming [%d]\n", incoming.size());
}
@ -237,7 +218,7 @@ int pqistreamer::tick()
#endif
/* if there is more stuff in the queues */
if ((incoming.size() > 0) || (out_pkt.size() > 0))
if ((incoming.size() > 0) || (out_queue_size() > 0))
{
return 1;
}
@ -258,6 +239,10 @@ int pqistreamer::status()
return 0;
}
void pqistreamer::locked_storeInOutputQueue(void *ptr,int)
{
out_pkt.push_back(ptr);
}
//
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
@ -291,7 +276,7 @@ int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize)
#endif
if (rsSerialiser->serialise(pqi, ptr, &pktsize))
{
out_pkt.push_back(ptr);
locked_storeInOutputQueue(ptr,pqi->priority_level()) ;
if (!(bio_flags & BIN_FLAGS_NO_DELETE))
{
@ -355,16 +340,7 @@ int pqistreamer::handleoutgoing()
if (!(bio->isactive()))
{
/* if we are not active - clear anything in the queues. */
for(it = out_pkt.begin(); it != out_pkt.end(); )
{
free(*it);
it = out_pkt.erase(it);
#ifdef DEBUG_PQISTREAMER
std::string out = "pqistreamer::handleoutgoing() Not active -> Clearing Pkt!";
// std::cerr << out ;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif
}
locked_clear_out_queue() ;
/* also remove the pending packets */
if (pkt_wpending)
@ -407,15 +383,7 @@ int pqistreamer::handleoutgoing()
// send a out_pkt., else send out_data. unless
// there is a pending packet.
if (!pkt_wpending)
if (out_pkt.size() > 0)
{
pkt_wpending = *(out_pkt.begin());
out_pkt.pop_front();
#ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_pkt queue";
std::cerr << std::endl;
#endif
}
pkt_wpending = locked_pop_out_data() ;
if (pkt_wpending)
{
@ -854,10 +822,10 @@ void pqistreamer::outSentBytes(int outb)
#ifdef DEBUG_LAG
#define MIN_PKTS_FOR_MSG 100
if (out_pkt.size() > MIN_PKTS_FOR_MSG)
if (out_queue_size() > MIN_PKTS_FOR_MSG)
{
std::cerr << "pqistreamer::outSentBytes() for: " << PeerId();
std::cerr << " End of Write and still " << out_pkt.size() << " pkts left";
std::cerr << " End of Write and still " << out_queue_size() << " pkts left";
std::cerr << std::endl;
}
@ -928,14 +896,61 @@ int pqistreamer::getQueueSize(bool in)
{
if (in)
return incoming.size();
return out_pkt.size();
return out_queue_size();
}
void pqistreamer::getRates(RsBwRates &rates)
{
RateInterface::getRates(rates);
rates.mQueueIn = incoming.size();
rates.mQueueOut = out_pkt.size();
rates.mQueueOut = out_queue_size();
}
int pqistreamer::out_queue_size() const
{
// Warning: because out_pkt is a list, calling size
// is O(n) ! Makign algorithms pretty inefficient. We should record how many
// items get stored and discarded to have a proper size value at any time
//
return out_pkt.size() ;
}
void pqistreamer::locked_clear_out_queue()
{
for(std::list<void*>::iterator it = out_pkt.begin(); it != out_pkt.end(); )
{
free(*it);
it = out_pkt.erase(it);
#ifdef DEBUG_PQISTREAMER
std::string out = "pqistreamer::handleoutgoing() Not active -> Clearing Pkt!";
// std::cerr << out ;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif
}
}
int pqistreamer::locked_compute_out_pkt_size() const
{
int total = 0 ;
for(std::list<void*>::const_iterator it = out_pkt.begin(); it != out_pkt.end(); ++it)
total += getRsItemSize(*it);
return total ;
}
void *pqistreamer::locked_pop_out_data()
{
void *res = NULL ;
if (!out_pkt.empty())
{
res = *(out_pkt.begin());
out_pkt.pop_front();
#ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_pkt queue";
std::cerr << std::endl;
#endif
}
return res ;
}

View File

@ -61,17 +61,26 @@ class pqistreamer: public PQInterface
time_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
virtual void getRates(RsBwRates &rates);
virtual int getQueueSize(bool in); // extracting data.
private:
protected:
/* Implementation */
// These methods are redefined in pqiQoSstreamer
//
virtual void locked_storeInOutputQueue(void *ptr,int priority) ;
virtual int out_queue_size() const ;
virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const ;
virtual void *locked_pop_out_data() ;
private:
// to filter functions - detect filecancel/data and act!
int queue_outpqi( RsItem *i,uint32_t& serialized_size);
int handleincomingitem(RsItem *i);
int queue_outpqi(RsItem *i,uint32_t& serialized_size);
int handleincomingitem(RsItem *i);
// ticked regularly (manages out queues and sending
// via above interfaces.
int handleoutgoing();
int handleincoming();
virtual int handleoutgoing();
virtual int handleincoming();
// Bandwidth/Streaming Management.
float outTimeSlice();
@ -124,5 +133,4 @@ class pqistreamer: public PQInterface
};
#endif //MRK_PQI_STREAMER_HEADER