diff --git a/libbitdht/src/bitdht/bdnode.cc b/libbitdht/src/bitdht/bdnode.cc index d53b6c9d9..b09aaa661 100644 --- a/libbitdht/src/bitdht/bdnode.cc +++ b/libbitdht/src/bitdht/bdnode.cc @@ -71,7 +71,9 @@ #define HISTORY_PERIOD 60 bdNode::bdNode(bdNodeId *ownId, std::string dhtVersion, std::string bootfile, bdDhtFunctions *fns) - :mNodeSpace(ownId, fns), mQueryMgr(NULL), mConnMgr(NULL), mFilterPeers(NULL), mOwnId(*ownId), mDhtVersion(dhtVersion), mStore(bootfile, fns), mFns(fns), mFriendList(ownId), mHistory(HISTORY_PERIOD) + :mNodeSpace(ownId, fns), mQueryMgr(NULL), mConnMgr(NULL), + mFilterPeers(NULL), mOwnId(*ownId), mDhtVersion(dhtVersion), mStore(bootfile, fns), mFns(fns), + mFriendList(ownId), mHistory(HISTORY_PERIOD) { init(); /* (uses this pointers) stuff it - do it here! */ @@ -264,14 +266,17 @@ void bdNode::printState() std::cerr << "Outstanding Potential Peers: " << mPotentialPeers.size(); std::cerr << std::endl; - std::cerr << "Outstanding Query Requests: " << mRemoteQueries.size(); - std::cerr << std::endl; - #ifdef USE_HISTORY mHistory.cleanupOldMsgs(); mHistory.printMsgs(); mHistory.analysePeers(); mHistory.peerTypeAnalysis(); + + // Incoming Query Analysis. + std::cerr << "Outstanding Query Requests: " << mRemoteQueries.size(); + std::cerr << std::endl; + + mQueryHistory.printMsgs(); #endif mAccount.printStats(std::cerr); @@ -647,12 +652,28 @@ void bdNode::processRemoteQuery() while(nProcessed < MAX_REMOTE_PROCESS_PER_CYCLE) { /* extra exit clause */ - if (mRemoteQueries.size() < 1) return; + if (mRemoteQueries.size() < 1) + { +#ifdef USE_HISTORY + if (nProcessed) + { + mQueryHistory.cleanupOldMsgs(); + } +#endif + return; + } bdRemoteQuery &query = mRemoteQueries.front(); - + + // filtering. + bool badPeer = false; +#ifdef USE_HISTORY + // store result in badPeer to activate the filtering. + mQueryHistory.addIncomingQuery(query.mQueryTS, &(query.mId), &(query.mQuery)); +#endif + /* discard older ones (stops queue getting overloaded) */ - if (query.mQueryTS > oldTS) + if ((query.mQueryTS > oldTS) && (!badPeer)) { /* recent enough to process! */ nProcessed++; @@ -722,16 +743,27 @@ void bdNode::processRemoteQuery() } else { - std::cerr << "bdNode::processRemoteQuery() Query Too Old: Discarding: "; + if (badPeer) + { + std::cerr << "bdNode::processRemoteQuery() Query from BadPeer: Discarding: "; + } + else + { + std::cerr << "bdNode::processRemoteQuery() Query Too Old: Discarding: "; + } mFns->bdPrintId(std::cerr, &(query.mId)); std::cerr << std::endl; #ifdef DEBUG_NODE_MSGS #endif } - mRemoteQueries.pop_front(); + } + +#ifdef USE_HISTORY + mQueryHistory.cleanupOldMsgs(); +#endif } diff --git a/libbitdht/src/bitdht/bdnode.h b/libbitdht/src/bitdht/bdnode.h index a41bdab53..af2a226ed 100644 --- a/libbitdht/src/bitdht/bdnode.h +++ b/libbitdht/src/bitdht/bdnode.h @@ -253,6 +253,8 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr); bdHistory mHistory; /* for understanding the DHT */ + bdQueryHistory mQueryHistory; /* for determining old peers */ + private: uint32_t mNodeOptionFlags; diff --git a/libbitdht/src/bitdht/bdquery.cc b/libbitdht/src/bitdht/bdquery.cc index aa87975d0..e93000a84 100644 --- a/libbitdht/src/bitdht/bdquery.cc +++ b/libbitdht/src/bitdht/bdquery.cc @@ -26,6 +26,7 @@ #include "bitdht/bdquery.h" +#include "bitdht/bdstddht.h" #include "util/bdnet.h" #include @@ -1060,6 +1061,10 @@ int bdQuery::printQuery() /********************************* Remote Query **************************************/ + +#define QUERY_HISTORY_LIMIT 10 // Typically get max of 4-6 per 10minutes. +#define QUERY_HISTORY_PERIOD 600 + bdRemoteQuery::bdRemoteQuery(bdId *id, bdNodeId *query, bdToken *transId, uint32_t query_type) :mId(*id), mQuery(*query), mTransId(*transId), mQueryType(query_type) { @@ -1069,4 +1074,130 @@ bdRemoteQuery::bdRemoteQuery(bdId *id, bdNodeId *query, bdToken *transId, uint32 +bdQueryHistoryList::bdQueryHistoryList() + :mBadPeer(false) +{ + +} + + +bool bdQueryHistoryList::addIncomingQuery(time_t recvd, const bdNodeId *aboutId) +{ + mList.insert(std::make_pair(recvd, *aboutId)); + mBadPeer = (mList.size() > QUERY_HISTORY_LIMIT); + return mBadPeer; +} + + +// returns true if empty. +bool bdQueryHistoryList::cleanupMsgs(time_t before) +{ + if (before == 0) + { + mList.clear(); + return true; + } + + // Delete the old stuff in the list. + while((mList.begin() != mList.end()) && (mList.begin()->first < before)) + { + mList.erase(mList.begin()); + } + + // return true if empty. + if (mList.begin() == mList.end()) + { + return true; + } + return false; +} + +bdQueryHistory::bdQueryHistory() + :mStorePeriod(QUERY_HISTORY_PERIOD) +{ + return; +} + +bool bdQueryHistory::addIncomingQuery(time_t recvd, const bdId *id, const bdNodeId *aboutId) +{ + std::map::iterator it; + + it = mHistory.find(*id); + if (it == mHistory.end()) + { + mHistory[*id] = bdQueryHistoryList(); + it = mHistory.find(*id); + } + + return (it->second).addIncomingQuery(recvd, aboutId); +} + +bool bdQueryHistory::isBadPeer(const bdId *id) +{ + std::map::iterator it; + + it = mHistory.find(*id); + if (it == mHistory.end()) + { + return false; + } + + return it->second.mBadPeer; +} + + +void bdQueryHistory::cleanupOldMsgs() +{ + if (mStorePeriod == 0) + { + return; // no cleanup. + } + + time_t before = time(NULL) - mStorePeriod; + std::map::iterator it; + for(it = mHistory.begin(); it != mHistory.end(); ) + { + if (it->second.cleanupMsgs(before)) + { + std::map::iterator tit(it); + ++tit; + mHistory.erase(it); + it = tit; + } + else + { + ++it; + } + } +} + + +void bdQueryHistory::printMsgs() +{ + std::ostream &out = std::cerr; + + out << "bdQueryHistory::printMsgs() IncomingQueries in last " << mStorePeriod; + out << " secs" << std::endl; + + std::map::iterator it; + for(it = mHistory.begin(); it != mHistory.end(); it++) + { + out << "\t"; + bdStdPrintId(out, &(it->first)); + out << " " << it->second.mList.size(); + if (it->second.mBadPeer) + { + out << " BadPeer"; + } + out << std::endl; + } +} + + + + + + + + diff --git a/libbitdht/src/bitdht/bdquery.h b/libbitdht/src/bitdht/bdquery.h index 60660116c..2c17ae1d5 100644 --- a/libbitdht/src/bitdht/bdquery.h +++ b/libbitdht/src/bitdht/bdquery.h @@ -127,5 +127,36 @@ class bdRemoteQuery time_t mQueryTS; }; + + +class bdQueryHistoryList +{ + public: + bdQueryHistoryList(); + +bool addIncomingQuery(time_t recvd, const bdNodeId *aboutId); // calcs and returns mBadPeer +bool cleanupMsgs(time_t before); // returns true if empty. + + bool mBadPeer; + std::multimap mList; +}; + +class bdQueryHistory +{ + public: + bdQueryHistory(); + +bool addIncomingQuery(time_t recvd, const bdId *id, const bdNodeId *aboutId); +void printMsgs(); + +void cleanupOldMsgs(); + +bool isBadPeer(const bdId *id); + + int mStorePeriod; + std::map mHistory; +}; + + #endif