Added a thread per active peer - to reduce RTT and increase throughout.

* Added pqithreadstreamer, tweaked pqistreamer to support derivation.
 * Shifted RTT from p3Service to p3FastService.
 * Disabled lots of debug.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-initdev@6787 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2013-10-02 03:21:04 +00:00
parent a7dd9ad9e3
commit b587301b5a
22 changed files with 658 additions and 1414 deletions

View file

@ -35,10 +35,9 @@ const int pqipersonzone = 82371;
* #define PERSON_DEBUG 1
****/
#define PERSON_DEBUG 1
pqiperson::pqiperson(std::string id, pqipersongrp *pg)
:PQInterface(id), active(false), activepqi(NULL),
:PQInterface(id), mNotifyMtx("pqiperson-notify"), mPersonMtx("pqiperson"),
active(false), activepqi(NULL),
inConnectAttempt(false), waittimes(0),
pqipg(pg)
{
@ -50,6 +49,8 @@ pqiperson::pqiperson(std::string id, pqipersongrp *pg)
pqiperson::~pqiperson()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
// clean up the children.
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); it++)
@ -64,6 +65,8 @@ pqiperson::~pqiperson()
// The PQInterface interface.
int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
std::string out = "pqiperson::SendItem()";
if (active)
{
@ -86,14 +89,27 @@ int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size)
RsItem *pqiperson::GetItem()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
if (active)
return activepqi -> GetItem();
// else not possible.
return NULL;
}
bool pqiperson::RecvItem(RsItem *item)
{
std::cerr << "pqiperson::RecvItem()";
std::cerr << std::endl;
return pqipg->recvItem((RsRawItem *) item);
}
int pqiperson::status()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
if (active)
return activepqi -> status();
return -1;
@ -101,6 +117,8 @@ int pqiperson::status()
int pqiperson::receiveHeartbeat()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::receiveHeartbeat() from peer : " + PeerId());
lastHeartbeatReceived = time(NULL);
@ -110,61 +128,68 @@ int pqiperson::receiveHeartbeat()
// tick......
int pqiperson::tick()
{
//if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset.
if (active && (lastHeartbeatReceived != 0) &&
(time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5)
int activeTick = 0;
{
int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS();
std::string out = "pqiperson::tick() WARNING No heartbeat from: " + PeerId();
//out << " assume dead. calling pqissl::reset(), LastHeartbeat was: ";
rs_sprintf_append(out, " LastHeartbeat was: %ld secs ago", time(NULL) - lastHeartbeatReceived);
rs_sprintf_append(out, " LastIncoming was: %d secs ago", ageLastIncoming);
pqioutput(PQL_WARNING, pqipersonzone, out);
#define NO_PACKET_TIMEOUT 60
if (ageLastIncoming > NO_PACKET_TIMEOUT)
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
//if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset.
if (active && (lastHeartbeatReceived != 0) &&
(time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5)
{
out = "pqiperson::tick() " + PeerId();
out += " No Heartbeat & No Packets -> assume dead. calling pqissl::reset()";
int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS();
std::string out = "pqiperson::tick() WARNING No heartbeat from: " + PeerId();
//out << " assume dead. calling pqissl::reset(), LastHeartbeat was: ";
rs_sprintf_append(out, " LastHeartbeat was: %ld secs ago", time(NULL) - lastHeartbeatReceived);
rs_sprintf_append(out, " LastIncoming was: %d secs ago", ageLastIncoming);
pqioutput(PQL_WARNING, pqipersonzone, out);
this->reset();
#define NO_PACKET_TIMEOUT 60
if (ageLastIncoming > NO_PACKET_TIMEOUT)
{
out = "pqiperson::tick() " + PeerId();
out += " No Heartbeat & No Packets -> assume dead. calling pqissl::reset()";
pqioutput(PQL_WARNING, pqipersonzone, out);
this->reset();
}
}
{
std::string out = "pqiperson::tick() Id: " + PeerId() + " ";
if (active)
out += "***Active***";
else
out += ">>InActive<<";
out += "\n";
rs_sprintf_append(out, "Activepqi: %p inConnectAttempt: ", activepqi);
if (inConnectAttempt)
out += "In Connection Attempt";
else
out += " Not Connecting ";
out += "\n";
// tick the children.
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); it++)
{
if (0 < (it->second) -> tick())
{
activeTick = 1;
}
rs_sprintf_append(out, "\tTicking Child: %d\n", it->first);
}
pqioutput(PQL_DEBUG_ALL, pqipersonzone, out);
} // end of pqioutput.
}
int activeTick = 0;
{
std::string out = "pqiperson::tick() Id: " + PeerId() + " ";
if (active)
out += "***Active***";
else
out += ">>InActive<<";
out += "\n";
rs_sprintf_append(out, "Activepqi: %p inConnectAttempt: ", activepqi);
if (inConnectAttempt)
out += "In Connection Attempt";
else
out += " Not Connecting ";
out += "\n";
// tick the children.
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); it++)
{
if (0 < (it->second) -> tick())
{
activeTick = 1;
}
rs_sprintf_append(out, "\tTicking Child: %d\n", it->first);
}
pqioutput(PQL_DEBUG_ALL, pqipersonzone, out);
} // end of pqioutput.
// handle Notify Events that were generated.
processNotifyEvents();
return activeTick;
}
@ -172,8 +197,60 @@ int pqiperson::tick()
// callback function for the child - notify of a change.
// This is only used for out-of-band info....
// otherwise could get dangerous loops.
// - Actually, now we have - must store and process later.
int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address)
{
if (mPersonMtx.trylock())
{
handleNotifyEvent_locked(ni, newState, remote_peer_address);
mPersonMtx.unlock();
return 1;
}
RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/
mNotifyQueue.push_back(NotifyData(ni, newState, remote_peer_address));
return 1;
}
void pqiperson::processNotifyEvents()
{
NetInterface *ni;
int state;
struct sockaddr_storage addr;
while(1)
{
{
RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/
if (mNotifyQueue.empty())
{
return;
}
NotifyData &data = mNotifyQueue.front();
ni = data.mNi;
state = data.mState;
addr = data.mAddr;
mNotifyQueue.pop_front();
}
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
handleNotifyEvent_locked(ni, state, addr);
}
return;
}
int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address)
{
{
std::string out = "pqiperson::notifyEvent() Id: " + PeerId() + "\n";
rs_sprintf_append(out, "Message: %d from: %p\n", newState, ni);
@ -243,6 +320,8 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockadd
activepqi = pqi;
inConnectAttempt = false;
activepqi->start(); // STARTUP THREAD.
/* reset all other children? (clear up long UDP attempt) */
for(it = kids.begin(); it != kids.end(); it++)
{
@ -270,6 +349,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockadd
{
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId() + " CONNECT_FAILED->marking so!");
activepqi->stop(); // STOP THREAD.
active = false;
activepqi = NULL;
}
@ -302,11 +382,14 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockadd
int pqiperson::reset()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::reset() resetting all pqiconnect for Id: " + PeerId());
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); it++)
{
(it->second) -> stop(); // STOP THREAD.
(it->second) -> reset();
}
@ -317,8 +400,29 @@ int pqiperson::reset()
return 1;
}
int pqiperson::fullstopthreads()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::fullstopthreads() for Id: " + PeerId());
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); it++)
{
(it->second) -> fullstop(); // WAIT FOR THREAD TO STOP.
}
activepqi = NULL;
active = false;
lastHeartbeatReceived = 0;
return 1;
}
int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
{
std::string out;
rs_sprintf(out, "pqiperson::addChildInterface() : Id %s %u", PeerId().c_str(), type);
@ -335,6 +439,8 @@ int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi)
int pqiperson::listen()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::listen() Id: " + PeerId());
if (!active)
@ -352,6 +458,8 @@ int pqiperson::listen()
int pqiperson::stoplistening()
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::stoplistening() Id: " + PeerId());
std::map<uint32_t, pqiconnect *>::iterator it;
@ -368,6 +476,8 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr,
uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags, uint32_t bandwidth,
const std::string &domain_addr, uint16_t domain_port)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
#ifdef PERSON_DEBUG
#endif
{
@ -413,7 +523,7 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr,
#ifdef PERSON_DEBUG
std::cerr << "pqiperson::connect() WARNING, clearing rate cap" << std::endl;
#endif
setRateCap(0,0);
setRateCap_locked(0,0);
#ifdef PERSON_DEBUG
std::cerr << "pqiperson::connect() setting connect_parameters" << std::endl;
@ -444,25 +554,10 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr,
}
pqiconnect *pqiperson::getKid(uint32_t type)
{
std::map<uint32_t, pqiconnect *>::iterator it;
if (kids.empty()) {
return NULL;
}
it = kids.find(type);
if (it == kids.end())
{
return NULL;
} else {
return it->second;
}
}
void pqiperson::getRates(RsBwRates &rates)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
// get the rate from the active one.
if ((!active) || (activepqi == NULL))
return;
@ -471,6 +566,8 @@ void pqiperson::getRates(RsBwRates &rates)
int pqiperson::getQueueSize(bool in)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
// get the rate from the active one.
if ((!active) || (activepqi == NULL))
return 0;
@ -480,6 +577,8 @@ int pqiperson::getQueueSize(bool in)
float pqiperson::getRate(bool in)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
// get the rate from the active one.
if ((!active) || (activepqi == NULL))
return 0;
@ -488,6 +587,8 @@ float pqiperson::getRate(bool in)
void pqiperson::setMaxRate(bool in, float val)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
// set to all of them. (and us)
PQInterface::setMaxRate(in, val);
// clean up the children.
@ -499,6 +600,12 @@ void pqiperson::setMaxRate(bool in, float val)
}
void pqiperson::setRateCap(float val_in, float val_out)
{
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
return setRateCap_locked(val_in, val_out);
}
void pqiperson::setRateCap_locked(float val_in, float val_out)
{
// set to all of them. (and us)
PQInterface::setRateCap(val_in, val_out);