Completed Mutex protection for chat, msgs, and disc services.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@334 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-02-04 21:40:34 +00:00
parent 54063ab434
commit 925b1a780e
5 changed files with 143 additions and 71 deletions

View File

@ -25,25 +25,14 @@
#include "services/p3chatservice.h"
#include "pqi/pqidebug.h"
#include <sstream>
/**************** PQI_USE_XPGP ******************/
#if defined(PQI_USE_XPGP)
#define CHAT_DEBUG 1
#include "pqi/xpgpcert.h"
#else /* X509 Certificates */
/**************** PQI_USE_XPGP ******************/
#include "pqi/sslcert.h"
#endif /* X509 Certificates */
/**************** PQI_USE_XPGP ******************/
const int p3chatzone = 1745;
/************ NOTE *********************************
* This Service is so simple that there is no
* mutex protection required!
*
*/
p3ChatService::p3ChatService(p3ConnectMgr *cm)
:p3Service(RS_SERVICE_TYPE_CHAT), mConnMgr(cm)
@ -53,15 +42,23 @@ p3ChatService::p3ChatService(p3ConnectMgr *cm)
int p3ChatService::tick()
{
pqioutput(PQL_DEBUG_BASIC, p3chatzone,
"p3ChatService::tick()");
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::tick()";
std::cerr << std::endl;
#endif
return 0;
}
int p3ChatService::status()
{
pqioutput(PQL_DEBUG_BASIC, p3chatzone,
"p3ChatService::status()");
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::status()";
std::cerr << std::endl;
#endif
return 1;
}
@ -78,11 +75,13 @@ int p3ChatService::sendChat(std::wstring msg)
/* add in own id -> so get reflection */
ids.push_back(mConnMgr->getOwnId());
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::sendChat()";
std::cerr << std::endl;
#endif
for(it = ids.begin(); it != ids.end(); it++)
{
pqioutput(PQL_DEBUG_BASIC, p3chatzone,
"p3ChatService::sendChat()");
RsChatItem *ci = new RsChatItem();
ci->PeerId(*it);
@ -90,13 +89,13 @@ int p3ChatService::sendChat(std::wstring msg)
ci->sendTime = time(NULL);
ci->message = msg;
{
std::ostringstream out;
out << "Chat Item we are sending:" << std::endl;
ci -> print(out);
pqioutput(PQL_DEBUG_BASIC, p3chatzone, out.str());
}
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::sendChat() Item:";
std::cerr << std::endl;
ci->print(std::cerr);
std::cerr << std::endl;
#endif
sendItem(ci);
}
@ -106,8 +105,10 @@ int p3ChatService::sendChat(std::wstring msg)
int p3ChatService::sendPrivateChat(std::wstring msg, std::string id)
{
// make chat item....
pqioutput(PQL_DEBUG_BASIC, p3chatzone,
"p3ChatService::sendPrivateChat()");
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::sendPrivateChat()";
std::cerr << std::endl;
#endif
RsChatItem *ci = new RsChatItem();
@ -116,13 +117,12 @@ int p3ChatService::sendPrivateChat(std::wstring msg, std::string id)
ci->sendTime = time(NULL);
ci->message = msg;
{
std::ostringstream out;
out << "Private Chat Item we are sending:" << std::endl;
ci -> print(out);
out << "Sending to:" << id << std::endl;
pqioutput(PQL_DEBUG_BASIC, p3chatzone, out.str());
}
#ifdef CHAT_DEBUG
std::cerr << "p3ChatService::sendPrivateChat() Item:";
std::cerr << std::endl;
ci->print(std::cerr);
std::cerr << std::endl;
#endif
sendItem(ci);

View File

@ -58,6 +58,11 @@ const uint32_t P3DISC_FLAGS_OWN_DETAILS = 0x0010;
#define P3DISC_DEBUG 1
/*********** NOTE ***************
*
* Only need Mutexs for neighbours information
*/
/******************************************************************************************
****************************** NEW DISCOVERY *******************************************
******************************************************************************************
@ -66,6 +71,8 @@ const uint32_t P3DISC_FLAGS_OWN_DETAILS = 0x0010;
p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm)
:p3Service(RS_SERVICE_TYPE_DISC), mAuthMgr(am), mConnMgr(cm)
{
RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/
addSerialType(new RsDiscSerialiser());
mRemoteDisc = true;
@ -98,6 +105,13 @@ int p3disc::handleIncoming()
std::cerr << std::endl;
#endif
bool discOn;
{
RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/
discOn = mRemoteDisc;
}
// if off discard item.
if (!mRemoteDisc)
{
@ -542,8 +556,9 @@ void p3disc::recvPeerFriendMsg(RsDiscReply *item)
int p3disc::addDiscoveryData(std::string fromId, std::string aboutId,
struct sockaddr_in laddr, struct sockaddr_in raddr, uint32_t flags, time_t ts)
{
/* Store Network information */
RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/
/* Store Network information */
std::map<std::string, autoneighbour>::iterator it;
if (neighbours.end() == (it = neighbours.find(aboutId)))
{
@ -604,6 +619,9 @@ int p3disc::addDiscoveryData(std::string fromId, std::string aboutId,
bool p3disc::potentialproxies(std::string id, std::list<std::string> proxyIds)
{
/* find id -> and extract the neighbour_of ids */
RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/
std::map<std::string, autoneighbour>::iterator it;
std::map<std::string, autoserver>::iterator sit;
if (neighbours.end() == (it = neighbours.find(id)))
@ -622,6 +640,8 @@ bool p3disc::potentialproxies(std::string id, std::list<std::string> proxyIds)
int p3disc::idServers()
{
RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/
std::map<std::string, autoneighbour>::iterator nit;
std::map<std::string, autoserver>::iterator sit;
int cts = time(NULL);
@ -652,7 +672,7 @@ int p3disc::idServers()
out << std::endl;
out << "\t\tLocalAddr: ";
out << inet_ntoa(sit->second.localAddr.sin_addr);
out <<":"<< ntohs(sit->second.remoteAddr.sin_port);
out <<":"<< ntohs(sit->second.localAddr.sin_port);
out << std::endl;
out << "\t\tRemoteAddr: ";
out << inet_ntoa(sit->second.remoteAddr.sin_addr);

View File

@ -110,13 +110,16 @@ int addDiscoveryData(std::string fromId, std::string aboutId,
bool potentialproxies(std::string id, std::list<std::string> proxyIds);
int idServers();
/* data */
private:
p3AuthMgr *mAuthMgr;
p3ConnectMgr *mConnMgr;
/* data */
RsMutex mDiscMtx;
bool mRemoteDisc;
bool mLocalDisc;

View File

@ -49,27 +49,49 @@ const int msgservicezone = 54319;
* (3) from storage...
*/
static unsigned int msgUniqueId = 1;
unsigned int getNewUniqueMsgId()
{
return msgUniqueId++;
}
p3MsgService::p3MsgService(p3ConnectMgr *cm)
:p3Service(RS_SERVICE_TYPE_MSG), mConnMgr(cm),
msgChanged(1), msgMajorChanged(1)
msgChanged(1), mMsgUniqueId(1)
{
addSerialType(new RsMsgSerialiser());
}
bool p3MsgService::ModifiedMsgs()
uint32_t p3MsgService::getNewUniqueMsgId()
{
bool m1 = msgChanged.Changed();
bool m2 = msgMajorChanged.Changed();
return (m1 || m2);
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
return mMsgUniqueId++;
}
/****** Mods/Notifications ****/
bool p3MsgService::MsgsChanged()
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
bool m1 = msgChanged.Changed();
return (m1);
}
bool p3MsgService::MsgNotifications()
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
return (msgNotifications.size() > 0);
}
bool p3MsgService::getMessageNotifications(std::list<MsgInfoSummary> &noteList)
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
noteList = msgNotifications;
msgNotifications.clear();
return (noteList.size() > 0);
}
int p3MsgService::tick()
{
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
@ -102,27 +124,30 @@ int p3MsgService::incomingMsgs()
{
++i;
mi -> recvTime = time(NULL);
mi -> msgFlags = RS_MSG_FLAGS_NEW;
mi -> msgId = getNewUniqueMsgId();
std::string mesg;
RsStackMutex stack(mMsgMtx); /*** STACK LOCKED MTX ***/
if (mi -> PeerId() == mConnMgr->getOwnId())
{
/* from the loopback device */
mi -> msgFlags = RS_MSG_FLAGS_OUTGOING;
mi -> msgFlags |= RS_MSG_FLAGS_OUTGOING;
}
else
{
/* from a peer */
mi -> msgFlags = 0;
MsgInfoSummary mis;
initRsMIS(mi, mis);
msgNotifications.push_back(mis);
}
/* new as well! */
mi -> msgFlags |= RS_MSG_FLAGS_NEW;
/* STORE MsgID */
mi -> msgId = getNewUniqueMsgId();
imsg[mi->msgId] = mi;
msgChanged.IndicateChanged();
/**** STACK UNLOCKED ***/
}
return 1;
}
@ -139,8 +164,9 @@ int p3MsgService::checkOutgoingMessages()
std::list<uint32_t>::iterator it;
std::list<uint32_t> toErase;
std::map<uint32_t, RsMsgItem *>::iterator mit;
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
std::map<uint32_t, RsMsgItem *>::iterator mit;
for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); mit++)
{
@ -204,6 +230,9 @@ int p3MsgService::save_config()
/* now we create a pqiarchive, and stream all the msgs into it
*/
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
std::string statelog = config_dir + "/msgs.rst";
RsSerialiser *rss = new RsSerialiser();
rss->addSerialType(new RsMsgSerialiser());
@ -256,6 +285,8 @@ int p3MsgService::load_config()
RsItem *item;
RsMsgItem *mitem;
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
while((item = pa_in -> GetItem()))
{
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(item)))
@ -313,6 +344,8 @@ void p3MsgService::loadWelcomeMsg()
msg -> msgId = getNewUniqueMsgId();
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
imsg[msg->msgId] = msg;
}
@ -331,6 +364,8 @@ bool p3MsgService::getMessageSummaries(std::list<MsgInfoSummary> &msgList)
/* do stuff */
msgList.clear();
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
std::map<uint32_t, RsMsgItem *>::iterator mit;
for(mit = imsg.begin(); mit != imsg.end(); mit++)
{
@ -354,6 +389,8 @@ bool p3MsgService::getMessage(std::string mId, MessageInfo &msg)
std::map<uint32_t, RsMsgItem *>::iterator mit;
uint32_t msgId = atoi(mId.c_str());
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
mit = imsg.find(msgId);
if (mit == imsg.end())
{
@ -376,6 +413,8 @@ bool p3MsgService::removeMsgId(std::string mid)
std::map<uint32_t, RsMsgItem *>::iterator mit;
uint32_t msgId = atoi(mid.c_str());
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
mit = imsg.find(msgId);
if (mit != imsg.end())
{
@ -383,7 +422,6 @@ bool p3MsgService::removeMsgId(std::string mid)
imsg.erase(mit);
delete mi;
msgChanged.IndicateChanged();
msgMajorChanged.IndicateChanged();
return true;
}
@ -395,7 +433,6 @@ bool p3MsgService::removeMsgId(std::string mid)
msgOutgoing.erase(mit);
delete mi;
msgChanged.IndicateChanged();
msgMajorChanged.IndicateChanged();
return true;
}
@ -408,6 +445,8 @@ bool p3MsgService::markMsgIdRead(std::string mid)
std::map<uint32_t, RsMsgItem *>::iterator mit;
uint32_t msgId = atoi(mid.c_str());
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
mit = imsg.find(msgId);
if (mit != imsg.end())
{
@ -427,6 +466,8 @@ int p3MsgService::sendMessage(RsMsgItem *item)
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::sendMessage()");
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
/* add pending flag */
item->msgFlags |=
(RS_MSG_FLAGS_OUTGOING |

View File

@ -47,7 +47,10 @@ class p3MsgService: public p3Service
p3MsgService(p3ConnectMgr *cm);
/* External Interface */
bool ModifiedMsgs();
bool MsgsChanged(); /* should update display */
bool MsgNotifications(); /* popup - messages */
bool getMessageNotifications(std::list<MsgInfoSummary> &noteList);
bool getMessageSummaries(std::list<MsgInfoSummary> &msgList);
bool getMessage(std::string mid, MessageInfo &msg);
@ -74,6 +77,7 @@ int status();
private:
uint32_t getNewUniqueMsgId();
int sendMessage(RsMsgItem *item);
int incomingMsgs();
@ -81,18 +85,22 @@ void initRsMI(RsMsgItem *msg, MessageInfo &mi);
void initRsMIS(RsMsgItem *msg, MsgInfoSummary &mis);
RsMsgItem *initMIRsMsg(MessageInfo &info, std::string to);
p3ConnectMgr *mConnMgr;
/* Mutex Required for stuff below */
RsMutex msgMtx;
RsMutex mMsgMtx;
std::map<uint32_t, RsMsgItem *> imsg;
std::map<uint32_t, RsMsgItem *> msgOutgoing; /* ones that haven't made it out yet! */
/* stored list of messages */
std::map<uint32_t, RsMsgItem *> imsg;
/* ones that haven't made it out yet! */
std::map<uint32_t, RsMsgItem *> msgOutgoing;
p3ConnectMgr *mConnMgr;
/* List of notifications to post via Toaster */
std::list<MsgInfoSummary> msgNotifications;
Indicator msgChanged;
Indicator msgMajorChanged;
uint32_t mMsgUniqueId;
std::string config_dir;
};