Added Mutex protection to the core comms (pqihandler + pqipersongrp).

also added a Mutex to the ServiceServer.

  This will hopefully fix the random threading crashes.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@829 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-11-22 13:15:07 +00:00
parent c46b823261
commit 4610d1ddac
5 changed files with 93 additions and 17 deletions

View File

@ -39,6 +39,8 @@ const int pqihandlerzone = 34283;
pqihandler::pqihandler(SecurityPolicy *Global) pqihandler::pqihandler(SecurityPolicy *Global)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
// The global security.... // The global security....
// if something is disabled here... // if something is disabled here...
// cannot be enabled by module. // cannot be enabled by module.
@ -62,9 +64,12 @@ pqihandler::pqihandler(SecurityPolicy *Global)
int pqihandler::tick() int pqihandler::tick()
{ {
int moreToTick = 0;
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
// tick all interfaces... // tick all interfaces...
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
int moreToTick = 0;
for(it = mods.begin(); it != mods.end(); it++) for(it = mods.begin(); it != mods.end(); it++)
{ {
if (0 < ((it -> second) -> pqi) -> tick()) if (0 < ((it -> second) -> pqi) -> tick())
@ -76,13 +81,14 @@ int pqihandler::tick()
} }
} }
// get the items, and queue them correctly // get the items, and queue them correctly
if (0 < GetItems()) if (0 < locked_GetItems())
{ {
#ifdef DEBUG_TICK #ifdef DEBUG_TICK
std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl; std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl;
#endif #endif
moreToTick = 1; moreToTick = 1;
} }
} /****** UNLOCK ******/
UpdateRates(); UpdateRates();
return moreToTick; return moreToTick;
@ -92,6 +98,7 @@ int pqihandler::tick()
int pqihandler::status() int pqihandler::status()
{ {
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
{ // for output { // for output
std::ostringstream out; std::ostringstream out;
@ -121,6 +128,7 @@ int pqihandler::status()
bool pqihandler::AddSearchModule(SearchModule *mod) bool pqihandler::AddSearchModule(SearchModule *mod)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
// if peerid used -> error. // if peerid used -> error.
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
if (mod->peerid != mod->pqi->PeerId()) if (mod->peerid != mod->pqi->PeerId())
@ -167,6 +175,7 @@ bool pqihandler::AddSearchModule(SearchModule *mod)
bool pqihandler::RemoveSearchModule(SearchModule *mod) bool pqihandler::RemoveSearchModule(SearchModule *mod)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
for(it = mods.begin(); it != mods.end(); it++) for(it = mods.begin(); it != mods.end(); it++)
{ {
@ -180,7 +189,7 @@ bool pqihandler::RemoveSearchModule(SearchModule *mod)
} }
// dummy output check // dummy output check
int pqihandler::checkOutgoingRsItem(RsItem *item, int global) int pqihandler::locked_checkOutgoingRsItem(RsItem *item, int global)
{ {
pqioutput(PQL_WARNING, pqihandlerzone, pqioutput(PQL_WARNING, pqihandlerzone,
"pqihandler::checkOutgoingPQItem() NULL fn"); "pqihandler::checkOutgoingPQItem() NULL fn");
@ -192,6 +201,8 @@ int pqihandler::checkOutgoingRsItem(RsItem *item, int global)
// generalised output // generalised output
int pqihandler::HandleRsItem(RsItem *item, int allowglobal) int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
"pqihandler::HandleRsItem()"); "pqihandler::HandleRsItem()");
@ -208,7 +219,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal)
return -1; return -1;
} }
if (!checkOutgoingRsItem(item, allowglobal)) if (!locked_checkOutgoingRsItem(item, allowglobal))
{ {
std::ostringstream out; std::ostringstream out;
out << "pqihandler::HandleRsItem() checkOutgoingPQItem"; out << "pqihandler::HandleRsItem() checkOutgoingPQItem";
@ -294,7 +305,7 @@ int pqihandler::SendRsRawItem(RsRawItem *ns)
// system that is completely biased and slow... // system that is completely biased and slow...
// someone please fix. // someone please fix.
int pqihandler::GetItems() int pqihandler::locked_GetItems()
{ {
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
@ -331,7 +342,7 @@ int pqihandler::GetItems()
item->PeerId(mod->pqi->PeerId()); item->PeerId(mod->pqi->PeerId());
} }
SortnStoreItem(item); locked_SortnStoreItem(item);
count++; count++;
} }
} }
@ -361,7 +372,7 @@ int pqihandler::GetItems()
void pqihandler::SortnStoreItem(RsItem *item) void pqihandler::locked_SortnStoreItem(RsItem *item)
{ {
/* get class type / subtype out of the item */ /* get class type / subtype out of the item */
uint8_t vers = item -> PacketVersion(); uint8_t vers = item -> PacketVersion();
@ -463,6 +474,8 @@ void pqihandler::SortnStoreItem(RsItem *item)
// much like the input stuff. // much like the input stuff.
RsCacheItem *pqihandler::GetSearchResult() RsCacheItem *pqihandler::GetSearchResult()
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_result.size() != 0) if (in_result.size() != 0)
{ {
RsCacheItem *fi = dynamic_cast<RsCacheItem *>(in_result.front()); RsCacheItem *fi = dynamic_cast<RsCacheItem *>(in_result.front());
@ -475,6 +488,8 @@ RsCacheItem *pqihandler::GetSearchResult()
RsCacheRequest *pqihandler::RequestedSearch() RsCacheRequest *pqihandler::RequestedSearch()
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_search.size() != 0) if (in_search.size() != 0)
{ {
RsCacheRequest *fi = dynamic_cast<RsCacheRequest *>(in_search.front()); RsCacheRequest *fi = dynamic_cast<RsCacheRequest *>(in_search.front());
@ -487,6 +502,8 @@ RsCacheRequest *pqihandler::RequestedSearch()
RsFileRequest *pqihandler::GetFileRequest() RsFileRequest *pqihandler::GetFileRequest()
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_request.size() != 0) if (in_request.size() != 0)
{ {
RsFileRequest *fi = dynamic_cast<RsFileRequest *>(in_request.front()); RsFileRequest *fi = dynamic_cast<RsFileRequest *>(in_request.front());
@ -499,6 +516,8 @@ RsFileRequest *pqihandler::GetFileRequest()
RsFileData *pqihandler::GetFileData() RsFileData *pqihandler::GetFileData()
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_data.size() != 0) if (in_data.size() != 0)
{ {
RsFileData *fi = dynamic_cast<RsFileData *>(in_data.front()); RsFileData *fi = dynamic_cast<RsFileData *>(in_data.front());
@ -511,6 +530,8 @@ RsFileData *pqihandler::GetFileData()
RsRawItem *pqihandler::GetRsRawItem() RsRawItem *pqihandler::GetRsRawItem()
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in_service.size() != 0) if (in_service.size() != 0)
{ {
RsRawItem *fi = dynamic_cast<RsRawItem *>(in_service.front()); RsRawItem *fi = dynamic_cast<RsRawItem *>(in_service.front());
@ -547,6 +568,9 @@ int pqihandler::UpdateRates()
int maxxed_in = 0; int maxxed_in = 0;
int maxxed_out = 0; int maxxed_out = 0;
/* Lock once rates have been retrieved */
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
// loop through modules.... // loop through modules....
for(it = mods.begin(); it != mods.end(); it++) for(it = mods.begin(); it != mods.end(); it++)
{ {
@ -586,7 +610,7 @@ int pqihandler::UpdateRates()
//std::cerr << " Excess B/W " << extra_bw_out; //std::cerr << " Excess B/W " << extra_bw_out;
//std::cerr << " Available B/W " << avail_out << std::endl; //std::cerr << " Available B/W " << avail_out << std::endl;
StoreCurrentRates(used_bw_in, used_bw_out); locked_StoreCurrentRates(used_bw_in, used_bw_out);
if (used_bw_in > avail_in) if (used_bw_in > avail_in)
{ {
@ -714,11 +738,13 @@ int pqihandler::UpdateRates()
void pqihandler::getCurrentRates(float &in, float &out) void pqihandler::getCurrentRates(float &in, float &out)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
in = rateTotal_in; in = rateTotal_in;
out = rateTotal_out; out = rateTotal_out;
} }
void pqihandler::StoreCurrentRates(float in, float out) void pqihandler::locked_StoreCurrentRates(float in, float out)
{ {
rateTotal_in = in; rateTotal_in = in;
rateTotal_out = out; rateTotal_out = out;

View File

@ -29,6 +29,8 @@
#include "pqi/pqi.h" #include "pqi/pqi.h"
#include "pqi/pqisecurity.h" #include "pqi/pqisecurity.h"
#include "util/rsthreads.h"
#include <map> #include <map>
#include <list> #include <list>
@ -85,12 +87,14 @@ void getCurrentRates(float &in, float &out);
/* check to be overloaded by those that can /* check to be overloaded by those that can
* generates warnings otherwise * generates warnings otherwise
*/ */
virtual int checkOutgoingRsItem(RsItem *item, int global);
int HandleRsItem(RsItem *ns, int allowglobal); int HandleRsItem(RsItem *ns, int allowglobal);
int GetItems(); virtual int locked_checkOutgoingRsItem(RsItem *item, int global);
void SortnStoreItem(RsItem *item); int locked_GetItems();
void locked_SortnStoreItem(RsItem *item);
RsMutex coreMtx; /* MUTEX */
std::map<std::string, SearchModule *> mods; std::map<std::string, SearchModule *> mods;
SecurityPolicy *globsec; SecurityPolicy *globsec;
@ -103,7 +107,7 @@ void SortnStoreItem(RsItem *item);
// rate control. // rate control.
int UpdateRates(); int UpdateRates();
void StoreCurrentRates(float in, float out); void locked_StoreCurrentRates(float in, float out);
float rateIndiv_in; float rateIndiv_in;
float rateIndiv_out; float rateIndiv_out;
@ -116,6 +120,7 @@ void StoreCurrentRates(float in, float out);
inline void pqihandler::setMaxIndivRate(bool in, float val) inline void pqihandler::setMaxIndivRate(bool in, float val)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in) if (in)
rateIndiv_in = val; rateIndiv_in = val;
else else
@ -125,6 +130,7 @@ inline void pqihandler::setMaxIndivRate(bool in, float val)
inline float pqihandler::getMaxIndivRate(bool in) inline float pqihandler::getMaxIndivRate(bool in)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in) if (in)
return rateIndiv_in; return rateIndiv_in;
else else
@ -133,6 +139,7 @@ inline float pqihandler::getMaxIndivRate(bool in)
inline void pqihandler::setMaxRate(bool in, float val) inline void pqihandler::setMaxRate(bool in, float val)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in) if (in)
rateMax_in = val; rateMax_in = val;
else else
@ -142,6 +149,7 @@ inline void pqihandler::setMaxRate(bool in, float val)
inline float pqihandler::getMaxRate(bool in) inline float pqihandler::getMaxRate(bool in)
{ {
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (in) if (in)
return rateMax_in; return rateMax_in;
else else

View File

@ -35,6 +35,13 @@ const int pqipersongrpzone = 354;
*#define PGRP_DEBUG 1 *#define PGRP_DEBUG 1
****/ ****/
/* MUTEX NOTES:
* Functions like GetRsRawItem() lock itself (pqihandler) and
* likewise ServiceServer and ConfigMgr mutex themselves.
* This means the only things we need to worry about are:
* pqilistener and when accessing pqihandlers data.
*/
// handle the tunnel services. // handle the tunnel services.
int pqipersongrp::tickServiceRecv() int pqipersongrp::tickServiceRecv()
{ {
@ -76,13 +83,13 @@ int pqipersongrp::tickServiceSend()
p3ServiceServer::tick(); p3ServiceServer::tick();
while(NULL != (pqi = outgoing())) while(NULL != (pqi = outgoing())) /* outgoing has own locking */
{ {
++i; ++i;
pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone, pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone,
"pqipersongrp::tickTunnelServer() OutGoing RsItem"); "pqipersongrp::tickTunnelServer() OutGoing RsItem");
SendRsRawItem(pqi); SendRsRawItem(pqi); /* Locked by pqihandler */
} }
if (0 < i) if (0 < i)
{ {
@ -105,10 +112,13 @@ int pqipersongrp::tick()
* but not to important. * but not to important.
*/ */
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (pqil) if (pqil)
{ {
pqil -> tick(); pqil -> tick();
} }
} /* UNLOCKED */
int i = 0; int i = 0;
if (tickServiceSend()) if (tickServiceSend())
@ -141,10 +151,13 @@ int pqipersongrp::tick()
int pqipersongrp::status() int pqipersongrp::status()
{ {
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
if (pqil) if (pqil)
{ {
pqil -> status(); pqil -> status();
} }
} /* UNLOCKED */
return pqihandler::status(); return pqihandler::status();
} }
@ -164,6 +177,7 @@ int pqipersongrp::init_listener()
peerConnectState state; peerConnectState state;
mConnMgr->getOwnNetStatus(state); mConnMgr->getOwnNetStatus(state);
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
pqil = createListener(state.localaddr); pqil = createListener(state.localaddr);
} }
return 1; return 1;
@ -174,11 +188,19 @@ int pqipersongrp::restart_listener()
// stop it, // stop it,
// change the address. // change the address.
// restart. // restart.
if (pqil) bool haveListener = false;
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
haveListener = (pqil != NULL);
} /* UNLOCKED */
if (haveListener)
{ {
peerConnectState state; peerConnectState state;
mConnMgr->getOwnNetStatus(state); mConnMgr->getOwnNetStatus(state);
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
pqil -> resetlisten(); pqil -> resetlisten();
pqil -> setListenAddr(state.localaddr); pqil -> setListenAddr(state.localaddr);
pqil -> setuplisten(); pqil -> setuplisten();
@ -186,7 +208,9 @@ int pqipersongrp::restart_listener()
return 1; return 1;
} }
/* NOT bothering to protect Config with a mutex.... it is not going to change
* and has its own internal mutexs.
*/
int pqipersongrp::setConfig(p3GeneralConfig *cfg) int pqipersongrp::setConfig(p3GeneralConfig *cfg)
{ {
config = cfg; config = cfg;
@ -286,6 +310,7 @@ int pqipersongrp::addPeer(std::string id)
#endif #endif
SearchModule *sm = NULL; SearchModule *sm = NULL;
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
it = mods.find(id); it = mods.find(id);
if (it != mods.end()) if (it != mods.end())
@ -306,6 +331,7 @@ int pqipersongrp::addPeer(std::string id)
// reset it to start it working. // reset it to start it working.
pqip -> reset(); pqip -> reset();
pqip -> listen(); pqip -> listen();
} /* UNLOCKED */
return AddSearchModule(sm); return AddSearchModule(sm);
} }
@ -320,6 +346,8 @@ int pqipersongrp::removePeer(std::string id)
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
it = mods.find(id); it = mods.find(id);
if (it != mods.end()) if (it != mods.end())
{ {
@ -343,6 +371,7 @@ int pqipersongrp::connectPeer(std::string id)
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
{ RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/
std::map<std::string, SearchModule *>::iterator it; std::map<std::string, SearchModule *>::iterator it;
it = mods.find(id); it = mods.find(id);
if (it == mods.end()) if (it == mods.end())
@ -407,6 +436,9 @@ int pqipersongrp::connectPeer(std::string id)
p->connect(ptype, addr, delay, period, timeout); p->connect(ptype, addr, delay, period, timeout);
} /* UNLOCKED */
/* */ /* */
return 1; return 1;
} }

View File

@ -35,6 +35,7 @@ const int pqiservicezone = 60478;
p3ServiceServer::p3ServiceServer() p3ServiceServer::p3ServiceServer()
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, pqioutput(PQL_DEBUG_BASIC, pqiservicezone,
@ -47,6 +48,8 @@ p3ServiceServer::p3ServiceServer()
int p3ServiceServer::addService(pqiService *ts) int p3ServiceServer::addService(pqiService *ts)
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, pqioutput(PQL_DEBUG_BASIC, pqiservicezone,
"p3ServiceServer::addService()"); "p3ServiceServer::addService()");
@ -67,6 +70,8 @@ int p3ServiceServer::addService(pqiService *ts)
int p3ServiceServer::incoming(RsRawItem *item) int p3ServiceServer::incoming(RsRawItem *item)
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, pqioutput(PQL_DEBUG_BASIC, pqiservicezone,
"p3ServiceServer::incoming()"); "p3ServiceServer::incoming()");
@ -120,6 +125,7 @@ int p3ServiceServer::incoming(RsRawItem *item)
RsRawItem *p3ServiceServer::outgoing() RsRawItem *p3ServiceServer::outgoing()
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG
pqioutput(PQL_DEBUG_ALL, pqiservicezone, pqioutput(PQL_DEBUG_ALL, pqiservicezone,
@ -186,6 +192,8 @@ RsRawItem *p3ServiceServer::outgoing()
int p3ServiceServer::tick() int p3ServiceServer::tick()
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG
pqioutput(PQL_DEBUG_ALL, pqiservicezone, pqioutput(PQL_DEBUG_ALL, pqiservicezone,
"p3ServiceServer::tick()"); "p3ServiceServer::tick()");

View File

@ -28,6 +28,7 @@
#define PQI_SERVICE_HEADER #define PQI_SERVICE_HEADER
#include "pqi/pqi_base.h" #include "pqi/pqi_base.h"
#include "util/rsthreads.h"
// PQI Service, is a generic lower layer on which services can run on. // PQI Service, is a generic lower layer on which services can run on.
// //
@ -90,6 +91,7 @@ int tick();
private: private:
RsMutex srvMtx;
std::map<uint32_t, pqiService *> services; std::map<uint32_t, pqiService *> services;
std::map<uint32_t, pqiService *>::iterator rrit; std::map<uint32_t, pqiService *>::iterator rrit;