Made pqiperson more readable evidence some strange code

This commit is contained in:
Gio 2015-12-19 22:10:07 +01:00
parent 874f304cd2
commit b2c27a1fed
2 changed files with 274 additions and 303 deletions

View file

@ -38,23 +38,16 @@ const int pqipersonzone = 82371;
* #define PERSON_DEBUG 1 * #define PERSON_DEBUG 1
****/ ****/
pqiperson::pqiperson(const RsPeerId& id, pqipersongrp *pg) pqiperson::pqiperson(const RsPeerId& id, pqipersongrp *pg) :
:PQInterface(id), mNotifyMtx("pqiperson-notify"), mPersonMtx("pqiperson"), PQInterface(id), mNotifyMtx("pqiperson-notify"), mPersonMtx("pqiperson"),
active(false), activepqi(NULL), active(false), activepqi(NULL), inConnectAttempt(false), waittimes(0),
inConnectAttempt(false), waittimes(0), pqipg(pg) {} // TODO: must check id!
pqipg(pg)
{
/* must check id! */
return;
}
pqiperson::~pqiperson() pqiperson::~pqiperson()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
// clean up the children. // clean up the childrens
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ {
@ -64,13 +57,11 @@ pqiperson::~pqiperson()
kids.clear(); kids.clear();
} }
int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size)
// The PQInterface interface.
int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
if (active) if(active)
{ {
// every outgoing item goes through this function, so try to not waste cpu cycles // every outgoing item goes through this function, so try to not waste cpu cycles
// check if debug output is wanted, to avoid unecessary work // check if debug output is wanted, to avoid unecessary work
@ -103,10 +94,10 @@ int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size)
RsItem *pqiperson::GetItem() RsItem *pqiperson::GetItem()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
if (active) if (active)
return activepqi -> GetItem(); return activepqi->GetItem();
// else not possible. // else not possible.
return NULL; return NULL;
} }
@ -114,59 +105,64 @@ RsItem *pqiperson::GetItem()
bool pqiperson::RecvItem(RsItem *item) bool pqiperson::RecvItem(RsItem *item)
{ {
#ifdef PERSON_DEBUG #ifdef PERSON_DEBUG
std::cerr << "pqiperson::RecvItem()"; std::cerr << "pqiperson::RecvItem()" << std::endl;
std::cerr << std::endl;
#endif #endif
return pqipg->recvItem((RsRawItem *) item); return pqipg->recvItem((RsRawItem *) item);
} }
int pqiperson::status() int pqiperson::status()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
if (active) if (active)
return activepqi -> status(); return activepqi -> status();
return -1; return -1;
} }
int pqiperson::receiveHeartbeat() int pqiperson::receiveHeartbeat()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ #ifdef PERSON_DEBUG
std::cerr << "pqiperson::receiveHeartbeat() from peer : "
<< PeerId().toStdString() << std::endl;
#endif
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::receiveHeartbeat() from peer : " + PeerId().toStdString()); RS_STACK_MUTEX(mPersonMtx);
lastHeartbeatReceived = time(NULL); lastHeartbeatReceived = time(NULL);
return true ; return 1;
} }
// tick......
int pqiperson::tick() int pqiperson::tick()
{ {
int activeTick = 0; int activeTick = 0;
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
//if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset. //if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset.
if (active && (lastHeartbeatReceived != 0) && if ( active && (lastHeartbeatReceived != 0)
(time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5) && (time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5)
{ {
int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS(); int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS();
std::string out = "pqiperson::tick() WARNING No heartbeat from: " + PeerId().toStdString();
//out << " assume dead. calling pqissl::reset(), LastHeartbeat was: "; #ifdef PERSON_DEBUG
rs_sprintf_append(out, " LastHeartbeat was: %ld secs ago", time(NULL) - lastHeartbeatReceived); std::cerr << "pqiperson::tick() WARNING No heartbeat from: "
rs_sprintf_append(out, " LastIncoming was: %d secs ago", ageLastIncoming); << PeerId().toStdString() << " LastHeartbeat was: "
pqioutput(PQL_WARNING, pqipersonzone, out); << time(NULL) - lastHeartbeatReceived
<< "secs ago LastIncoming was: " << ageLastIncoming
<< "secs ago" << std::endl;
#endif
#define NO_PACKET_TIMEOUT 60 if (ageLastIncoming > 60) // Check timeout
if (ageLastIncoming > NO_PACKET_TIMEOUT)
{ {
out = "pqiperson::tick() " + PeerId().toStdString(); #ifdef PERSON_DEBUG
out += " No Heartbeat & No Packets -> assume dead. calling pqissl::reset()"; std::cerr << "pqiperson::tick() " << PeerId().toStdString()
pqioutput(PQL_WARNING, pqipersonzone, out); << " No Heartbeat & No Packets -> assume dead."
<< "calling pqissl::reset()" << std::endl;
#endif
this->reset_locked(); this->reset_locked();
} }
@ -174,34 +170,31 @@ int pqiperson::tick()
{ {
std::string out = "pqiperson::tick() Id: " + PeerId().toStdString() + " "; #ifdef PERSON_DEBUG
std::string statusStr = " inactive ";
if (active) if (active)
out += "***Active***"; statusStr = " active ";
else
out += ">>InActive<<"; std::string connectStr = " Not Connecting ";
out += "\n";
rs_sprintf_append(out, "Activepqi: %p inConnectAttempt: ", activepqi);
if (inConnectAttempt) if (inConnectAttempt)
out += "In Connection Attempt"; connectStr = " In Connection Attempt ";
else
out += " Not Connecting "; std::cerr << "pqiperson::tick() Id: " << PeerId().toStdString()
out += "\n"; << "activepqi: " << activepqi << " inConnectAttempt:"
<< connectStr << std::endl;
#endif
// tick the children. // tick the children.
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ {
if (0 < (it->second) -> tick()) if (0 < (it->second)->tick())
{
activeTick = 1; activeTick = 1;
} #ifdef PERSON_DEBUG
rs_sprintf_append(out, "\tTicking Child: %d\n", it->first); std::cerr << "\tTicking Child: "<< it->first << std::endl;
#endif
} }
}
pqioutput(PQL_DEBUG_ALL, pqipersonzone, out);
} // end of pqioutput.
} }
// handle Notify Events that were generated. // handle Notify Events that were generated.
@ -214,44 +207,41 @@ int pqiperson::tick()
// This is only used for out-of-band info.... // This is only used for out-of-band info....
// otherwise could get dangerous loops. // otherwise could get dangerous loops.
// - Actually, now we have - must store and process later. // - Actually, now we have - must store and process later.
int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address) int pqiperson::notifyEvent(NetInterface *ni, int newState,
const sockaddr_storage &remote_peer_address)
{ {
#ifdef PERSON_DEBUG #ifdef PERSON_DEBUG
std::cerr << "pqiperson::notifyEvent() adding event to Queue. newState=" << newState << " from IP = " << sockaddr_storage_tostring(remote_peer_address) << std::endl; std::cerr << "pqiperson::notifyEvent() adding event to Queue. newState="
<< newState << " from IP = "
<< sockaddr_storage_tostring(remote_peer_address) << std::endl;
#endif #endif
if (mPersonMtx.trylock()) if (mPersonMtx.trylock())
{ {
handleNotifyEvent_locked(ni, newState, remote_peer_address); handleNotifyEvent_locked(ni, newState, remote_peer_address);
mPersonMtx.unlock(); mPersonMtx.unlock();
return 1; return 1;
} }
RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mNotifyMtx);
mNotifyQueue.push_back(NotifyData(ni, newState, remote_peer_address)); mNotifyQueue.push_back(NotifyData(ni, newState, remote_peer_address));
return 1; return 1;
} }
void pqiperson::processNotifyEvents()
void pqiperson::processNotifyEvents()
{ {
NetInterface *ni; NetInterface *ni;
int state; int state;
struct sockaddr_storage addr; sockaddr_storage addr;
while(1) while(1) // While there is notification to handle
{ {
{ {
RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mNotifyMtx);
if (mNotifyQueue.empty()) if(mNotifyQueue.empty())
{
return; return;
}
NotifyData &data = mNotifyQueue.front(); NotifyData &data = mNotifyQueue.front();
ni = data.mNi; ni = data.mNi;
state = data.mState; state = data.mState;
@ -260,23 +250,21 @@ void pqiperson::processNotifyEvents()
mNotifyQueue.pop_front(); mNotifyQueue.pop_front();
} }
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
handleNotifyEvent_locked(ni, state, addr); handleNotifyEvent_locked(ni, state, addr);
} }
return;
} }
int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address) int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState,
const sockaddr_storage &remote_peer_address)
{ {
#ifdef PERSON_DEBUG
{ std::cerr << "pqiperson::handleNotifyEvent_locked() Id: "
std::string out = "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + "\n"; << PeerId().toStdString() << " Message: " << newState
rs_sprintf_append(out, "Message: %d from: %p\n", newState, ni); << " from: " << ni << std::endl;
rs_sprintf_append(out, "Active pqi : %p", activepqi); int i = 0;
#endif
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out);
}
/* find the pqi, */ /* find the pqi, */
pqiconnect *pqi = NULL; pqiconnect *pqi = NULL;
@ -284,14 +272,14 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const s
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
/* start again */ /* start again */
int i = 0;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ {
std::string out; #ifdef PERSON_DEBUG
rs_sprintf(out, "pqiperson::connectattempt() Kid# %d of %u\n", i, kids.size()); std::cerr << "pqiperson::handleNotifyEvent_locked() Kid# " << i
rs_sprintf_append(out, " type: %u in_ni: %p", it->first, ni); << " of " << kids.size() << " type: " << it->first
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); << " in_ni: " << ni << std::endl;
i++; ++i;
#endif
if ((it->second)->thisNetInterface(ni)) if ((it->second)->thisNetInterface(ni))
{ {
@ -302,7 +290,8 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const s
if (!pqi) if (!pqi)
{ {
pqioutput(PQL_WARNING, pqipersonzone, "Unknown notfyEvent Source!"); std::cerr << "pqiperson::handleNotifyEvent_locked Unknown Event Source!"
<< std::endl;
return -1; return -1;
} }
@ -311,12 +300,13 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const s
{ {
case CONNECT_RECEIVED: case CONNECT_RECEIVED:
case CONNECT_SUCCESS: case CONNECT_SUCCESS:
{
/* notify */ /* notify */
if (pqipg) if (pqipg)
{ {
pqissl *ssl = dynamic_cast<pqissl*>(ni); pqissl *ssl = dynamic_cast<pqissl*>(ni);
if(ssl != NULL) if(ssl)
pqipg->notifyConnect(PeerId(), type, true, ssl->actAsServer(), remote_peer_address); pqipg->notifyConnect(PeerId(), type, true, ssl->actAsServer(), remote_peer_address);
else else
pqipg->notifyConnect(PeerId(), type, true, false, remote_peer_address); pqipg->notifyConnect(PeerId(), type, true, false, remote_peer_address);
@ -324,103 +314,99 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const s
if ((active) && (activepqi != pqi)) // already connected - trouble if ((active) && (activepqi != pqi)) // already connected - trouble
{ {
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_SUCCESS+active-> activing new connection, shutting others"); // TODO: 2015/12/19 Is this block dead code?
std::cerr << "pqiperson::handleNotifyEvent_locked Id: "
<< PeerId().toStdString() << " CONNECT_SUCCESS+active->"
<< "activing new connection, shutting others"
<< std::endl;
// This is the RESET that's killing the connections..... // This is the RESET that's killing the connections.....
//activepqi -> reset(); //activepqi -> reset();
// this causes a recursive call back into this fn. // this causes a recursive call back into this fn.
// which cleans up state. // which cleans up state.
// we only do this if its not going to mess with new conn. // we only do this if its not going to mess with new conn.
} }
/* now install a new one. */ /* now install a new one. */
{ {
#ifdef PERSON_DEBUG
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_SUCCESS->marking so! (resetting others)"); std::cerr << "pqiperson::handleNotifyEvent_locked Id: "
<< PeerId().toStdString() << " CONNECT_SUCCESS->marking "
<< "so! (resetting others)" << std::endl;
#endif
// mark as active. // mark as active.
active = true; active = true;
lastHeartbeatReceived = 0; lastHeartbeatReceived = 0;
activepqi = pqi; activepqi = pqi;
inConnectAttempt = false; inConnectAttempt = false;
activepqi->start(); // STARTUP THREAD. activepqi->start(); // STARTUP THREAD.
/* reset all other children? (clear up long UDP attempt) */ // reset all other children (clear up long UDP attempt)
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{
if (!(it->second)->thisNetInterface(ni)) if (!(it->second)->thisNetInterface(ni))
{
std::string out;
rs_sprintf(out, "Resetting pqi ref : %p", &(it->second));
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out);
it->second->reset(); it->second->reset();
} else {
//std::cerr << "Active pqi : not resetting." << std::endl;
}
}
return 1; return 1;
} }
break; break;
}
case CONNECT_UNREACHABLE: case CONNECT_UNREACHABLE:
case CONNECT_FIREWALLED: case CONNECT_FIREWALLED:
case CONNECT_FAILED: case CONNECT_FAILED:
{
if (active && (activepqi == pqi))
if (active)
{ {
if (activepqi == pqi) #ifdef PERSON_DEBUG
{ std::cerr << "pqiperson::handleNotifyEvent_locked Id: "
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED->marking so!"); << PeerId().toStdString()
<< " CONNECT_FAILED->marking so!" << std::endl;
#endif
activepqi->shutdown(); // STOP THREAD. activepqi->shutdown(); // STOP THREAD.
active = false; active = false;
activepqi = NULL; activepqi = NULL;
}
else
{
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED-> from an unactive connection, don't flag the peer as not connected, just try next attempt !");
}
} }
#ifdef PERSON_DEBUG
else else
{ std::cerr << "pqiperson::handleNotifyEvent_locked Id: "
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED+NOT active -> try connect again"); << PeerId().toStdString() + " CONNECT_FAILED-> from "
} << "an unactive connection, don't flag the peer as "
<< "not connected, just try next attempt !" << std::endl;
#endif
/* notify up */ /* notify up */
if (pqipg) if (pqipg)
{
pqipg->notifyConnect(PeerId(), type, false, false, remote_peer_address); pqipg->notifyConnect(PeerId(), type, false, false, remote_peer_address);
}
return 1; return 1;
break;
default:
break;
} }
return -1; default:
return -1;
}
} }
/***************** Not PQInterface Fns ***********************/ /***************** Not PQInterface Fns ***********************/
int pqiperson::reset() int pqiperson::reset()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
return reset_locked(); return reset_locked();
} }
int pqiperson::reset_locked() int pqiperson::reset_locked()
{ {
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::reset() resetting all pqiconnect for Id: " + PeerId().toStdString()); #ifdef PERSON_DEBUG
std::cerr << "pqiperson::reset_locked() resetting all pqiconnect for Id: "
<< PeerId().toStdString() << std::endl;
#endif
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ {
(it->second) -> shutdown(); // STOP THREAD. (it->second) -> shutdown(); // STOP THREAD.
(it->second) -> reset(); (it->second) -> reset();
} }
activepqi = NULL; activepqi = NULL;
active = false; active = false;
@ -429,17 +415,18 @@ int pqiperson::reset_locked()
return 1; return 1;
} }
int pqiperson::fullstopthreads() int pqiperson::fullstopthreads()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ #ifdef PERSON_DEBUG
std::cerr << "pqiperson::fullstopthreads() for Id: "
<< PeerId().toStdString() << std::endl;
#endif
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::fullstopthreads() for Id: " + PeerId().toStdString()); RS_STACK_MUTEX(mPersonMtx);
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ (it->second)->fullstop(); // WAIT FOR THREAD TO STOP.
(it->second) -> fullstop(); // WAIT FOR THREAD TO STOP.
}
activepqi = NULL; activepqi = NULL;
active = false; active = false;
@ -450,13 +437,12 @@ int pqiperson::fullstopthreads()
int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi) int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ #ifdef PERSON_DEBUG
std::cerr << "pqiperson::addChildInterface() : Id "
<< PeerId().toStdString() << " " << type << std::endl;
#endif
{ RS_STACK_MUTEX(mPersonMtx);
std::string out;
rs_sprintf(out, "pqiperson::addChildInterface() : Id %s %u", PeerId().toStdString().c_str(), type);
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out);
}
kids[type] = pqi; kids[type] = pqi;
return 1; return 1;
@ -466,91 +452,78 @@ int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi)
// functions to iterate over the connects and change state. // functions to iterate over the connects and change state.
int pqiperson::listen() int pqiperson::listen()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ #ifdef PERSON_DEBUG
std::cerr << "pqiperson::listen() Id: " + PeerId().toStdString() << std::endl;
#endif
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::listen() Id: " + PeerId().toStdString()); RS_STACK_MUTEX(mPersonMtx);
if (!active) if (!active)
{ {
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ (it->second)->listen();
// set them all listening.
(it->second) -> listen();
}
} }
return 1; return 1;
} }
int pqiperson::stoplistening() int pqiperson::stoplistening()
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ #ifdef PERSON_DEBUG
std::cerr << "pqiperson::stoplistening() Id: " + PeerId().toStdString()
<< std::endl;
#endif
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::stoplistening() Id: " + PeerId().toStdString()); RS_STACK_MUTEX(mPersonMtx);
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ (it->second)->stoplistening();
// set them all listening.
(it->second) -> stoplistening();
}
return 1; return 1;
} }
int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr, int pqiperson::connect(uint32_t type, const sockaddr_storage &raddr,
const struct sockaddr_storage &proxyaddr, const struct sockaddr_storage &srcaddr, const sockaddr_storage &proxyaddr,
uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags, uint32_t bandwidth, const sockaddr_storage &srcaddr,
const std::string &domain_addr, uint16_t domain_port) uint32_t delay, uint32_t period, uint32_t timeout,
uint32_t flags, uint32_t bandwidth,
const std::string &domain_addr, uint16_t domain_port)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/
#ifdef PERSON_DEBUG #ifdef PERSON_DEBUG
std::cerr << "pqiperson::connect() Id: " << PeerId().toStdString()
<< " type: " << type << " addr: "
<< sockaddr_storage_tostring(raddr) << " proxyaddr: "
<< sockaddr_storage_tostring(proxyaddr) << " srcaddr: "
<< sockaddr_storage_tostring(srcaddr) << " delay: " << delay
<< " period: " << period << " timeout: " << timeout << " flags: "
<< flags << " bandwidth: " << bandwidth << std::endl;
#endif #endif
{
std::string out = "pqiperson::connect() Id: " + PeerId().toStdString(); RS_STACK_MUTEX(mPersonMtx);
rs_sprintf_append(out, " type: %u", type);
out += " addr: ";
out += sockaddr_storage_tostring(raddr);
out += " proxyaddr: ";
out += sockaddr_storage_tostring(proxyaddr);
out += " srcaddr: ";
out += sockaddr_storage_tostring(srcaddr);
rs_sprintf_append(out, " delay: %u", delay);
rs_sprintf_append(out, " period: %u", period);
rs_sprintf_append(out, " timeout: %u", timeout);
rs_sprintf_append(out, " flags: %u", flags);
rs_sprintf_append(out, " bandwidth: %u", bandwidth);
//std::cerr << out.str();
pqioutput(PQL_WARNING, pqipersonzone, out);
}
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
it = kids.find(type); it = kids.find(type);
if (it == kids.end()) if (it == kids.end())
{ {
#ifdef PERSON_DEBUG
//pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::connect() missing pqiconnect");
#endif
/* notify of fail! */ /* notify of fail! */
pqipg->notifyConnect(PeerId(), type, false, false, raddr); pqipg->notifyConnect(PeerId(), type, false, false, raddr);
return 0; return 0;
} }
#ifdef PERSON_DEBUG #ifdef PERSON_DEBUG
std::cerr << "pqiperson::connect() WARNING, resetting for new connection attempt" << std::endl; std::cerr << "pqiperson::connect() resetting for new connection attempt" << std::endl;
#endif #endif
/* set the parameters */ /* set the parameters */
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::connect reset() before connection attempt");
(it->second)->reset(); (it->second)->reset();
#ifdef PERSON_DEBUG #ifdef PERSON_DEBUG
std::cerr << "pqiperson::connect() WARNING, clearing rate cap" << std::endl; std::cerr << "pqiperson::connect() clearing rate cap" << std::endl;
#endif #endif
setRateCap_locked(0,0); setRateCap_locked(0,0);
@ -583,61 +556,66 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr,
} }
void pqiperson::getRates(RsBwRates &rates) void pqiperson::getRates(RsBwRates &rates)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
// get the rate from the active one. // get the rate from the active one.
if ((!active) || (activepqi == NULL)) if ((!active) || (activepqi == NULL))
return; return;
activepqi -> getRates(rates);
activepqi->getRates(rates);
} }
int pqiperson::gatherStatistics(std::list<RSTrafficClue>& out_lst,std::list<RSTrafficClue>& in_lst)
int pqiperson::gatherStatistics(std::list<RSTrafficClue>& out_lst,
std::list<RSTrafficClue>& in_lst)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
// get the rate from the active one. // get the rate from the active one.
if ((!active) || (activepqi == NULL)) if ((!active) || (activepqi == NULL))
return 0; return 0;
return activepqi -> gatherStatistics(out_lst,in_lst); return activepqi->gatherStatistics(out_lst, in_lst);
} }
int pqiperson::getQueueSize(bool in)
int pqiperson::getQueueSize(bool in)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
// get the rate from the active one. // get the rate from the active one.
if ((!active) || (activepqi == NULL)) if ((!active) || (activepqi == NULL))
return 0; return 0;
return activepqi -> getQueueSize(in);
return activepqi->getQueueSize(in);
} }
bool pqiperson::getCryptoParams(RsPeerCryptoParams& params) bool pqiperson::getCryptoParams(RsPeerCryptoParams & params)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
if(active && activepqi != NULL) if(active && activepqi != NULL)
return activepqi->getCryptoParams(params) ; return activepqi->getCryptoParams(params);
else else
{ {
params.connexion_state = 0 ; params.connexion_state = 0;
params.cipher_name.clear() ; params.cipher_name.clear();
params.cipher_bits_1 = 0 ; params.cipher_bits_1 = 0;
params.cipher_bits_2 = 0 ; params.cipher_bits_2 = 0;
params.cipher_version.clear() ; params.cipher_version.clear();
return false ; return false ;
} }
} }
bool pqiconnect::getCryptoParams(RsPeerCryptoParams& params) bool pqiconnect::getCryptoParams(RsPeerCryptoParams & params)
{ {
pqissl *ssl = dynamic_cast<pqissl*>(ni) ; pqissl *ssl = dynamic_cast<pqissl*>(ni);
if(ssl != NULL) if(ssl != NULL)
{ {
ssl->getCryptoParams(params) ; ssl->getCryptoParams(params);
return true ; return true;
} }
else else
{ {
@ -650,31 +628,30 @@ bool pqiconnect::getCryptoParams(RsPeerCryptoParams& params)
} }
} }
float pqiperson::getRate(bool in) float pqiperson::getRate(bool in)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
// get the rate from the active one. // get the rate from the active one.
if ((!active) || (activepqi == NULL)) if ((!active) || (activepqi == NULL))
return 0; return 0;
return activepqi -> getRate(in); return activepqi -> getRate(in);
} }
void pqiperson::setMaxRate(bool in, float val) void pqiperson::setMaxRate(bool in, float val)
{ {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ RS_STACK_MUTEX(mPersonMtx);
// set to all of them. (and us) // set to all of them. (and us)
PQInterface::setMaxRate(in, val); PQInterface::setMaxRate(in, val);
// clean up the children. // clean up the children.
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{
(it->second) -> setMaxRate(in, val); (it->second) -> setMaxRate(in, val);
}
} }
void pqiperson::setRateCap(float val_in, float val_out) void pqiperson::setRateCap(float val_in, float val_out)
{ {
// This methods might be called all the way down from pqiperson::tick() down // This methods might be called all the way down from pqiperson::tick() down
// to pqissludp while completing a UDP connexion, causing a deadlock. // to pqissludp while completing a UDP connexion, causing a deadlock.
@ -686,28 +663,30 @@ void pqiperson::setRateCap(float val_in, float val_out)
// The lock cannot be locked by the same thread between the first test and // The lock cannot be locked by the same thread between the first test and
// the "else" statement, so there is no possibility for this code to fail. // the "else" statement, so there is no possibility for this code to fail.
// //
// We could actually put that code in RsMutex::lock() // We could actually put that code in RsMutex::lock()?
// TODO: 2015/12/19 This code is already in RsMutex::lock() but is guarded
// by RSTHREAD_SELF_LOCKING_GUARD which is specifically unset in the header
// Why is that code guarded? Do it have an impact on performance?
// Or we should not get in the situation of trying to relock the mutex on
// the same thread NEVER?
if(pthread_equal(mPersonMtx.owner(),pthread_self())) // 1 - unlocked, or already locked by same thread if(pthread_equal(mPersonMtx.owner(), pthread_self()))
return setRateCap_locked(val_in, val_out); // -> do nothing // Unlocked, or already locked by same thread
setRateCap_locked(val_in, val_out);
else else
{ // 2 - lock was free or locked by different thread => wait. {
RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ // Lock was free or locked by different thread => wait.
RS_STACK_MUTEX(mPersonMtx);
setRateCap_locked(val_in, val_out); setRateCap_locked(val_in, val_out);
} }
} }
void pqiperson::setRateCap_locked(float val_in, float val_out) void pqiperson::setRateCap_locked(float val_in, float val_out)
{ {
// set to all of them. (and us) // set to all of them. (and us)
PQInterface::setRateCap(val_in, val_out); PQInterface::setRateCap(val_in, val_out);
// clean up the children. // clean up the children.
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ (it->second)->setRateCap(val_in, val_out);
(it->second) -> setRateCap(val_in, val_out);
}
} }

View file

@ -83,22 +83,20 @@ protected:
class pqipersongrp; class pqipersongrp;
class NotifyData class NotifyData
{ {
public: public:
NotifyData() NotifyData() : mNi(NULL), mState(0)
:mNi(NULL), mState(0)
{ {
sockaddr_storage_clear(mAddr); sockaddr_storage_clear(mAddr);
} }
NotifyData(NetInterface *ni, int state, const struct sockaddr_storage &addr) NotifyData(NetInterface *ni, int state, const sockaddr_storage &addr) :
:mNi(ni), mState(state), mAddr(addr) { return; } mNi(ni), mState(state), mAddr(addr) {}
NetInterface *mNi; NetInterface *mNi;
int mState; int mState;
struct sockaddr_storage mAddr; sockaddr_storage mAddr;
}; };
@ -106,82 +104,76 @@ class pqiperson: public PQInterface
{ {
public: public:
pqiperson(const RsPeerId& id, pqipersongrp *ppg); pqiperson(const RsPeerId& id, pqipersongrp *ppg);
virtual ~pqiperson(); // must clean up children. virtual ~pqiperson(); // must clean up children.
// control of the connection. // control of the connection.
int reset(); int reset();
int listen(); int listen();
int stoplistening(); int stoplistening();
int connect(uint32_t type, const struct sockaddr_storage &raddr, int connect(uint32_t type, const sockaddr_storage &raddr,
const struct sockaddr_storage &proxyaddr, const struct sockaddr_storage &srcaddr, const sockaddr_storage &proxyaddr, const sockaddr_storage &srcaddr,
uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags, uint32_t bandwidth, uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags,
const std::string &domain_addr, uint16_t domain_port); uint32_t bandwidth, const std::string &domain_addr, uint16_t domain_port);
int fullstopthreads(); int fullstopthreads();
int receiveHeartbeat();
int receiveHeartbeat();
// add in connection method. // add in connection method.
int addChildInterface(uint32_t type, pqiconnect *pqi); int addChildInterface(uint32_t type, pqiconnect *pqi);
virtual bool getCryptoParams(RsPeerCryptoParams&) ; virtual bool getCryptoParams(RsPeerCryptoParams&);
// The PQInterface interface. // The PQInterface interface.
virtual int SendItem(RsItem *,uint32_t& serialized_size); virtual int SendItem(RsItem *,uint32_t& serialized_size);
virtual int SendItem(RsItem *item) virtual int SendItem(RsItem *item)
{ {
std::cerr << "Warning pqiperson::sendItem(RsItem*) should not be called. Plz call SendItem(RsItem *,uint32_t& serialized_size) instead." << std::endl; std::cerr << "Warning pqiperson::sendItem(RsItem*) should not be called."
uint32_t serialized_size ; << "Plz call SendItem(RsItem *,uint32_t& serialized_size) instead."
return SendItem(item,serialized_size) ; << std::endl;
} uint32_t serialized_size;
virtual RsItem *GetItem(); return SendItem(item, serialized_size);
virtual bool RecvItem(RsItem *item); }
virtual RsItem *GetItem();
virtual bool RecvItem(RsItem *item);
virtual int status(); virtual int status();
virtual int tick(); virtual int tick();
// overloaded callback function for the child - notify of a change. // overloaded callback function for the child - notify of a change.
virtual int notifyEvent(NetInterface *ni, int event, const struct sockaddr_storage &addr); virtual int notifyEvent(NetInterface *ni, int event, const struct sockaddr_storage &addr);
// PQInterface for rate control overloaded.... // PQInterface for rate control overloaded....
virtual int getQueueSize(bool in); virtual int getQueueSize(bool in);
virtual void getRates(RsBwRates &rates); virtual void getRates(RsBwRates &rates);
virtual float getRate(bool in); virtual float getRate(bool in);
virtual void setMaxRate(bool in, float val); virtual void setMaxRate(bool in, float val);
virtual void setRateCap(float val_in, float val_out); virtual void setRateCap(float val_in, float val_out);
virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,std::list<RSTrafficClue>& inqueue_lst) ; virtual int gatherStatistics(std::list<RSTrafficClue>& outqueue_lst,
std::list<RSTrafficClue>& inqueue_lst);
private:
void processNotifyEvents();
int handleNotifyEvent_locked(NetInterface *ni, int event,
const sockaddr_storage &addr);
RsMutex mNotifyMtx; // LOCKS Notify Queue
private:
void processNotifyEvents();
int handleNotifyEvent_locked(NetInterface *ni, int event, const struct sockaddr_storage &addr);
RsMutex mNotifyMtx; /**** LOCKS Notify Queue ****/
std::list<NotifyData> mNotifyQueue; std::list<NotifyData> mNotifyQueue;
RsMutex mPersonMtx; /**** LOCKS below ****/ RsMutex mPersonMtx; // LOCKS below
int reset_locked(); int reset_locked();
void setRateCap_locked(float val_in, float val_out); void setRateCap_locked(float val_in, float val_out);
std::map<uint32_t, pqiconnect *> kids; std::map<uint32_t, pqiconnect *> kids;
bool active; bool active;
pqiconnect *activepqi; pqiconnect *activepqi;
bool inConnectAttempt; bool inConnectAttempt;
int waittimes; int waittimes;
time_t lastHeartbeatReceived;//use to track connection failure time_t lastHeartbeatReceived; // use to track connection failure
private: /* Helper functions */
pqipersongrp *pqipg; /* parent for callback */ pqipersongrp *pqipg; /* parent for callback */
}; };
#endif #endif