diff --git a/libretroshare/src/pqi/pqihandler.cc b/libretroshare/src/pqi/pqihandler.cc index 5013b9a4e..8fdb5fe81 100644 --- a/libretroshare/src/pqi/pqihandler.cc +++ b/libretroshare/src/pqi/pqihandler.cc @@ -39,6 +39,8 @@ const int pqihandlerzone = 34283; pqihandler::pqihandler(SecurityPolicy *Global) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + // The global security.... // if something is disabled here... // cannot be enabled by module. @@ -62,9 +64,12 @@ pqihandler::pqihandler(SecurityPolicy *Global) int pqihandler::tick() { + int moreToTick = 0; + + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + // tick all interfaces... std::map::iterator it; - int moreToTick = 0; for(it = mods.begin(); it != mods.end(); it++) { if (0 < ((it -> second) -> pqi) -> tick()) @@ -76,13 +81,14 @@ int pqihandler::tick() } } // get the items, and queue them correctly - if (0 < GetItems()) + if (0 < locked_GetItems()) { #ifdef DEBUG_TICK std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl; #endif moreToTick = 1; } + } /****** UNLOCK ******/ UpdateRates(); return moreToTick; @@ -92,6 +98,7 @@ int pqihandler::tick() int pqihandler::status() { std::map::iterator it; + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ { // for output std::ostringstream out; @@ -121,6 +128,7 @@ int pqihandler::status() bool pqihandler::AddSearchModule(SearchModule *mod) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ // if peerid used -> error. std::map::iterator it; if (mod->peerid != mod->pqi->PeerId()) @@ -167,6 +175,7 @@ bool pqihandler::AddSearchModule(SearchModule *mod) bool pqihandler::RemoveSearchModule(SearchModule *mod) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ std::map::iterator it; for(it = mods.begin(); it != mods.end(); it++) { @@ -180,7 +189,7 @@ bool pqihandler::RemoveSearchModule(SearchModule *mod) } // dummy output check -int pqihandler::checkOutgoingRsItem(RsItem *item, int global) +int pqihandler::locked_checkOutgoingRsItem(RsItem *item, int global) { pqioutput(PQL_WARNING, pqihandlerzone, "pqihandler::checkOutgoingPQItem() NULL fn"); @@ -192,6 +201,8 @@ int pqihandler::checkOutgoingRsItem(RsItem *item, int global) // generalised output int pqihandler::HandleRsItem(RsItem *item, int allowglobal) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + std::map::iterator it; pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::HandleRsItem()"); @@ -208,7 +219,7 @@ int pqihandler::HandleRsItem(RsItem *item, int allowglobal) return -1; } - if (!checkOutgoingRsItem(item, allowglobal)) + if (!locked_checkOutgoingRsItem(item, allowglobal)) { std::ostringstream out; out << "pqihandler::HandleRsItem() checkOutgoingPQItem"; @@ -294,7 +305,7 @@ int pqihandler::SendRsRawItem(RsRawItem *ns) // system that is completely biased and slow... // someone please fix. -int pqihandler::GetItems() +int pqihandler::locked_GetItems() { std::map::iterator it; @@ -331,7 +342,7 @@ int pqihandler::GetItems() item->PeerId(mod->pqi->PeerId()); } - SortnStoreItem(item); + locked_SortnStoreItem(item); count++; } } @@ -361,7 +372,7 @@ int pqihandler::GetItems() -void pqihandler::SortnStoreItem(RsItem *item) +void pqihandler::locked_SortnStoreItem(RsItem *item) { /* get class type / subtype out of the item */ uint8_t vers = item -> PacketVersion(); @@ -463,6 +474,8 @@ void pqihandler::SortnStoreItem(RsItem *item) // much like the input stuff. RsCacheItem *pqihandler::GetSearchResult() { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + if (in_result.size() != 0) { RsCacheItem *fi = dynamic_cast(in_result.front()); @@ -475,6 +488,8 @@ RsCacheItem *pqihandler::GetSearchResult() RsCacheRequest *pqihandler::RequestedSearch() { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + if (in_search.size() != 0) { RsCacheRequest *fi = dynamic_cast(in_search.front()); @@ -487,6 +502,8 @@ RsCacheRequest *pqihandler::RequestedSearch() RsFileRequest *pqihandler::GetFileRequest() { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + if (in_request.size() != 0) { RsFileRequest *fi = dynamic_cast(in_request.front()); @@ -499,6 +516,8 @@ RsFileRequest *pqihandler::GetFileRequest() RsFileData *pqihandler::GetFileData() { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + if (in_data.size() != 0) { RsFileData *fi = dynamic_cast(in_data.front()); @@ -511,6 +530,8 @@ RsFileData *pqihandler::GetFileData() RsRawItem *pqihandler::GetRsRawItem() { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + if (in_service.size() != 0) { RsRawItem *fi = dynamic_cast(in_service.front()); @@ -547,6 +568,9 @@ int pqihandler::UpdateRates() int maxxed_in = 0; int maxxed_out = 0; + /* Lock once rates have been retrieved */ + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + // loop through modules.... for(it = mods.begin(); it != mods.end(); it++) { @@ -586,7 +610,7 @@ int pqihandler::UpdateRates() //std::cerr << " Excess B/W " << extra_bw_out; //std::cerr << " Available B/W " << avail_out << std::endl; - StoreCurrentRates(used_bw_in, used_bw_out); + locked_StoreCurrentRates(used_bw_in, used_bw_out); if (used_bw_in > avail_in) { @@ -714,11 +738,13 @@ int pqihandler::UpdateRates() void pqihandler::getCurrentRates(float &in, float &out) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + in = rateTotal_in; out = rateTotal_out; } -void pqihandler::StoreCurrentRates(float in, float out) +void pqihandler::locked_StoreCurrentRates(float in, float out) { rateTotal_in = in; rateTotal_out = out; diff --git a/libretroshare/src/pqi/pqihandler.h b/libretroshare/src/pqi/pqihandler.h index 1dc67ec1a..169c7520f 100644 --- a/libretroshare/src/pqi/pqihandler.h +++ b/libretroshare/src/pqi/pqihandler.h @@ -29,6 +29,8 @@ #include "pqi/pqi.h" #include "pqi/pqisecurity.h" +#include "util/rsthreads.h" + #include #include @@ -85,12 +87,14 @@ void getCurrentRates(float &in, float &out); /* check to be overloaded by those that can * generates warnings otherwise */ -virtual int checkOutgoingRsItem(RsItem *item, int global); int HandleRsItem(RsItem *ns, int allowglobal); -int GetItems(); -void SortnStoreItem(RsItem *item); +virtual int locked_checkOutgoingRsItem(RsItem *item, int global); +int locked_GetItems(); +void locked_SortnStoreItem(RsItem *item); + + RsMutex coreMtx; /* MUTEX */ std::map mods; SecurityPolicy *globsec; @@ -103,7 +107,7 @@ void SortnStoreItem(RsItem *item); // rate control. int UpdateRates(); -void StoreCurrentRates(float in, float out); +void locked_StoreCurrentRates(float in, float out); float rateIndiv_in; float rateIndiv_out; @@ -116,6 +120,7 @@ void StoreCurrentRates(float in, float out); inline void pqihandler::setMaxIndivRate(bool in, float val) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ if (in) rateIndiv_in = val; else @@ -125,6 +130,7 @@ inline void pqihandler::setMaxIndivRate(bool in, float val) inline float pqihandler::getMaxIndivRate(bool in) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ if (in) return rateIndiv_in; else @@ -133,6 +139,7 @@ inline float pqihandler::getMaxIndivRate(bool in) inline void pqihandler::setMaxRate(bool in, float val) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ if (in) rateMax_in = val; else @@ -142,6 +149,7 @@ inline void pqihandler::setMaxRate(bool in, float val) inline float pqihandler::getMaxRate(bool in) { + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ if (in) return rateMax_in; else diff --git a/libretroshare/src/pqi/pqipersongrp.cc b/libretroshare/src/pqi/pqipersongrp.cc index 5c0a0ec39..93ccb4fd5 100644 --- a/libretroshare/src/pqi/pqipersongrp.cc +++ b/libretroshare/src/pqi/pqipersongrp.cc @@ -35,6 +35,13 @@ const int pqipersongrpzone = 354; *#define PGRP_DEBUG 1 ****/ +/* MUTEX NOTES: + * Functions like GetRsRawItem() lock itself (pqihandler) and + * likewise ServiceServer and ConfigMgr mutex themselves. + * This means the only things we need to worry about are: + * pqilistener and when accessing pqihandlers data. + */ + // handle the tunnel services. int pqipersongrp::tickServiceRecv() { @@ -76,13 +83,13 @@ int pqipersongrp::tickServiceSend() p3ServiceServer::tick(); - while(NULL != (pqi = outgoing())) + while(NULL != (pqi = outgoing())) /* outgoing has own locking */ { ++i; pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone, "pqipersongrp::tickTunnelServer() OutGoing RsItem"); - SendRsRawItem(pqi); + SendRsRawItem(pqi); /* Locked by pqihandler */ } if (0 < i) { @@ -105,10 +112,13 @@ int pqipersongrp::tick() * but not to important. */ + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ if (pqil) { pqil -> tick(); } + } /* UNLOCKED */ + int i = 0; if (tickServiceSend()) @@ -141,10 +151,13 @@ int pqipersongrp::tick() int pqipersongrp::status() { + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ if (pqil) { pqil -> status(); } + } /* UNLOCKED */ + return pqihandler::status(); } @@ -164,6 +177,7 @@ int pqipersongrp::init_listener() peerConnectState state; mConnMgr->getOwnNetStatus(state); + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ pqil = createListener(state.localaddr); } return 1; @@ -174,10 +188,18 @@ int pqipersongrp::restart_listener() // stop it, // change the address. // restart. - if (pqil) + bool haveListener = false; + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + haveListener = (pqil != NULL); + } /* UNLOCKED */ + + + if (haveListener) { peerConnectState state; mConnMgr->getOwnNetStatus(state); + + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ pqil -> resetlisten(); pqil -> setListenAddr(state.localaddr); @@ -186,7 +208,9 @@ int pqipersongrp::restart_listener() return 1; } - +/* NOT bothering to protect Config with a mutex.... it is not going to change + * and has its own internal mutexs. + */ int pqipersongrp::setConfig(p3GeneralConfig *cfg) { config = cfg; @@ -286,6 +310,7 @@ int pqipersongrp::addPeer(std::string id) #endif SearchModule *sm = NULL; + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ std::map::iterator it; it = mods.find(id); if (it != mods.end()) @@ -306,6 +331,7 @@ int pqipersongrp::addPeer(std::string id) // reset it to start it working. pqip -> reset(); pqip -> listen(); + } /* UNLOCKED */ return AddSearchModule(sm); } @@ -320,6 +346,8 @@ int pqipersongrp::removePeer(std::string id) std::cerr << std::endl; #endif + RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ + it = mods.find(id); if (it != mods.end()) { @@ -343,6 +371,7 @@ int pqipersongrp::connectPeer(std::string id) std::cerr << std::endl; #endif + { RsStackMutex stack(coreMtx); /**************** LOCKED MUTEX ****************/ std::map::iterator it; it = mods.find(id); if (it == mods.end()) @@ -407,6 +436,9 @@ int pqipersongrp::connectPeer(std::string id) p->connect(ptype, addr, delay, period, timeout); + } /* UNLOCKED */ + + /* */ return 1; } diff --git a/libretroshare/src/pqi/pqiservice.cc b/libretroshare/src/pqi/pqiservice.cc index 89b35a9d9..83042d5b4 100644 --- a/libretroshare/src/pqi/pqiservice.cc +++ b/libretroshare/src/pqi/pqiservice.cc @@ -35,6 +35,7 @@ const int pqiservicezone = 60478; p3ServiceServer::p3ServiceServer() { + RsStackMutex stack(srvMtx); /********* LOCKED *********/ #ifdef SERVICE_DEBUG pqioutput(PQL_DEBUG_BASIC, pqiservicezone, @@ -47,6 +48,8 @@ p3ServiceServer::p3ServiceServer() int p3ServiceServer::addService(pqiService *ts) { + RsStackMutex stack(srvMtx); /********* LOCKED *********/ + #ifdef SERVICE_DEBUG pqioutput(PQL_DEBUG_BASIC, pqiservicezone, "p3ServiceServer::addService()"); @@ -67,6 +70,8 @@ int p3ServiceServer::addService(pqiService *ts) int p3ServiceServer::incoming(RsRawItem *item) { + RsStackMutex stack(srvMtx); /********* LOCKED *********/ + #ifdef SERVICE_DEBUG pqioutput(PQL_DEBUG_BASIC, pqiservicezone, "p3ServiceServer::incoming()"); @@ -120,6 +125,7 @@ int p3ServiceServer::incoming(RsRawItem *item) RsRawItem *p3ServiceServer::outgoing() { + RsStackMutex stack(srvMtx); /********* LOCKED *********/ #ifdef SERVICE_DEBUG pqioutput(PQL_DEBUG_ALL, pqiservicezone, @@ -186,6 +192,8 @@ RsRawItem *p3ServiceServer::outgoing() int p3ServiceServer::tick() { + RsStackMutex stack(srvMtx); /********* LOCKED *********/ + #ifdef SERVICE_DEBUG pqioutput(PQL_DEBUG_ALL, pqiservicezone, "p3ServiceServer::tick()"); diff --git a/libretroshare/src/pqi/pqiservice.h b/libretroshare/src/pqi/pqiservice.h index 1dd4ad84f..5286d2013 100644 --- a/libretroshare/src/pqi/pqiservice.h +++ b/libretroshare/src/pqi/pqiservice.h @@ -28,6 +28,7 @@ #define PQI_SERVICE_HEADER #include "pqi/pqi_base.h" +#include "util/rsthreads.h" // PQI Service, is a generic lower layer on which services can run on. // @@ -90,6 +91,7 @@ int tick(); private: + RsMutex srvMtx; std::map services; std::map::iterator rrit;