Improved the dht msg history storage.

- specify how long we store for.
 - cleanup old msgs.
 - improve printing of history.
 - add timeline storage as well.
 - disabled by default, enable USE_HISTORY in bdnode.c

There appears to be a bug related to copying bdId's around.
Some of the bootstrap ids are malformed, and this crashes rs.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5724 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-10-25 22:36:22 +00:00
parent 83795a7c7d
commit cc9e933362
6 changed files with 239 additions and 28 deletions

View file

@ -2,12 +2,15 @@
#include "bitdht/bdhistory.h"
#include "bitdht/bdstddht.h"
#include "bitdht/bdmsgs.h"
#define MIN_RESEND_PERIOD 60
void bdMsgHistoryList::addMsg(time_t ts, uint32_t msgType, bool incoming)
{
// std::cerr << "bdMsgHistoryList::addMsg()";
// std::cerr << std::endl;
uint32_t msg = msgType | (incoming ? MSG_DIRECTION_INCOMING : MSG_DIRECTION_OUTGOING);
msgHistory.insert(std::make_pair(ts, msg));
}
@ -23,9 +26,26 @@ int bdMsgHistoryList::msgCount(time_t start_ts, time_t end_ts)
return count;
}
void bdMsgHistoryList::msgClear()
bool bdMsgHistoryList::msgClear(time_t before)
{
if (before == 0)
{
msgHistory.clear();
return true;
}
// Delete the old stuff in the list.
while((msgHistory.begin() != msgHistory.end()) && (msgHistory.begin()->first < before))
{
msgHistory.erase(msgHistory.begin());
}
// return true if empty.
if (msgHistory.begin() == msgHistory.end())
{
return true;
}
return false;
}
@ -68,13 +88,25 @@ void bdMsgHistoryList::printHistory(std::ostream &out, int mode, time_t start_ts
if (MSG_DIRECTION_INCOMING & it->second)
{
out << " =>" << it->second - MSG_DIRECTION_INCOMING;
out << " ";
uint32_t type = it->second-MSG_DIRECTION_INCOMING;
out << "( => ";
std::string name;
if (bitdht_msgtype(type, name))
{
out << name;
}
out << " )";
}
else
{
out << " ";
out << it->second - MSG_DIRECTION_OUTGOING << "=> ";
out << "( ";
uint32_t type = it->second - MSG_DIRECTION_OUTGOING;
std::string name;
if (bitdht_msgtype(type, name))
{
out << name;
}
out << " <= )";
}
}
@ -146,11 +178,25 @@ bool bdMsgHistoryList::validPeer()
return false;
}
bdHistory::bdHistory(time_t store_period)
:mStorePeriod(store_period) { return; }
void bdHistory::addMsg(const bdId *id, bdToken * /*transId*/, uint32_t msgType, bool incoming)
{
//std::cerr << "bdHistory::addMsg() ";
//bdStdPrintId(std::cerr, id);
//std::cerr << std::endl;
time_t now = time(NULL);
std::map<bdId, bdMsgHistoryList>::iterator it;
bdMsgHistoryList &histRef = mHistory[*id]; /* will instaniate empty */
histRef.addMsg(time(NULL), msgType, incoming);
histRef.addMsg(now, msgType, incoming);
/* add to mMsgTimeline */
mMsgTimeline.insert(std::make_pair(now, MsgRegister(id, msgType, incoming)));
}
void bdHistory::printMsgs()
@ -158,6 +204,10 @@ void bdHistory::printMsgs()
/* print and clear msgs */
std::ostream &out = std::cerr;
std::cerr << "bdHistory::printMsgs()";
std::cerr << std::endl;
std::map<bdId, bdMsgHistoryList> ::iterator it;
for(it = mHistory.begin(); it != mHistory.end(); it++)
{
@ -171,8 +221,78 @@ void bdHistory::printMsgs()
it->second.printHistory(out, 0, 0, time(NULL));
}
}
out << "Msg Timeline:";
time_t now = time(NULL);
std::multimap<time_t, MsgRegister>::iterator hit;
for(hit = mMsgTimeline.begin(); hit != mMsgTimeline.end(); hit++)
{
out << now - hit->first << " ";
bdStdPrintId(out, &(hit->second.id));
if (hit->second.incoming)
{
out << " => ";
}
else
{
out << " <= ";
}
std::string name;
if (bitdht_msgtype(hit->second.msgType, name))
{
out << name;
}
else
{
out << "UNKNOWN MSG";
}
out << std::endl;
}
}
void bdHistory::cleanupOldMsgs()
{
std::cerr << "bdHistory::cleanupOldMsgs()";
std::cerr << std::endl;
if (mStorePeriod == 0)
{
return; // no cleanup
}
std::list<bdId> to_cleanup;
std::list<bdId>::iterator cit;
time_t before = time(NULL) - mStorePeriod;
// Delete the old stuff in the list.
while((mMsgTimeline.begin() != mMsgTimeline.end()) && (mMsgTimeline.begin()->first < before))
{
std::multimap<time_t, MsgRegister>::iterator it = mMsgTimeline.begin();
to_cleanup.push_back(it->second.id);
mMsgTimeline.erase(it);
}
// remove old msgs, delete entry if its empty.
std::map<bdId, bdMsgHistoryList>::iterator hit;
for(cit = to_cleanup.begin(); cit != to_cleanup.end(); cit++)
{
hit = mHistory.find(*cit);
if (hit != mHistory.end())
{
if (hit->second.msgClear(before))
{
mHistory.erase(hit);
}
}
}
}
void bdHistory::clearHistory()
{
mHistory.clear();
@ -180,7 +300,6 @@ void bdHistory::clearHistory()
bool bdHistory::canSend(const bdId *id)
{
std::map<bdId, bdMsgHistoryList> ::iterator it;
it = mHistory.find(*id);
if (it != mHistory.end())

View file

@ -12,13 +12,26 @@
/**** DEBUGGING HISTORY ****/
class MsgRegister
{
public:
MsgRegister() { return; }
MsgRegister(const bdId *inId, uint32_t inMsgType, bool inIncoming)
:id(*inId), msgType(inMsgType), incoming(inIncoming) { return; }
bdId id;
uint32_t msgType;
bool incoming;
};
class bdMsgHistoryList
{
public:
void addMsg(time_t ts, uint32_t msgType, bool incoming);
int msgCount(time_t start_ts, time_t end_ts);
void msgClear();
bool msgClear(time_t before); // 0 => clear all.
void printHistory(std::ostream &out, int mode, time_t start_ts, time_t end_ts);
bool canSend();
@ -32,8 +45,12 @@ bool validPeer();
class bdHistory
{
public:
bdHistory(time_t store_period);
void addMsg(const bdId *id, bdToken *transId, uint32_t msgType, bool incoming);
void printMsgs();
void cleanupOldMsgs();
void clearHistory();
bool canSend(const bdId *id);
@ -42,6 +59,9 @@ bool validPeer(const bdId *id);
/* recent history */
//std::list<bdId> lastMsgs;
std::map<bdId, bdMsgHistoryList> mHistory;
std::multimap<time_t, MsgRegister> mMsgTimeline;
int mStorePeriod;
};

View file

@ -1015,3 +1015,62 @@ int bitdht_connect_genmsg(bdToken *tid, bdNodeId *id, int msgtype, bdId *src, bd
}
bool bitdht_msgtype(uint32_t msg_type, std::string &name)
{
switch(msg_type)
{
case BITDHT_MSG_TYPE_PING:
name = "PING";
break;
case BITDHT_MSG_TYPE_PONG:
name = "PONG";
break;
case BITDHT_MSG_TYPE_FIND_NODE:
name = "FIND_NODE";
break;
case BITDHT_MSG_TYPE_REPLY_NODE:
name = "REPLY_NODE";
break;
case BITDHT_MSG_TYPE_GET_HASH:
name = "GET_HASH";
break;
case BITDHT_MSG_TYPE_REPLY_HASH:
name = "REPLY_HASH";
break;
case BITDHT_MSG_TYPE_REPLY_NEAR:
name = "REPLY_NEAR";
break;
case BITDHT_MSG_TYPE_POST_HASH:
name = "POST_HASH";
break;
case BITDHT_MSG_TYPE_REPLY_POST:
name = "REPLY_POST";
break;
case BITDHT_MSG_TYPE_CONNECT:
name = "CONNECT";
break;
case BITDHT_MSG_TYPE_CONNECT_REQUEST:
name = "CONNECT_REQUEST";
break;
case BITDHT_MSG_TYPE_CONNECT_REPLY:
name = "CONNECT_REPLY";
break;
case BITDHT_MSG_TYPE_CONNECT_START:
name = "CONNECT_START";
break;
case BITDHT_MSG_TYPE_CONNECT_ACK:
name = "CONNECT_ACK";
break;
default:
name = "UNKNOWN";
return false;
break;
}
return true;
}

View file

@ -104,6 +104,8 @@ int beMsgMatchString(be_node *n, const char *str, int len);
uint32_t beMsgGetY(be_node *n);
uint32_t beMsgType(be_node *n);
bool bitdht_msgtype(uint32_t msg_type, std::string &name);
uint32_t convertBdVersionToVID(bdVersion *version);

View file

@ -66,9 +66,12 @@
//#define DISABLE_BAD_PEER_FILTER 1
//#define USE_HISTORY 1
#define HISTORY_PERIOD 60
bdNode::bdNode(bdNodeId *ownId, std::string dhtVersion, std::string bootfile, bdDhtFunctions *fns)
:mNodeSpace(ownId, fns), mQueryMgr(NULL), mConnMgr(NULL), mFilterPeers(NULL), mOwnId(*ownId), mDhtVersion(dhtVersion), mStore(bootfile, fns), mFns(fns), mFriendList(ownId)
:mNodeSpace(ownId, fns), mQueryMgr(NULL), mConnMgr(NULL), mFilterPeers(NULL), mOwnId(*ownId), mDhtVersion(dhtVersion), mStore(bootfile, fns), mFns(fns), mFriendList(ownId), mHistory(HISTORY_PERIOD)
{
init(); /* (uses this pointers) stuff it - do it here! */
@ -258,15 +261,17 @@ void bdNode::printState()
mQueryMgr->printQueries();
mConnMgr->printConnections();
#ifdef USE_HISTORY
mHistory.printMsgs();
#endif
std::cerr << "Outstanding Potential Peers: " << mPotentialPeers.size();
std::cerr << std::endl;
std::cerr << "Outstanding Query Requests: " << mRemoteQueries.size();
std::cerr << std::endl;
#ifdef USE_HISTORY
mHistory.cleanupOldMsgs();
mHistory.printMsgs();
#endif
mAccount.printStats(std::cerr);
}
@ -344,6 +349,7 @@ void bdNode::iteration()
/* don't send too many queries ... check history first */
#if 0
#ifdef USE_HISTORY
if (mHistory.validPeer(&pid))
{
@ -356,6 +362,7 @@ void bdNode::iteration()
#endif
}
#endif
#endif
/**** TEMP ****/
@ -402,7 +409,7 @@ void bdNode::iteration()
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::iteration() Pinging Out-Of-Date Peer: ";
mFns->bdPrintId(std::cerr, *oit);
mFns->bdPrintId(std::cerr, &(*oit));
std::cerr << std::endl;
#endif
}
@ -423,7 +430,6 @@ void bdNode::send_ping(bdId *id)
{
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_PING);
msgout_ping(id, &transId);
}
@ -434,7 +440,6 @@ void bdNode::send_query(bdId *id, bdNodeId *targetNodeId)
/* push out query */
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_FIND_NODE);
msgout_find_node(id, &transId, targetNodeId);
@ -453,7 +458,6 @@ void bdNode::send_connect_msg(bdId *id, int msgtype, bdId *srcAddr, bdId *destAd
/* push out query */
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_FIND_NODE);
msgout_connect_genmsg(id, &transId, msgtype, srcAddr, destAddr, mode, param, status);
@ -526,7 +530,7 @@ void bdNode::checkPotentialPeer(bdId *id, bdId *src)
}
void bdNode::addPotentialPeer(bdId *id, bdId */*src*/)
void bdNode::addPotentialPeer(bdId *id, bdId * /*src*/)
{
mPotentialPeers.push_back(*id);
}
@ -799,8 +803,13 @@ void bdNode::msgout_ping(bdId *id, bdToken *transId)
std::cerr << std::endl;
#endif
registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_PING);
// THIS IS CRASHING HISTORY.
// LIKELY ID is not always valid!
// Either PotentialPeers or Out-Of-Date Peers.
//registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_PING);
bdId dupId(*id);
registerOutgoingMsg(&dupId, transId, BITDHT_MSG_TYPE_PING);
/* create string */
char msg[10240];
@ -1472,7 +1481,7 @@ void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr)
/* Construct Source Id */
bdId srcId(id, addr);
checkIncomingMsg(&srcId, &transId, beType);
registerIncomingMsg(&srcId, &transId, beType);
switch(beType)
{
case BITDHT_MSG_TYPE_PING: /* a: id, transId */
@ -2192,11 +2201,11 @@ void bdNode::registerOutgoingMsg(bdId *id, bdToken *transId, uint32_t msgType)
uint32_t bdNode::checkIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType)
uint32_t bdNode::registerIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType)
{
#ifdef DEBUG_MSG_CHECKS
std::cerr << "bdNode::checkIncomingMsg(";
std::cerr << "bdNode::registerIncomingMsg(";
mFns->bdPrintId(std::cerr, id);
std::cerr << ", " << msgType << ")";
std::cerr << std::endl;

View file

@ -219,7 +219,8 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
/* transId handling */
void genNewTransId(bdToken *token);
void registerOutgoingMsg(bdId *id, bdToken *transId, uint32_t msgType);
uint32_t checkIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType);
uint32_t registerIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType);
void cleanupTransIdRegister();
@ -250,6 +251,8 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
bdFriendList mFriendList;
bdPeerQueue mBadPeerQueue;
bdHistory mHistory; /* for understanding the DHT */
private:
uint32_t mNodeOptionFlags;
@ -258,7 +261,6 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
uint32_t mMaxAllowedMsgs;
uint32_t mRelayMode;
bdHistory mHistory; /* for understanding the DHT */
std::list<bdRemoteQuery> mRemoteQueries;