NETWORK REWORK (cont)

pqiperson / pqipersongrp: generally okay, One Nasty BUG.

  * removed pqipersongrp::getPeer() fn. Violated Mutex protections!!!!
  * reworked heartbeat code, which used getPeer() fn.
  * switched all notifyConnect() calls to include remote_addr.
  * added explicit stoplistening() call when removing peer.
  * removed funny DO_NEXT_ATTEMPT callback code.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@3216 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2010-06-25 22:00:38 +00:00
parent 00d0d999dc
commit 9f4deb1cf7
3 changed files with 44 additions and 49 deletions

View File

@ -81,7 +81,6 @@ int pqiperson::SendItem(RsItem *i)
out << " Now deleting..."; out << " Now deleting...";
delete i; delete i;
} }
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out.str()); pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out.str());
return 0; // queued. return 0; // queued.
} }
@ -173,7 +172,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
} }
/* find the pqi, */ /* find the pqi, */
pqiconnect *pqi = NULL; pqiconnect *pqi = NULL;
uint32_t type = 0; uint32_t type = 0;
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
@ -184,16 +183,16 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
std::ostringstream out; std::ostringstream out;
out << "pqiperson::connectattempt() Kid# "; out << "pqiperson::connectattempt() Kid# ";
out << i << " of " << kids.size(); out << i << " of " << kids.size();
out << std::endl; out << std::endl;
out << " type: " << (it->first); out << " type: " << (it->first);
out << " ni: " << (it->second)->ni; out << " ni: " << (it->second)->ni;
out << " in_ni: " << ni; out << " in_ni: " << ni;
pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out.str()); pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out.str());
i++; i++;
if ((it->second)->thisNetInterface(ni)) if ((it->second)->thisNetInterface(ni))
{ {
pqi = (it->second); pqi = (it->second);
type = (it->first); type = (it->first);
} }
} }
@ -203,6 +202,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
pqioutput(PQL_WARNING, pqipersonzone, "Unknown notfyEvent Source!"); pqioutput(PQL_WARNING, pqipersonzone, "Unknown notfyEvent Source!");
return -1; return -1;
} }
switch(newState) switch(newState)
{ {
@ -233,11 +233,11 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
pqioutput(PQL_WARNING, pqipersonzone, pqioutput(PQL_WARNING, pqipersonzone,
"CONNECT_SUCCESS->marking so! (resetting others)"); "CONNECT_SUCCESS->marking so! (resetting others)");
// mark as active. // mark as active.
active = true; active = true;
lastHeartbeatReceived = 0; lastHeartbeatReceived = 0;
activepqi = pqi; activepqi = pqi;
inConnectAttempt = false; inConnectAttempt = false;
/* reset all other children? (clear up long UDP attempt) */ /* reset all other children? (clear up long UDP attempt) */
for(it = kids.begin(); it != kids.end(); it++) for(it = kids.begin(); it != kids.end(); it++)
@ -262,7 +262,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
if (active) if (active)
{ {
if (activepqi->thisNetInterface(ni)) if (activepqi == pqi)
{ {
pqioutput(PQL_WARNING, pqipersonzone, pqioutput(PQL_WARNING, pqipersonzone,
"CONNECT_FAILED->marking so!"); "CONNECT_FAILED->marking so!");
@ -280,14 +280,12 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
} }
/* notify up (But not if we are actually active: rtn -1 case above) */ /* notify up (But not if we are actually active: rtn -1 case above) */
if (!active) { if (pqipg)
if (pqipg) {
pqipg->notifyConnect(PeerId(), type, false); struct sockaddr_in raddr;
} else { sockaddr_clear(&raddr);
if (pqipg) pqipg->notifyConnect(PeerId(), type, false, raddr);
pqipg->notifyConnect(PeerId(), PQI_CONNECT_DO_NEXT_ATTEMPT, false); }
return -1;
}
return 1; return 1;

View File

@ -290,7 +290,6 @@ void pqipersongrp::statusChange(const std::list<pqipeer> &plist)
if (it->actions & RS_PEER_CONNECT_REQ) if (it->actions & RS_PEER_CONNECT_REQ)
{ {
connectPeer(it->id); connectPeer(it->id);
} }
} }
@ -366,6 +365,7 @@ int pqipersongrp::removePeer(std::string id)
//RemoveSearchModule(mod); //RemoveSearchModule(mod);
secpolicy_delete(mod -> sp); secpolicy_delete(mod -> sp);
pqiperson *p = (pqiperson *) mod -> pqi; pqiperson *p = (pqiperson *) mod -> pqi;
p -> stoplistening();
p -> reset(); p -> reset();
delete p; delete p;
mods.erase(it); mods.erase(it);
@ -373,27 +373,33 @@ int pqipersongrp::removePeer(std::string id)
return 1; return 1;
} }
pqiperson *pqipersongrp::getPeer(std::string id) int pqipersongrp::tagHeartbeatRecvd(std::string id)
{ {
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
#ifdef PGRP_DEBUG #ifdef PGRP_DEBUG
std::cerr << " pqipersongrp::getPeer() id: " << id; std::cerr << " pqipersongrp::tagHeartbeatRecvd() id: " << id;
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
it = mods.find(id); it = mods.find(id);
if (it != mods.end()) if (it != mods.end())
{ {
SearchModule *mod = it->second; SearchModule *mod = it->second;
pqiperson *p = (pqiperson *) mod -> pqi; pqiperson *p = (pqiperson *) mod -> pqi;
return p; p->receiveHeartbeat();
} return 1;
return NULL; }
return 0;
} }
int pqipersongrp::connectPeer(std::string id) int pqipersongrp::connectPeer(std::string id)
{ {
/* get status from p3connectMgr */ /* get status from p3connectMgr */
@ -489,16 +495,9 @@ int pqipersongrp::connectPeer(std::string id)
return 1; return 1;
} }
bool pqipersongrp::notifyConnect(std::string id, uint32_t ptype, bool success) { bool pqipersongrp::notifyConnect(std::string id, uint32_t ptype, bool success, struct sockaddr_in raddr)
struct sockaddr_in remote_peer_address;
sockaddr_clear(&remote_peer_address);
return notifyConnect(id, ptype, success, remote_peer_address);
}
bool pqipersongrp::notifyConnect(std::string id, uint32_t ptype, bool success, struct sockaddr_in remote_peer_address)
{ {
uint32_t type = 0; uint32_t type = 0;
if (ptype == PQI_CONNECT_TCP) if (ptype == PQI_CONNECT_TCP)
{ {
type = RS_NET_CONN_TCP_ALL; type = RS_NET_CONN_TCP_ALL;
@ -512,13 +511,9 @@ bool pqipersongrp::notifyConnect(std::string id, uint32_t ptype, bool success
type = RS_NET_CONN_TUNNEL; type = RS_NET_CONN_TUNNEL;
} }
if (mConnMgr) {
if (ptype == PQI_CONNECT_DO_NEXT_ATTEMPT) { if (mConnMgr)
mConnMgr->doNextAttempt(id); mConnMgr->connectResult(id, success, type, raddr);
} else {
mConnMgr->connectResult(id, success, type, remote_peer_address);
}
}
return (NULL != mConnMgr); return (NULL != mConnMgr);
} }

View File

@ -67,13 +67,15 @@ virtual void statusChange(const std::list<pqipeer> &plist);
/******************* Peer Control **************************/ /******************* Peer Control **************************/
virtual int addPeer(std::string id); /* can be overloaded for testing */ virtual int addPeer(std::string id); /* can be overloaded for testing */
virtual pqiperson *getPeer(std::string id); /* can be overloaded for testing */
int removePeer(std::string id); int removePeer(std::string id);
int connectPeer(std::string id); int connectPeer(std::string id);
/* Work-around to dodgy pointer stuff */
int tagHeartbeatRecvd(std::string id);
/*** callback from children ****/ /*** callback from children ****/
bool notifyConnect(std::string id, uint32_t type, bool success, struct sockaddr_in remote_peer_address); bool notifyConnect(std::string id, uint32_t type, bool success, struct sockaddr_in remote_peer_address);
bool notifyConnect(std::string id, uint32_t type, bool success); //bool notifyConnect(std::string id, uint32_t type, bool success);
// tick interfaces. // tick interfaces.
virtual int tick(); virtual int tick();