Reviewed Mutex in pqihandler.

- In general it is okay, except for: DataRate Handling, and Win32 specific: WaitingList.
 - Cleaned up some functions.
 - renamed createPerson and createListener to locked_createPerson and locked_createListener.
       This is required as neither Listener nor Person are thread safe (TODO).




git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5888 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-11-25 16:36:55 +00:00
parent 696e7735a1
commit 0fa8d55062
5 changed files with 50 additions and 69 deletions

View File

@ -194,7 +194,7 @@ bool pqihandler::RemoveSearchModule(SearchModule *mod)
}
// dummy output check
int pqihandler::locked_checkOutgoingRsItem(RsItem */*item*/, int /*global*/)
int pqihandler::locked_checkOutgoingRsItem(RsItem * /*item*/, int /*global*/)
{
//pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::checkOutgoingPQItem() NULL fn");

View File

@ -119,12 +119,13 @@ int pqipersongrp::tick()
* but not to important.
*/
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (pqil)
{
pqil -> tick();
}
} /* UNLOCKED */
{
RsStackMutex stack(coreMtx); /******* LOCKED MUTEX **********/
if (pqil)
{
pqil -> tick();
}
} /* UNLOCKED */
int i = 0;
@ -158,12 +159,13 @@ int pqipersongrp::tick()
int pqipersongrp::status()
{
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (pqil)
{
pqil -> status();
}
} /* UNLOCKED */
{
RsStackMutex stack(coreMtx); /******* LOCKED MUTEX **********/
if (pqil)
{
pqil -> status();
}
} /* UNLOCKED */
return pqihandler::status();
}
@ -175,6 +177,7 @@ int pqipersongrp::init_listener()
/* extract our information from the p3ConnectMgr */
if (initFlags & PQIPERSON_NO_LISTENER)
{
RsStackMutex stack(coreMtx); /******* LOCKED MUTEX **********/
pqil = NULL;
}
else
@ -183,8 +186,8 @@ int pqipersongrp::init_listener()
*/
struct sockaddr_in laddr = mLinkMgr->getLocalAddress();
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
pqil = createListener(laddr);
RsStackMutex stack(coreMtx); /******* LOCKED MUTEX **********/
pqil = locked_createListener(laddr);
}
return 1;
}
@ -199,7 +202,7 @@ bool pqipersongrp::resetListener(struct sockaddr_in &local)
// change the address.
// restart.
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
RsStackMutex stack(coreMtx); /******* LOCKED MUTEX **********/
if (pqil != NULL)
{
@ -391,33 +394,24 @@ int pqipersongrp::addPeer(std::string id)
#endif
SearchModule *sm = NULL;
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<std::string, SearchModule *>::iterator it;
it = mods.find(id);
if (it != mods.end())
{
pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone,
"pqipersongrp::addPeer() Peer already in Use!");
std::cerr << " pqipersongrp::addPeer() ERROR Peer already in use! id: " << id;
std::cerr << std::endl;
{
// The Mutex is required here as pqiListener is not thread-safe.
RsStackMutex stack(coreMtx); /******* LOCKED MUTEX **********/
pqiperson *pqip = locked_createPerson(id, pqil);
// attach to pqihandler
sm = new SearchModule();
sm -> peerid = id;
sm -> pqi = pqip;
sm -> sp = secpolicy_create();
// reset it to start it working.
pqioutput(PQL_WARNING, pqipersongrpzone, "pqipersongrp::addPeer() => reset() called to initialise new person");
pqip -> reset();
pqip -> listen();
return -1;
}
pqiperson *pqip = createPerson(id, pqil);
// attach to pqihandler
sm = new SearchModule();
sm -> peerid = id;
sm -> pqi = pqip;
sm -> sp = secpolicy_create();
// reset it to start it working.
pqioutput(PQL_WARNING, pqipersongrpzone, "pqipersongrp::addPeer() => reset() called to initialise new person");
pqip -> reset();
pqip -> listen();
} /* UNLOCKED */
} /* UNLOCKED */
return AddSearchModule(sm);
}
@ -438,8 +432,6 @@ int pqipersongrp::removePeer(std::string id)
if (it != mods.end())
{
SearchModule *mod = it->second;
// Don't duplicate remove!!!
//RemoveSearchModule(mod);
secpolicy_delete(mod -> sp);
pqiperson *p = (pqiperson *) mod -> pqi;
p -> stoplistening();
@ -491,15 +483,12 @@ int pqipersongrp::connectPeer(std::string id
///////////////////////////////////////////////////////////
#endif
)
{
/* get status from p3connectMgr */
#ifdef PGRP_DEBUG
std::cerr << " pqipersongrp::connectPeer() id: " << id << " does nothing yet! ";
std::cerr << std::endl;
#endif
{
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (!mLinkMgr)
return 0;
if (id == mLinkMgr->getOwnId())
{
#ifdef PGRP_DEBUG
@ -517,11 +506,6 @@ int pqipersongrp::connectPeer(std::string id
SearchModule *mod = it->second;
pqiperson *p = (pqiperson *) mod -> pqi;
/* get address from p3connmgr */
if (!mLinkMgr)
return 0;
#ifdef WINDOWS_SYS
///////////////////////////////////////////////////////////
// hack for too many connections
@ -653,10 +637,6 @@ int pqipersongrp::connectPeer(std::string id
p->connect(ptype, addr, proxyaddr, srcaddr, delay, period, timeout, flags, bandwidth);
} /* UNLOCKED */
/* */
return 1;
}
@ -687,14 +667,14 @@ bool pqipersongrp::notifyConnect(std::string id, uint32_t ptype, bool success
#include "pqi/pqibin.h"
pqilistener * pqipersongrpDummy::createListener(struct sockaddr_in /*laddr*/)
pqilistener * pqipersongrpDummy::locked_createListener(struct sockaddr_in /*laddr*/)
{
pqilistener *listener = new pqilistener();
return listener;
}
pqiperson * pqipersongrpDummy::createPerson(std::string id, pqilistener */*listener*/)
pqiperson * pqipersongrpDummy::locked_createPerson(std::string id, pqilistener * /*listener*/)
{
pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone, "pqipersongrpDummy::createPerson() PeerId: " + id);

View File

@ -97,8 +97,9 @@ virtual int status();
protected:
/********* FUNCTIONS to OVERLOAD for specialisation ********/
virtual pqilistener *createListener(struct sockaddr_in laddr) = 0;
virtual pqiperson *createPerson(std::string id, pqilistener *listener) = 0;
// THESE NEED TO BE LOCKED UNTIL PQILISTENER IS THREAD-SAFE.
virtual pqilistener *locked_createListener(struct sockaddr_in laddr) = 0;
virtual pqiperson *locked_createPerson(std::string id, pqilistener *listener) = 0;
/********* FUNCTIONS to OVERLOAD for specialisation ********/
/* Overloaded RsItem Check
@ -131,8 +132,8 @@ class pqipersongrpDummy: public pqipersongrp
protected:
/********* FUNCTIONS to OVERLOAD for specialisation ********/
virtual pqilistener *createListener(struct sockaddr_in laddr);
virtual pqiperson *createPerson(std::string id, pqilistener *listener);
virtual pqilistener *locked_createListener(struct sockaddr_in laddr);
virtual pqiperson *locked_createPerson(std::string id, pqilistener *listener);
/********* FUNCTIONS to OVERLOAD for specialisation ********/
};

View File

@ -49,13 +49,13 @@ const int pqipersongrpzone = 354;
#endif
pqilistener * pqisslpersongrp::createListener(struct sockaddr_in laddr)
pqilistener * pqisslpersongrp::locked_createListener(struct sockaddr_in laddr)
{
pqilistener *listener = new pqissllistener(laddr, mPeerMgr);
return listener;
}
pqiperson * pqisslpersongrp::createPerson(std::string id, pqilistener *listener)
pqiperson * pqisslpersongrp::locked_createPerson(std::string id, pqilistener *listener)
{
pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone, "pqipersongrp::createPerson() PeerId: " + id);

View File

@ -41,8 +41,8 @@ class pqisslpersongrp: public pqipersongrp
protected:
/********* FUNCTIONS to OVERLOAD for specialisation ********/
virtual pqilistener *createListener(struct sockaddr_in laddr);
virtual pqiperson *createPerson(std::string id, pqilistener *listener);
virtual pqilistener *locked_createListener(struct sockaddr_in laddr);
virtual pqiperson *locked_createPerson(std::string id, pqilistener *listener);
/********* FUNCTIONS to OVERLOAD for specialisation ********/
private: