mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-06-15 10:00:51 -04:00
merge of QoS branch into trunk
git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4588 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
663ce50630
commit
0ed60eaf86
23 changed files with 734 additions and 337 deletions
|
@ -110,38 +110,46 @@ class NetInterface;
|
|||
**/
|
||||
class PQInterface: public RateInterface
|
||||
{
|
||||
public:
|
||||
PQInterface(const std::string &id) :peerId(id) { return; }
|
||||
virtual ~PQInterface() { return; }
|
||||
public:
|
||||
PQInterface(const std::string &id) :peerId(id) { return; }
|
||||
virtual ~PQInterface() { return; }
|
||||
|
||||
/*!
|
||||
* allows user to send RsItems to a particular facility (file, network)
|
||||
*/
|
||||
virtual int SendItem(RsItem *) = 0;
|
||||
/*!
|
||||
* allows user to send RsItems to a particular facility (file, network)
|
||||
*/
|
||||
virtual int SendItem(RsItem *) = 0;
|
||||
|
||||
/*!
|
||||
* Retrieve RsItem from a facility
|
||||
*/
|
||||
virtual RsItem *GetItem() = 0;
|
||||
// this function is overloaded in classes that need the serilized size to be returned.
|
||||
virtual int SendItem(RsItem *item,uint32_t& size)
|
||||
{
|
||||
size = 0 ;
|
||||
std::cerr << "Warning: PQInterface::SendItem(RsItem*,uint32_t&) calledbut not overloaded! Serialized size will not be returned." << std::endl;
|
||||
return SendItem(item) ;
|
||||
}
|
||||
|
||||
/**
|
||||
* also there are tick + person id functions.
|
||||
*/
|
||||
virtual int tick() { return 0; }
|
||||
virtual int status() { return 0; }
|
||||
virtual std::string PeerId() { return peerId; }
|
||||
/*!
|
||||
* Retrieve RsItem from a facility
|
||||
*/
|
||||
virtual RsItem *GetItem() = 0;
|
||||
|
||||
// the callback from NetInterface Connection Events.
|
||||
virtual int notifyEvent(NetInterface *ni, int event)
|
||||
{
|
||||
(void) ni; /* remove unused parameter warnings */
|
||||
(void) event; /* remove unused parameter warnings */
|
||||
return 0;
|
||||
}
|
||||
/**
|
||||
* also there are tick + person id functions.
|
||||
*/
|
||||
virtual int tick() { return 0; }
|
||||
virtual int status() { return 0; }
|
||||
virtual std::string PeerId() { return peerId; }
|
||||
|
||||
// the callback from NetInterface Connection Events.
|
||||
virtual int notifyEvent(NetInterface *ni, int event)
|
||||
{
|
||||
(void) ni; /* remove unused parameter warnings */
|
||||
(void) event; /* remove unused parameter warnings */
|
||||
return 0;
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
std::string peerId;
|
||||
std::string peerId;
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -33,12 +33,16 @@
|
|||
#include <stdlib.h>
|
||||
const int pqihandlerzone = 34283;
|
||||
|
||||
static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
|
||||
static const float PQI_HANDLER_NB_PRIORITY_RATIO = 2 ;
|
||||
|
||||
/****
|
||||
#define DEBUG_TICK 1
|
||||
#define RSITEM_DEBUG 1
|
||||
****/
|
||||
//#define DEBUG_QOS 1
|
||||
|
||||
pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler")
|
||||
pqihandler::pqihandler(SecurityPolicy *Global) : pqiQoS(PQI_HANDLER_NB_PRIORITY_LEVELS,PQI_HANDLER_NB_PRIORITY_RATIO),coreMtx("pqihandler")
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
|
@ -60,6 +64,9 @@ pqihandler::pqihandler(SecurityPolicy *Global) : coreMtx("pqihandler")
|
|||
rateIndiv_in = 0.01;
|
||||
rateMax_out = 0.01;
|
||||
rateMax_in = 0.01;
|
||||
last_m = time(NULL) ;
|
||||
nb_ticks = 0 ;
|
||||
ticks_per_sec = 5 ; // initial guess
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -67,34 +74,96 @@ int pqihandler::tick()
|
|||
{
|
||||
int moreToTick = 0;
|
||||
|
||||
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
// tick all interfaces...
|
||||
std::map<std::string, SearchModule *>::iterator it;
|
||||
for(it = mods.begin(); it != mods.end(); it++)
|
||||
{
|
||||
if (0 < ((it -> second) -> pqi) -> tick())
|
||||
// tick all interfaces...
|
||||
std::map<std::string, SearchModule *>::iterator it;
|
||||
for(it = mods.begin(); it != mods.end(); it++)
|
||||
{
|
||||
if (0 < ((it -> second) -> pqi) -> tick())
|
||||
{
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl;
|
||||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
}
|
||||
// get the items, and queue them correctly
|
||||
if (0 < locked_GetItems())
|
||||
{
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl;
|
||||
std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl;
|
||||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
}
|
||||
// get the items, and queue them correctly
|
||||
if (0 < locked_GetItems())
|
||||
{
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl;
|
||||
#endif
|
||||
moreToTick = 1;
|
||||
}
|
||||
} /****** UNLOCK ******/
|
||||
|
||||
// send items from QoS queue
|
||||
|
||||
moreToTick |= drawFromQoS_queue() ;
|
||||
|
||||
UpdateRates();
|
||||
return moreToTick;
|
||||
}
|
||||
|
||||
bool pqihandler::drawFromQoS_queue()
|
||||
{
|
||||
float avail_out = getMaxRate(false) * 1024 / ticks_per_sec ;
|
||||
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
++nb_ticks ;
|
||||
time_t now = time(NULL) ;
|
||||
if(last_m + 3 < now)
|
||||
{
|
||||
ticks_per_sec = nb_ticks / (float)(now - last_m) ;
|
||||
nb_ticks = 0 ;
|
||||
last_m = now ;
|
||||
}
|
||||
#ifdef DEBUG_QOS
|
||||
std::cerr << "ticks per sec: " << ticks_per_sec << ", max rate in bytes/s = " << avail_out*ticks_per_sec << ", avail out per tick= " << avail_out << std::endl;
|
||||
#endif
|
||||
|
||||
uint64_t total_bytes_sent = 0 ;
|
||||
RsItem *item ;
|
||||
|
||||
while( total_bytes_sent < avail_out && (item = out_rsItem()) != NULL)
|
||||
{
|
||||
//
|
||||
uint32_t size ;
|
||||
locked_HandleRsItem(item, 0, size);
|
||||
total_bytes_sent += size ;
|
||||
#ifdef DEBUG_QOS
|
||||
std::cerr << "treating item " << (void*)item << ", priority " << (int)item->priority_level() << ", size=" << size << ", total = " << total_bytes_sent << ", queue size = " << qos_queue_size() << std::endl;
|
||||
#endif
|
||||
}
|
||||
#ifdef DEBUG_QOS
|
||||
assert(total_bytes_sent >= avail_out || qos_queue_size() == 0) ;
|
||||
std::cerr << "total bytes sent = " << total_bytes_sent << ", " ;
|
||||
if(qos_queue_size() > 0)
|
||||
std::cerr << "Queue still has " << qos_queue_size() << " elements." << std::endl;
|
||||
else
|
||||
std::cerr << "Queue is empty." << std::endl;
|
||||
#endif
|
||||
|
||||
return (qos_queue_size() > 0) ;
|
||||
}
|
||||
|
||||
|
||||
bool pqihandler::queueOutRsItem(RsItem *item)
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
in_rsItem(item) ;
|
||||
|
||||
#ifdef DEBUG_QOS
|
||||
if(item->priority_level() == QOS_PRIORITY_UNKNOWN)
|
||||
std::cerr << "Caught an unprioritized item !" << std::endl;
|
||||
|
||||
print() ;
|
||||
#endif
|
||||
return true ;
|
||||
}
|
||||
|
||||
int pqihandler::status()
|
||||
{
|
||||
|
@ -200,10 +269,9 @@ int pqihandler::locked_checkOutgoingRsItem(RsItem */*item*/, int /*global*/)
|
|||
|
||||
|
||||
// generalised output
|
||||
int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
||||
int pqihandler::locked_HandleRsItem(RsItem *item, int allowglobal,uint32_t& computed_size)
|
||||
{
|
||||
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
|
||||
|
||||
computed_size = 0 ;
|
||||
std::map<std::string, SearchModule *>::iterator it;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||
"pqihandler::HandleRsItem()");
|
||||
|
@ -217,7 +285,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
|||
out << " Cannot send out Global RsItem";
|
||||
pqioutput(PQL_ALERT, pqihandlerzone, out.str());
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << out.str();
|
||||
std::cerr << out.str();
|
||||
#endif
|
||||
delete item;
|
||||
return -1;
|
||||
|
@ -271,7 +339,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
|||
#endif
|
||||
|
||||
// if yes send on item.
|
||||
((it -> second) -> pqi) -> SendItem(item);
|
||||
((it -> second) -> pqi) -> SendItem(item,computed_size);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
|
@ -294,50 +362,51 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
|
|||
|
||||
int pqihandler::SearchSpecific(RsCacheRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendSearchResult(RsCacheItem *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendFileRequest(RsFileRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendFileData(RsFileData *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileChunkMapRequest(RsFileChunkMapRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileChunkMap(RsFileChunkMap *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
int pqihandler::SendFileCRC32Map(RsFileCRC32Map *ns)
|
||||
{
|
||||
return HandleRsItem(ns, 0);
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
int pqihandler::SendRsRawItem(RsRawItem *ns)
|
||||
{
|
||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
|
||||
"pqihandler::SendRsRawItem()");
|
||||
#ifdef DEBUG_TICK
|
||||
std::cerr << "pqihandler::SendRsRawItem()" << std ::endl;
|
||||
#endif
|
||||
return HandleRsItem(ns, 0);
|
||||
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
|
||||
|
||||
// queue the item into the QoS
|
||||
|
||||
return queueOutRsItem(ns) ;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// inputs. This is a very basic
|
||||
// system that is completely biased and slow...
|
||||
// someone please fix.
|
||||
|
@ -695,12 +764,14 @@ int pqihandler::UpdateRates()
|
|||
used_bw_in += crate_in;
|
||||
used_bw_out += crate_out;
|
||||
}
|
||||
#ifdef DEBUG_QOS
|
||||
// std::cerr << "Totals (In) Used B/W " << used_bw_in;
|
||||
// std::cerr << " Available B/W " << avail_in;
|
||||
// std::cerr << " Effective transfers " << effectiveDownloadsSm << std::endl;
|
||||
// std::cerr << "Totals (Out) Used B/W " << used_bw_out;
|
||||
// std::cerr << " Available B/W " << avail_out;
|
||||
// std::cerr << " Effective transfers " << effectiveUploadsSm << std::endl;
|
||||
#endif
|
||||
|
||||
locked_StoreCurrentRates(used_bw_in, used_bw_out);
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
|
||||
#include "pqi/pqi.h"
|
||||
#include "pqi/pqisecurity.h"
|
||||
#include "pqi/pqiqos.h"
|
||||
|
||||
#include "util/rsthreads.h"
|
||||
|
||||
|
@ -45,85 +46,91 @@ class SearchModule
|
|||
// Presents a P3 Face to the world!
|
||||
// and funnels data through to a PQInterface.
|
||||
//
|
||||
class pqihandler: public P3Interface
|
||||
class pqihandler: public P3Interface, public pqiQoS
|
||||
{
|
||||
public:
|
||||
pqihandler(SecurityPolicy *Global);
|
||||
bool AddSearchModule(SearchModule *mod);
|
||||
bool RemoveSearchModule(SearchModule *mod);
|
||||
pqihandler(SecurityPolicy *Global);
|
||||
bool AddSearchModule(SearchModule *mod);
|
||||
bool RemoveSearchModule(SearchModule *mod);
|
||||
|
||||
// P3Interface.
|
||||
virtual int SearchSpecific(RsCacheRequest *ns);
|
||||
virtual int SendSearchResult(RsCacheItem *);
|
||||
// P3Interface.
|
||||
virtual int SearchSpecific(RsCacheRequest *ns);
|
||||
virtual int SendSearchResult(RsCacheItem *);
|
||||
|
||||
// inputs.
|
||||
virtual RsCacheRequest * RequestedSearch();
|
||||
virtual RsCacheItem * GetSearchResult();
|
||||
// inputs.
|
||||
virtual RsCacheRequest * RequestedSearch();
|
||||
virtual RsCacheItem * GetSearchResult();
|
||||
|
||||
// file i/o
|
||||
virtual int SendFileRequest(RsFileRequest *ns);
|
||||
virtual int SendFileData(RsFileData *ns);
|
||||
virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns);
|
||||
virtual int SendFileChunkMap(RsFileChunkMap *ns);
|
||||
virtual int SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns);
|
||||
virtual int SendFileCRC32Map(RsFileCRC32Map *ns);
|
||||
virtual RsFileRequest *GetFileRequest();
|
||||
virtual RsFileData *GetFileData();
|
||||
virtual RsFileChunkMapRequest *GetFileChunkMapRequest();
|
||||
virtual RsFileChunkMap *GetFileChunkMap();
|
||||
virtual RsFileCRC32MapRequest *GetFileCRC32MapRequest();
|
||||
virtual RsFileCRC32Map *GetFileCRC32Map();
|
||||
// file i/o
|
||||
virtual int SendFileRequest(RsFileRequest *ns);
|
||||
virtual int SendFileData(RsFileData *ns);
|
||||
virtual int SendFileChunkMapRequest(RsFileChunkMapRequest *ns);
|
||||
virtual int SendFileChunkMap(RsFileChunkMap *ns);
|
||||
virtual int SendFileCRC32MapRequest(RsFileCRC32MapRequest *ns);
|
||||
virtual int SendFileCRC32Map(RsFileCRC32Map *ns);
|
||||
virtual RsFileRequest *GetFileRequest();
|
||||
virtual RsFileData *GetFileData();
|
||||
virtual RsFileChunkMapRequest *GetFileChunkMapRequest();
|
||||
virtual RsFileChunkMap *GetFileChunkMap();
|
||||
virtual RsFileCRC32MapRequest *GetFileCRC32MapRequest();
|
||||
virtual RsFileCRC32Map *GetFileCRC32Map();
|
||||
|
||||
// Rest of P3Interface
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
// Rest of P3Interface
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
|
||||
// Service Data Interface
|
||||
virtual int SendRsRawItem(RsRawItem *);
|
||||
virtual RsRawItem *GetRsRawItem();
|
||||
// Service Data Interface
|
||||
virtual int SendRsRawItem(RsRawItem *);
|
||||
virtual RsRawItem *GetRsRawItem();
|
||||
|
||||
// rate control.
|
||||
//indiv rate is deprecated
|
||||
//void setMaxIndivRate(bool in, float val);
|
||||
//float getMaxIndivRate(bool in);
|
||||
void setMaxRate(bool in, float val);
|
||||
float getMaxRate(bool in);
|
||||
// rate control.
|
||||
//indiv rate is deprecated
|
||||
//void setMaxIndivRate(bool in, float val);
|
||||
//float getMaxIndivRate(bool in);
|
||||
void setMaxRate(bool in, float val);
|
||||
float getMaxRate(bool in);
|
||||
|
||||
void getCurrentRates(float &in, float &out);
|
||||
void getCurrentRates(float &in, float &out);
|
||||
|
||||
bool drawFromQoS_queue() ;
|
||||
|
||||
protected:
|
||||
/* check to be overloaded by those that can
|
||||
* generates warnings otherwise
|
||||
*/
|
||||
/* check to be overloaded by those that can
|
||||
* generates warnings otherwise
|
||||
*/
|
||||
|
||||
int HandleRsItem(RsItem *ns, int allowglobal);
|
||||
int locked_HandleRsItem(RsItem *ns, int allowglobal,uint32_t& size);
|
||||
bool queueOutRsItem(RsItem *) ;
|
||||
|
||||
virtual int locked_checkOutgoingRsItem(RsItem *item, int global);
|
||||
int locked_GetItems();
|
||||
void locked_SortnStoreItem(RsItem *item);
|
||||
virtual int locked_checkOutgoingRsItem(RsItem *item, int global);
|
||||
int locked_GetItems();
|
||||
void locked_SortnStoreItem(RsItem *item);
|
||||
|
||||
RsMutex coreMtx; /* MUTEX */
|
||||
RsMutex coreMtx; /* MUTEX */
|
||||
|
||||
std::map<std::string, SearchModule *> mods;
|
||||
SecurityPolicy *globsec;
|
||||
std::map<std::string, SearchModule *> mods;
|
||||
SecurityPolicy *globsec;
|
||||
|
||||
// Temporary storage...
|
||||
std::list<RsItem *> in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request,in_crc32map_request,in_crc32map;
|
||||
// Temporary storage...
|
||||
std::list<RsItem *> in_result, in_search, in_request, in_data, in_service,in_chunkmap,in_chunkmap_request,in_crc32map_request,in_crc32map;
|
||||
|
||||
private:
|
||||
|
||||
// rate control.
|
||||
int UpdateRates();
|
||||
void locked_StoreCurrentRates(float in, float out);
|
||||
// rate control.
|
||||
int UpdateRates();
|
||||
void locked_StoreCurrentRates(float in, float out);
|
||||
|
||||
float rateIndiv_in;
|
||||
float rateIndiv_out;
|
||||
float rateMax_in;
|
||||
float rateMax_out;
|
||||
float rateIndiv_in;
|
||||
float rateIndiv_out;
|
||||
float rateMax_in;
|
||||
float rateMax_out;
|
||||
|
||||
float rateTotal_in;
|
||||
float rateTotal_out;
|
||||
float rateTotal_in;
|
||||
float rateTotal_out;
|
||||
|
||||
uint32_t nb_ticks ;
|
||||
time_t last_m ;
|
||||
float ticks_per_sec ;
|
||||
};
|
||||
|
||||
//inline void pqihandler::setMaxIndivRate(bool in, float val)
|
||||
|
|
|
@ -61,7 +61,7 @@ pqiperson::~pqiperson()
|
|||
|
||||
|
||||
// The PQInterface interface.
|
||||
int pqiperson::SendItem(RsItem *i)
|
||||
int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size)
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqiperson::SendItem()";
|
||||
|
@ -72,7 +72,7 @@ int pqiperson::SendItem(RsItem *i)
|
|||
#ifdef PERSON_DEBUG
|
||||
std::cerr << out.str() << std::endl;
|
||||
#endif
|
||||
return activepqi -> SendItem(i);
|
||||
return activepqi -> SendItem(i,serialized_size);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -120,7 +120,13 @@ int receiveHeartbeat();
|
|||
int addChildInterface(uint32_t type, pqiconnect *pqi);
|
||||
|
||||
// The PQInterface interface.
|
||||
virtual int SendItem(RsItem *);
|
||||
virtual int SendItem(RsItem *,uint32_t& serialized_size);
|
||||
virtual int SendItem(RsItem *item)
|
||||
{
|
||||
std::cerr << "Warning pqiperson::sendItem(RsItem*) should not be called. Plz call SendItem(RsItem *,uint32_t& serialized_size) instead." << std::endl;
|
||||
uint32_t serialized_size ;
|
||||
return SendItem(item,serialized_size) ;
|
||||
}
|
||||
virtual RsItem *GetItem();
|
||||
|
||||
virtual int status();
|
||||
|
|
|
@ -138,14 +138,6 @@ pqistreamer::~pqistreamer()
|
|||
free(pkt);
|
||||
}
|
||||
|
||||
// clean up outgoing (data packets)
|
||||
while(out_data.size() > 0)
|
||||
{
|
||||
void *pkt = out_data.front();
|
||||
out_data.pop_front();
|
||||
free(pkt);
|
||||
}
|
||||
|
||||
if (pkt_wpending)
|
||||
{
|
||||
free(pkt_wpending);
|
||||
|
@ -166,7 +158,7 @@ pqistreamer::~pqistreamer()
|
|||
|
||||
|
||||
// Get/Send Items.
|
||||
int pqistreamer::SendItem(RsItem *si)
|
||||
int pqistreamer::SendItem(RsItem *si,uint32_t& out_size)
|
||||
{
|
||||
#ifdef RSITEM_DEBUG
|
||||
{
|
||||
|
@ -178,7 +170,7 @@ int pqistreamer::SendItem(RsItem *si)
|
|||
}
|
||||
#endif
|
||||
|
||||
return queue_outpqi(si);
|
||||
return queue_outpqi(si,out_size);
|
||||
}
|
||||
|
||||
RsItem *pqistreamer::GetItem()
|
||||
|
@ -201,15 +193,17 @@ RsItem *pqistreamer::GetItem()
|
|||
// // PQInterface
|
||||
int pqistreamer::tick()
|
||||
{
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::tick()";
|
||||
out << std::endl;
|
||||
out << PeerId() << ": currRead/Sent: " << currRead << "/" << currSent;
|
||||
out << std::endl;
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::tick()";
|
||||
out << std::endl;
|
||||
out << PeerId() << ": currRead/Sent: " << currRead << "/" << currSent;
|
||||
out << std::endl;
|
||||
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
#endif
|
||||
|
||||
bio->tick();
|
||||
|
||||
|
@ -227,13 +221,14 @@ int pqistreamer::tick()
|
|||
handleincoming();
|
||||
handleoutgoing();
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
/* give details of the packets */
|
||||
{
|
||||
std::list<void *>::iterator it;
|
||||
std::list<void *>::iterator it;
|
||||
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::tick() Queued Data:";
|
||||
out << " for " << PeerId();
|
||||
out << " for " << PeerId();
|
||||
|
||||
if (bio->isactive())
|
||||
{
|
||||
|
@ -270,11 +265,12 @@ int pqistreamer::tick()
|
|||
out << std::endl;
|
||||
}
|
||||
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
}
|
||||
#endif
|
||||
|
||||
/* if there is more stuff in the queues */
|
||||
if ((incoming.size() > 0) || (out_pkt.size() > 0) || (out_data.size() > 0))
|
||||
if ((incoming.size() > 0) || (out_pkt.size() > 0))
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
@ -302,8 +298,9 @@ int pqistreamer::status()
|
|||
//
|
||||
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
|
||||
|
||||
int pqistreamer::queue_outpqi(RsItem *pqi)
|
||||
int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize)
|
||||
{
|
||||
pktsize = 0 ;
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "pqistreamer::queue_outpqi() called." << std::endl;
|
||||
#endif
|
||||
|
@ -316,44 +313,28 @@ int pqistreamer::queue_outpqi(RsItem *pqi)
|
|||
if(dynamic_cast<RsFileData*>(pqi)!=NULL && (bio_flags & BIN_FLAGS_NO_DELETE))
|
||||
{
|
||||
std::cerr << "Having file data with flags = " << bio_flags << std::endl ;
|
||||
*(int*)0x0=1 ;
|
||||
}
|
||||
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::queue_outpqi()";
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
#endif
|
||||
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::queue_outpqi()";
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
|
||||
/* decide which type of packet it is */
|
||||
RsFileData *dta = dynamic_cast<RsFileData *>(pqi); // This is the old test method
|
||||
bool isCntrl = (dta == NULL);
|
||||
|
||||
if(pqi->queueType() == RsItem::DATA_QUEUE) // this is the new test method. More general.
|
||||
{
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "PQISTREAMER:: got a data queue packet !!" << std::endl ;
|
||||
#endif
|
||||
isCntrl = false ;
|
||||
}
|
||||
|
||||
uint32_t pktsize = rsSerialiser->size(pqi);
|
||||
pktsize = rsSerialiser->size(pqi);
|
||||
void *ptr = malloc(pktsize);
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
|
||||
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
|
||||
#endif
|
||||
if (rsSerialiser->serialise(pqi, ptr, &pktsize))
|
||||
if (rsSerialiser->serialise(pqi, ptr, &pktsize))
|
||||
{
|
||||
if (isCntrl)
|
||||
{
|
||||
out_pkt.push_back(ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
out_data.push_back(ptr);
|
||||
}
|
||||
out_pkt.push_back(ptr);
|
||||
|
||||
if (!(bio_flags & BIN_FLAGS_NO_DELETE))
|
||||
{
|
||||
delete pqi;
|
||||
|
@ -382,11 +363,13 @@ int pqistreamer::queue_outpqi(RsItem *pqi)
|
|||
|
||||
int pqistreamer::handleincomingitem(RsItem *pqi)
|
||||
{
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::handleincomingitem()";
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::handleincomingitem()";
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
#endif
|
||||
|
||||
// Use overloaded Contact function
|
||||
pqi -> PeerId(PeerId());
|
||||
|
@ -398,11 +381,13 @@ int pqistreamer::handleoutgoing()
|
|||
{
|
||||
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::handleoutgoing()";
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
#endif
|
||||
|
||||
int maxbytes = outAllowedBytes();
|
||||
int sentbytes = 0;
|
||||
|
@ -420,21 +405,12 @@ int pqistreamer::handleoutgoing()
|
|||
{
|
||||
free(*it);
|
||||
it = out_pkt.erase(it);
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::handleoutgoing() Not active -> Clearing Pkt!";
|
||||
// std::cerr << out.str() ;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
}
|
||||
for(it = out_data.begin(); it != out_data.end(); )
|
||||
{
|
||||
free(*it);
|
||||
it = out_data.erase(it);
|
||||
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::handleoutgoing() Not active -> Clearing DPkt!";
|
||||
// std::cerr << out.str() ;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
#endif
|
||||
}
|
||||
|
||||
/* also remove the pending packets */
|
||||
|
@ -478,7 +454,6 @@ int pqistreamer::handleoutgoing()
|
|||
// send a out_pkt., else send out_data. unless
|
||||
// there is a pending packet.
|
||||
if (!pkt_wpending)
|
||||
{
|
||||
if (out_pkt.size() > 0)
|
||||
{
|
||||
pkt_wpending = *(out_pkt.begin());
|
||||
|
@ -487,22 +462,10 @@ int pqistreamer::handleoutgoing()
|
|||
std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_pkt queue";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
}
|
||||
else if (out_data.size() > 0)
|
||||
{
|
||||
pkt_wpending = *(out_data.begin());
|
||||
out_data.pop_front();
|
||||
#ifdef DEBUG_TRANSFERS
|
||||
std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_data queue";
|
||||
std::cerr << std::endl;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
if (pkt_wpending)
|
||||
{
|
||||
std::ostringstream out;
|
||||
// write packet.
|
||||
len = getRsItemSize(pkt_wpending);
|
||||
|
||||
|
@ -512,6 +475,7 @@ int pqistreamer::handleoutgoing()
|
|||
|
||||
if (len != (ss = bio->senddata(pkt_wpending, len)))
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "Problems with Send Data! (only " << ss << " bytes sent" << ", total pkt size=" << len ;
|
||||
out << std::endl;
|
||||
// std::cerr << out.str() ;
|
||||
|
@ -528,10 +492,6 @@ int pqistreamer::handleoutgoing()
|
|||
std::cerr << std::endl;
|
||||
#endif
|
||||
|
||||
// out << " Success!" << ", sent " << len << " bytes" << std::endl;
|
||||
// std::cerr << out.str() ;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
|
||||
free(pkt_wpending);
|
||||
pkt_wpending = NULL;
|
||||
|
||||
|
@ -551,11 +511,13 @@ int pqistreamer::handleincoming()
|
|||
int readbytes = 0;
|
||||
static const int max_failed_read_attempts = 2000 ;
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
{
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::handleincoming()";
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
#endif
|
||||
|
||||
if(!(bio->isactive()))
|
||||
{
|
||||
|
|
|
@ -41,75 +41,80 @@
|
|||
|
||||
class pqistreamer: public PQInterface
|
||||
{
|
||||
public:
|
||||
pqistreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin);
|
||||
virtual ~pqistreamer();
|
||||
public:
|
||||
pqistreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin);
|
||||
virtual ~pqistreamer();
|
||||
|
||||
// PQInterface
|
||||
virtual int SendItem(RsItem *);
|
||||
virtual RsItem *GetItem();
|
||||
// PQInterface
|
||||
virtual int SendItem(RsItem *item)
|
||||
{
|
||||
std::cerr << "Warning pqistreamer::sendItem(RsItem*) should not be called. Plz call SendItem(RsItem *,uint32_t& serialized_size) instead." << std::endl;
|
||||
uint32_t serialized_size ;
|
||||
return SendItem(item,serialized_size) ;
|
||||
}
|
||||
virtual int SendItem(RsItem *,uint32_t& serialized_size);
|
||||
virtual RsItem *GetItem();
|
||||
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
virtual int tick();
|
||||
virtual int status();
|
||||
|
||||
private:
|
||||
/* Implementation */
|
||||
/* Implementation */
|
||||
|
||||
// to filter functions - detect filecancel/data and act!
|
||||
int queue_outpqi( RsItem *i);
|
||||
int handleincomingitem(RsItem *i);
|
||||
// to filter functions - detect filecancel/data and act!
|
||||
int queue_outpqi( RsItem *i,uint32_t& serialized_size);
|
||||
int handleincomingitem(RsItem *i);
|
||||
|
||||
// ticked regularly (manages out queues and sending
|
||||
// via above interfaces.
|
||||
int handleoutgoing();
|
||||
int handleincoming();
|
||||
// ticked regularly (manages out queues and sending
|
||||
// via above interfaces.
|
||||
int handleoutgoing();
|
||||
int handleincoming();
|
||||
|
||||
// Bandwidth/Streaming Management.
|
||||
float outTimeSlice();
|
||||
// Bandwidth/Streaming Management.
|
||||
float outTimeSlice();
|
||||
|
||||
int outAllowedBytes();
|
||||
void outSentBytes(int );
|
||||
int outAllowedBytes();
|
||||
void outSentBytes(int );
|
||||
|
||||
int inAllowedBytes();
|
||||
void inReadBytes(int );
|
||||
int inAllowedBytes();
|
||||
void inReadBytes(int );
|
||||
|
||||
// RsSerialiser - determines which packets can be serialised.
|
||||
RsSerialiser *rsSerialiser;
|
||||
// Binary Interface for IO, initialisated at startup.
|
||||
BinInterface *bio;
|
||||
unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
|
||||
// RsSerialiser - determines which packets can be serialised.
|
||||
RsSerialiser *rsSerialiser;
|
||||
// Binary Interface for IO, initialisated at startup.
|
||||
BinInterface *bio;
|
||||
unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
|
||||
|
||||
void *pkt_wpending; // storage for pending packet to write.
|
||||
int pkt_rpend_size; // size of pkt_rpending.
|
||||
void *pkt_rpending; // storage for read in pending packets.
|
||||
void *pkt_wpending; // storage for pending packet to write.
|
||||
int pkt_rpend_size; // size of pkt_rpending.
|
||||
void *pkt_rpending; // storage for read in pending packets.
|
||||
|
||||
enum {reading_state_packet_started=1,
|
||||
enum {reading_state_packet_started=1,
|
||||
reading_state_initial=0 } ;
|
||||
|
||||
int reading_state ;
|
||||
int failed_read_attempts ;
|
||||
int reading_state ;
|
||||
int failed_read_attempts ;
|
||||
|
||||
// Temp Storage for transient data.....
|
||||
std::list<void *> out_pkt; // Cntrl / Search / Results queue
|
||||
std::list<void *> out_data; // FileData - secondary queue.
|
||||
std::list<RsItem *> incoming;
|
||||
// Temp Storage for transient data.....
|
||||
std::list<void *> out_pkt; // Cntrl / Search / Results queue
|
||||
std::list<RsItem *> incoming;
|
||||
|
||||
// data for network stats.
|
||||
int totalRead;
|
||||
int totalSent;
|
||||
// data for network stats.
|
||||
int totalRead;
|
||||
int totalSent;
|
||||
|
||||
// these are representative (but not exact)
|
||||
int currRead;
|
||||
int currSent;
|
||||
int currReadTS; // TS from which these are measured.
|
||||
int currSentTS;
|
||||
// these are representative (but not exact)
|
||||
int currRead;
|
||||
int currSent;
|
||||
int currReadTS; // TS from which these are measured.
|
||||
int currSentTS;
|
||||
|
||||
int avgLastUpdate; // TS from which these are measured.
|
||||
float avgReadCount;
|
||||
float avgSentCount;
|
||||
int avgLastUpdate; // TS from which these are measured.
|
||||
float avgReadCount;
|
||||
float avgSentCount;
|
||||
|
||||
RsMutex streamerMtx ;
|
||||
// pthread_t thread_id;
|
||||
RsMutex streamerMtx ;
|
||||
// pthread_t thread_id;
|
||||
};
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue