mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-01-28 08:16:59 -05:00
improved calculation of speed in pqistreamer (was called before only when sending bytes), and prevented situation where no HB is received by the connection is not reset because last HB is 0, causing ghost connected peers in GUI
This commit is contained in:
parent
993d7d4c8f
commit
a5044bd71c
@ -495,7 +495,7 @@ int pqihandler::UpdateRates()
|
|||||||
|
|
||||||
// loop through modules to get the used bandwith and the number of modules that are affectively transfering
|
// loop through modules to get the used bandwith and the number of modules that are affectively transfering
|
||||||
#ifdef PQI_HDL_DEBUG_UR
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
std::cerr << " Looping through modules" << std::endl;
|
std::cerr << "Looping through modules" << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int index = 0;
|
int index = 0;
|
||||||
@ -504,6 +504,12 @@ int pqihandler::UpdateRates()
|
|||||||
{
|
{
|
||||||
SearchModule *mod = (it -> second);
|
SearchModule *mod = (it -> second);
|
||||||
float crate_in = mod -> pqi -> getRate(true);
|
float crate_in = mod -> pqi -> getRate(true);
|
||||||
|
|
||||||
|
#ifdef PQI_HDL_DEBUG_UR
|
||||||
|
if(crate_in > 0.0)
|
||||||
|
std::cerr << " got in rate for peer " << it->first << " : " << crate_in << std::endl;
|
||||||
|
#endif
|
||||||
|
|
||||||
if ((crate_in > 0.01 * avail_in) || (crate_in > 0.1))
|
if ((crate_in > 0.01 * avail_in) || (crate_in > 0.1))
|
||||||
{
|
{
|
||||||
++effectiveDownloadsSm;
|
++effectiveDownloadsSm;
|
||||||
|
@ -141,9 +141,19 @@ int pqiperson::tick()
|
|||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mPersonMtx);
|
RS_STACK_MUTEX(mPersonMtx);
|
||||||
|
|
||||||
|
#ifdef PERSON_DEBUG
|
||||||
|
if(active)
|
||||||
|
{
|
||||||
|
std::cerr << "pqiperson: peer=" << (activepqi? (activepqi->PeerId()): (RsPeerId())) <<", active=" << active << ", last HB=" << time(NULL) - lastHeartbeatReceived << " secs ago." ;
|
||||||
|
if(lastHeartbeatReceived==0)
|
||||||
|
std::cerr << "!!!!!!!" << std::endl;
|
||||||
|
else
|
||||||
|
std::cerr << std::endl;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
//if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset.
|
//if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset.
|
||||||
if ( active && (lastHeartbeatReceived != 0)
|
if ( active && time(NULL) > lastHeartbeatReceived + HEARTBEAT_REPEAT_TIME * 5)
|
||||||
&& (time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5)
|
|
||||||
{
|
{
|
||||||
int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS();
|
int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS();
|
||||||
|
|
||||||
@ -157,12 +167,7 @@ int pqiperson::tick()
|
|||||||
|
|
||||||
if (ageLastIncoming > 60) // Check timeout
|
if (ageLastIncoming > 60) // Check timeout
|
||||||
{
|
{
|
||||||
#ifdef PERSON_DEBUG
|
std::cerr << "pqiperson::tick() " << PeerId().toStdString() << " No Heartbeat & No Packets for 60 secs -> assume dead." << std::endl;
|
||||||
std::cerr << "pqiperson::tick() " << PeerId().toStdString()
|
|
||||||
<< " No Heartbeat & No Packets -> assume dead."
|
|
||||||
<< "calling pqissl::reset()" << std::endl;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
this->reset_locked();
|
this->reset_locked();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -338,7 +343,7 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState,
|
|||||||
|
|
||||||
// mark as active.
|
// mark as active.
|
||||||
active = true;
|
active = true;
|
||||||
lastHeartbeatReceived = 0;
|
lastHeartbeatReceived = time(NULL) ;
|
||||||
activepqi = pqi;
|
activepqi = pqi;
|
||||||
inConnectAttempt = false;
|
inConnectAttempt = false;
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ static const int CONNECT_UNREACHABLE = 3;
|
|||||||
static const int CONNECT_FIREWALLED = 4;
|
static const int CONNECT_FIREWALLED = 4;
|
||||||
static const int CONNECT_FAILED = 5;
|
static const int CONNECT_FAILED = 5;
|
||||||
|
|
||||||
static const int HEARTBEAT_REPEAT_TIME = 5;
|
static const time_t HEARTBEAT_REPEAT_TIME = 5;
|
||||||
|
|
||||||
#include "pqi/pqiqosstreamer.h"
|
#include "pqi/pqiqosstreamer.h"
|
||||||
#include "pqi/pqithreadstreamer.h"
|
#include "pqi/pqithreadstreamer.h"
|
||||||
|
@ -38,7 +38,9 @@
|
|||||||
|
|
||||||
const int pqistreamerzone = 8221;
|
const int pqistreamerzone = 8221;
|
||||||
|
|
||||||
const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
static const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
||||||
|
static const int PQISTREAM_AVG_PERIOD = 5; // update speed estimate every 5 seconds
|
||||||
|
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
|
||||||
|
|
||||||
/* This removes the print statements (which hammer pqidebug) */
|
/* This removes the print statements (which hammer pqidebug) */
|
||||||
/***
|
/***
|
||||||
@ -173,63 +175,39 @@ RsItem *pqistreamer::GetItem()
|
|||||||
return osr;
|
return osr;
|
||||||
}
|
}
|
||||||
|
|
||||||
// // PQInterface
|
void pqistreamer::updateRates()
|
||||||
int pqistreamer::tick()
|
|
||||||
{
|
{
|
||||||
|
// now update rates both ways.
|
||||||
|
|
||||||
|
time_t t = time(NULL); // get current timestep.
|
||||||
|
|
||||||
|
if (t > mAvgLastUpdate + PQISTREAM_AVG_PERIOD)
|
||||||
|
{
|
||||||
|
float avgReadpSec = getRate(true) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgReadCount/(1000.0 * (t - mAvgLastUpdate));
|
||||||
|
float avgSentpSec = getRate(false) * PQISTREAM_AVG_FRAC + (1.0 - PQISTREAM_AVG_FRAC) * mAvgSentCount/(1000.0 * (t - mAvgLastUpdate));
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
{
|
std::cerr << "Peer " << PeerId() << ": Current speed estimates: " << avgReadpSec << " / " << avgSentpSec << std::endl;
|
||||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
|
||||||
std::string out = "pqistreamer::tick()\n" + PeerId();
|
|
||||||
rs_sprintf_append(out, ": currRead/Sent: %d/%d", mCurrRead, mCurrSent);
|
|
||||||
|
|
||||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
/* pretend our rate is zero if we are
|
||||||
if (!tick_bio())
|
* not bandwidthLimited().
|
||||||
|
*/
|
||||||
|
if (mBio->bandwidthLimited())
|
||||||
{
|
{
|
||||||
return 0;
|
setRate(true, avgReadpSec);
|
||||||
}
|
setRate(false, avgSentpSec);
|
||||||
|
|
||||||
tick_recv(0);
|
|
||||||
tick_send(0);
|
|
||||||
|
|
||||||
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
|
||||||
/* give details of the packets */
|
|
||||||
{
|
|
||||||
std::list<void *>::iterator it;
|
|
||||||
|
|
||||||
std::string out = "pqistreamer::tick() Queued Data: for " + PeerId();
|
|
||||||
|
|
||||||
if (mBio->isactive())
|
|
||||||
{
|
|
||||||
out += " (active)";
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
out += " (waiting)";
|
std::cerr << "Warning: setting to 0" << std::endl;
|
||||||
}
|
setRate(true, 0);
|
||||||
out += "\n";
|
setRate(false, 0);
|
||||||
|
|
||||||
{
|
|
||||||
int total = locked_compute_out_pkt_size() ;
|
|
||||||
|
|
||||||
rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", locked_out_queue_size(), total);
|
|
||||||
rs_sprintf_append(out, "\t Incoming [%d]\n", mIncomingSize);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
|
mAvgLastUpdate = t;
|
||||||
|
mAvgReadCount = 0;
|
||||||
|
mAvgSentCount = 0;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
/* if there is more stuff in the queues */
|
|
||||||
if ((!mIncoming.empty()) || (locked_out_queue_size() > 0))
|
|
||||||
{
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int pqistreamer::tick_bio()
|
int pqistreamer::tick_bio()
|
||||||
@ -955,9 +933,6 @@ int pqistreamer::inAllowedBytes_locked()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static const float AVG_PERIOD = 5; // sec
|
|
||||||
static const float AVG_FRAC = 0.8; // for low pass filter.
|
|
||||||
|
|
||||||
void pqistreamer::outSentBytes_locked(uint32_t outb)
|
void pqistreamer::outSentBytes_locked(uint32_t outb)
|
||||||
{
|
{
|
||||||
#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
@ -981,50 +956,10 @@ void pqistreamer::outSentBytes_locked(uint32_t outb)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
mTotalSent += outb;
|
mTotalSent += outb;
|
||||||
mCurrSent += outb;
|
mCurrSent += outb;
|
||||||
mAvgSentCount += outb;
|
mAvgSentCount += outb;
|
||||||
|
|
||||||
int t = time(NULL); // get current timestep.
|
|
||||||
if (t - mAvgLastUpdate > AVG_PERIOD)
|
|
||||||
{
|
|
||||||
float avgReadpSec = getRate(true);
|
|
||||||
float avgSentpSec = getRate(false);
|
|
||||||
|
|
||||||
avgReadpSec *= AVG_FRAC;
|
|
||||||
avgReadpSec += (1.0 - AVG_FRAC) * mAvgReadCount /
|
|
||||||
(1000.0 * (t - mAvgLastUpdate));
|
|
||||||
|
|
||||||
avgSentpSec *= AVG_FRAC;
|
|
||||||
avgSentpSec += (1.0 - AVG_FRAC) * mAvgSentCount /
|
|
||||||
(1000.0 * (t - mAvgLastUpdate));
|
|
||||||
|
|
||||||
|
|
||||||
/* pretend our rate is zero if we are
|
|
||||||
* not bandwidthLimited().
|
|
||||||
*/
|
|
||||||
if (mBio->bandwidthLimited())
|
|
||||||
{
|
|
||||||
setRate(true, avgReadpSec);
|
|
||||||
setRate(false, avgSentpSec);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
setRate(true, 0);
|
|
||||||
setRate(false, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
mAvgLastUpdate = t;
|
|
||||||
mAvgReadCount = 0;
|
|
||||||
mAvgSentCount = 0;
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,8 +55,6 @@ class pqistreamer: public PQInterface
|
|||||||
}
|
}
|
||||||
virtual int SendItem(RsItem *,uint32_t& serialized_size);
|
virtual int SendItem(RsItem *,uint32_t& serialized_size);
|
||||||
virtual RsItem *GetItem();
|
virtual RsItem *GetItem();
|
||||||
|
|
||||||
virtual int tick();
|
|
||||||
virtual int status();
|
virtual int status();
|
||||||
|
|
||||||
time_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
|
time_t getLastIncomingTS(); // Time of last data packet, for checking a connection is alive.
|
||||||
@ -81,6 +79,7 @@ class pqistreamer: public PQInterface
|
|||||||
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
//virtual int locked_gatherStatistics(std::vector<uint32_t>& per_service_count,std::vector<uint32_t>& per_priority_count) const; // extracting data.
|
||||||
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
|
virtual int locked_gatherStatistics(std::list<RSTrafficClue>& outqueue_stats,std::list<RSTrafficClue>& inqueue_stats); // extracting data.
|
||||||
|
|
||||||
|
void updateRates() ;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too.
|
RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too.
|
||||||
@ -139,10 +138,11 @@ class pqistreamer: public PQInterface
|
|||||||
// these are representative (but not exact)
|
// these are representative (but not exact)
|
||||||
int mCurrRead;
|
int mCurrRead;
|
||||||
int mCurrSent;
|
int mCurrSent;
|
||||||
int mCurrReadTS; // TS from which these are measured.
|
|
||||||
int mCurrSentTS;
|
|
||||||
|
|
||||||
int mAvgLastUpdate; // TS from which these are measured.
|
time_t mCurrReadTS; // TS from which these are measured.
|
||||||
|
time_t mCurrSentTS;
|
||||||
|
|
||||||
|
time_t mAvgLastUpdate; // TS from which these are measured.
|
||||||
float mAvgReadCount;
|
float mAvgReadCount;
|
||||||
float mAvgSentCount;
|
float mAvgSentCount;
|
||||||
|
|
||||||
|
@ -65,6 +65,8 @@ void pqithreadstreamer::data_tick()
|
|||||||
isactive = mBio->isactive();
|
isactive = mBio->isactive();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
updateRates() ;
|
||||||
|
|
||||||
if (!isactive)
|
if (!isactive)
|
||||||
{
|
{
|
||||||
usleep(DEFAULT_STREAMER_IDLE_SLEEP);
|
usleep(DEFAULT_STREAMER_IDLE_SLEEP);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user