Added First Part of tracking missing messages.

* Added RsDistribDummyMsg class to each GroupInfo.
  * Added function to check for missing parents.
  * added print functions for debugging.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4010 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-02-03 00:41:16 +00:00
parent 78aacb927f
commit dec7a36835
2 changed files with 257 additions and 0 deletions

View File

@ -241,6 +241,8 @@ void p3GroupDistrib::run() /* called once the thread is started */
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
int printed = 0;
while(1) while(1)
{ {
/* */ /* */
@ -286,6 +288,15 @@ void p3GroupDistrib::run() /* called once the thread is started */
#else #else
Sleep(1000); Sleep(1000);
#endif #endif
/* HACK for debugging */
if (printed < 10)
{
RsStackMutex stack(distribMtx);
locked_printAllDummyMsgs();
printed++;
}
} }
} }
} }
@ -809,6 +820,9 @@ void p3GroupDistrib::loadMsg(RsDistribSignedMsg *newMsg, const std::string &src,
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
// Interface to handle Dummy Msgs.
locked_CheckNewMsgDummies(git->second, msg, src, historical);
/* Callback for any derived classes to play with */ /* Callback for any derived classes to play with */
locked_eventNewMsg(&(git->second), msg, src, historical); locked_eventNewMsg(&(git->second), msg, src, historical);
@ -3718,3 +3732,229 @@ bool p3GroupDistrib::groupsChanged(std::list<std::string> &groupIds)
} }
/***************************************************************************************/
/***************************************************************************************/
/******************* Handle Missing Messages ***********************************/
/***************************************************************************************/
/***************************************************************************************/
/* Find missing messages */
/* LOGIC:
*
* dummy(grpId, threadId, parentId, msgId);
*
* add new msg....
* - search for threadId.
* - if missing add thread head: dummy(grpId, threadId, NULL, threadId).
*
* - search for parentId
* - if = threadId, we just added it (ok).
* - if missing add dummy(grpId, threadId, threadId, parentId).
*
* - check for matching dummy msgId.
* - if yes, delete.
* - if msg->parentId != dummy->parentId, then there is still a missing parent.
*
*/
#define DISTRIB_DUMMYMSG_DEBUG 1
RsDistribDummyMsg::RsDistribDummyMsg( std::string tId, std::string pId, std::string mId)
:threadId(tId), parentId(pId), msgId(mId)
{
return;
}
std::ostream &operator<<(std::ostream &out, const RsDistribDummyMsg &msg)
{
out << "DummyMsg(" << msg.threadId << "," << msg.parentId << "," << msg.msgId << ")";
return out;
}
bool p3GroupDistrib::locked_CheckNewMsgDummies(GroupInfo &grp, RsDistribMsg *msg, std::string id, bool historical)
{
std::string threadId = msg->threadId;
std::string parentId = msg->parentId;
std::string msgId = msg->msgId;
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies(grpId:" << grp.grpId << ", threadId: " << threadId;
std::cerr << ", parentId:" << parentId << ", msgId: " << msgId << ")";
std::cerr << std::endl;
#endif
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() Pre Printout";
std::cerr << std::endl;
locked_printDummyMsgs(grp);
#endif
/* search for threadId */
if (threadId != "")
{
std::map<std::string, RsDistribMsg *>::iterator tit = grp.msgs.find(threadId);
if (tit == grp.msgs.end()) // not there!
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() No ThreadId Msg, Adding DummyMsg";
std::cerr << std::endl;
#endif
locked_addDummyMsg(grp, threadId, "", threadId);
}
else
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() Found ThreadId Msg";
std::cerr << std::endl;
#endif
}
}
if (parentId != "")
{
/* search for parentId */
std::map<std::string, RsDistribMsg *>::iterator pit = grp.msgs.find(parentId);
if (pit == grp.msgs.end()) // not there!
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() No ParentId Msg, Adding DummyMsg";
std::cerr << std::endl;
#endif
locked_addDummyMsg(grp, threadId, threadId, parentId);
}
else
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() Found ParentId Msg";
std::cerr << std::endl;
#endif
}
}
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() Checking for DummyMsg";
std::cerr << std::endl;
#endif
/* remove existing dummy */
locked_clearDummyMsg(grp, msgId);
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_CheckNewMsgDummies() Post Printout";
std::cerr << std::endl;
locked_printDummyMsgs(grp);
#endif
return true;
}
bool p3GroupDistrib::locked_addDummyMsg(GroupInfo &grp, std::string threadId, std::string parentId, std::string msgId)
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_addDummyMsg(grpId:" << grp.grpId << ", threadId: " << threadId;
std::cerr << ", parentId:" << parentId << ", msgId: " << msgId << ")";
std::cerr << std::endl;
#endif
if (msgId == "")
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_addDummyMsg() ERROR not adding empty MsgId";
std::cerr << std::endl;
#endif
return false;
}
/* search for the msg Id */
std::map<std::string, RsDistribDummyMsg>::iterator dit = grp.dummyMsgs.find(msgId);
if (dit == grp.dummyMsgs.end()) // not there!
{
grp.dummyMsgs[msgId] = RsDistribDummyMsg(threadId, parentId, msgId);
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_addDummyMsg() Adding Dummy Msg";
std::cerr << std::endl;
#endif
}
else
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_addDummyMsg() Dummy Msg already there: " << dit->second;
std::cerr << std::endl;
#endif
}
return true;
}
bool p3GroupDistrib::locked_clearDummyMsg(GroupInfo &grp, std::string msgId)
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_clearDummyMsg(grpId:" << grp.grpId << ", msgId: " << msgId << ")";
std::cerr << std::endl;
#endif
/* search for the msg Id */
std::map<std::string, RsDistribDummyMsg>::iterator dit = grp.dummyMsgs.find(msgId);
if (dit != grp.dummyMsgs.end())
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_clearDummyMsg() Erasing Dummy Msg: " << dit->second;
std::cerr << std::endl;
#endif
grp.dummyMsgs.erase(dit);
}
else
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_clearDummyMsg() Msg not found";
std::cerr << std::endl;
#endif
}
return true;
}
bool p3GroupDistrib::locked_printAllDummyMsgs()
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_printAllDummyMsg()";
std::cerr << std::endl;
#endif
std::map<std::string, GroupInfo>::iterator it;
for(it = mGroups.begin(); it != mGroups.end(); it++)
{
locked_printDummyMsgs(it->second);
}
}
bool p3GroupDistrib::locked_printDummyMsgs(GroupInfo &grp)
{
#ifdef DISTRIB_DUMMYMSG_DEBUG
std::cerr << "p3GroupDistrib::locked_printDummyMsg(grpId:" << grp.grpId << ")";
std::cerr << std::endl;
#endif
/* search for the msg Id */
std::map<std::string, RsDistribDummyMsg>::iterator dit;
for(dit = grp.dummyMsgs.begin(); dit != grp.dummyMsgs.end(); dit++)
{
std::cerr << dit->second;
std::cerr << std::endl;
}
return true;
}

View File

@ -102,6 +102,8 @@ const uint32_t GROUP_KEY_DISTRIB_ADMIN = 0x0040;
class RsDistribDummyMsg class RsDistribDummyMsg
{ {
public: public:
RsDistribDummyMsg( std::string tId, std::string pId, std::string mId);
RsDistribDummyMsg() { return; }
std::string threadId; std::string threadId;
std::string parentId; std::string parentId;
std::string msgId; std::string msgId;
@ -170,6 +172,7 @@ class GroupInfo
std::list<std::string> sources; std::list<std::string> sources;
std::map<std::string, RsDistribMsg *> msgs; std::map<std::string, RsDistribMsg *> msgs;
std::map<std::string, RsDistribDummyMsg> dummyMsgs; // dummyMsgs.
/***********************************/ /***********************************/
@ -698,6 +701,20 @@ class p3GroupDistrib: public CacheSource, public CacheStore, public p3Config, pu
/***************************************************************************************/ /***************************************************************************************/
/***************************************************************************************/ /***************************************************************************************/
/***************************************************************************************/
/**************************** DummyMsgs Functions **************************************/
/***************************************************************************************/
public:
bool locked_CheckNewMsgDummies(GroupInfo &info, RsDistribMsg *msg, std::string id, bool historical);
bool locked_addDummyMsg(GroupInfo &info, std::string threadId, std::string parentId, std::string msgId);
bool locked_clearDummyMsg(GroupInfo &info, std::string msgId);
bool locked_printAllDummyMsgs();
bool locked_printDummyMsgs(GroupInfo &info);
/* key cache functions - we use .... (not overloaded) /* key cache functions - we use .... (not overloaded)
*/ */