From 9cf531fc9a7dd9cc6e5bad94630c274d3db2dd6c Mon Sep 17 00:00:00 2001 From: drbob Date: Tue, 19 Oct 2010 22:03:38 +0000 Subject: [PATCH] Modified bitdht to enable simultaneous peer searches. * Added maximum msg limits (20, 50, 100 msgs per sec). 50 = default. * Priority send out pings, with 10% queries. * switched bdquery list in bdnode to use pointers... so peers can be easily reordered (queue for queries). * Add all find peers, once dht initialised. * Added reset if FINDSELF fails. (don't know if this is a good idea!) * simplified DHT printouts. verbose versions are enabled with debug. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@3686 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libbitdht/src/bitdht/bdmanager.cc | 10 ++- libbitdht/src/bitdht/bdnode.cc | 118 ++++++++++++++++++++++++------ libbitdht/src/bitdht/bdnode.h | 2 +- libbitdht/src/bitdht/bdpeer.cc | 11 ++- libbitdht/src/bitdht/bdquery.cc | 41 ++++++++++- libbitdht/src/bitdht/bdquery.h | 3 +- 6 files changed, 153 insertions(+), 32 deletions(-) diff --git a/libbitdht/src/bitdht/bdmanager.cc b/libbitdht/src/bitdht/bdmanager.cc index 495de785c..0809c8837 100644 --- a/libbitdht/src/bitdht/bdmanager.cc +++ b/libbitdht/src/bitdht/bdmanager.cc @@ -184,7 +184,9 @@ void bdNodeManager::startQueries() uint32_t qflags = it->second.mQFlags | BITDHT_QFLAGS_DISGUISE; addQuery(&(it->first), qflags); - return; + + // add all queries at the same time! + //return; } } return; @@ -251,16 +253,16 @@ void bdNodeManager::iteration() case BITDHT_MGR_STATE_FINDSELF: /* 60 seconds further startup .... then switch to ACTIVE */ #define MAX_FINDSELF_TIME 60 -#define MIN_OP_SPACE_SIZE 100 +#define MIN_OP_SPACE_SIZE 50 /* 100 seemed hard! */ { uint32_t nodeSpaceSize = mNodeSpace.calcSpaceSize(); #ifdef DEBUG_MGR +#endif std::cerr << "bdNodeManager::iteration() Finding Oneself: "; std::cerr << "NodeSpace Size:" << nodeSpaceSize; std::cerr << std::endl; -#endif if (nodeSpaceSize > MIN_OP_SPACE_SIZE) { @@ -338,9 +340,9 @@ void bdNodeManager::iteration() default: case BITDHT_MGR_STATE_FAILED: { -#ifdef DEBUG_MGR std::cerr << "bdNodeManager::iteration(): FAILED ==> STARTUP"; std::cerr << std::endl; +#ifdef DEBUG_MGR #endif stopDht(); startDht(); diff --git a/libbitdht/src/bitdht/bdnode.cc b/libbitdht/src/bitdht/bdnode.cc index b6331ddf0..f003cad42 100644 --- a/libbitdht/src/bitdht/bdnode.cc +++ b/libbitdht/src/bitdht/bdnode.cc @@ -141,12 +141,12 @@ void bdNode::printQueries() std::cerr << std::endl; int i = 0; - std::list::iterator it; + std::list::iterator it; for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++, i++) { fprintf(stderr, "Query #%d:\n", i); - it->printQuery(); - fprintf(stderr, "\n\n"); + (*it)->printQuery(); + fprintf(stderr, "\n"); } } @@ -190,7 +190,59 @@ void bdNode::iteration() delete msg; } - int i = 0; + + /* assume that this is called once per second... limit the messages + * in theory, a query can generate up to 10 peers (which will all require a ping!). + * we want to handle all the pings we can... so we don't hold up the process. + * but we also want enough queries to keep things moving. + * so allow up to 90% of messages to be pings. + * + * ignore responses to other peers... as the number is very small generally + */ + +#define BDNODE_MESSAGE_RATE_HIGH 1 +#define BDNODE_MESSAGE_RATE_MED 2 +#define BDNODE_MESSAGE_RATE_LOW 3 +#define BDNODE_MESSAGE_RATE_TRICKLE 4 + +#define BDNODE_HIGH_MSG_RATE 100 +#define BDNODE_MED_MSG_RATE 50 +#define BDNODE_LOW_MSG_RATE 20 +#define BDNODE_TRICKLE_MSG_RATE 5 + + int maxMsgs = BDNODE_MED_MSG_RATE; + int mAllowedMsgRate = BDNODE_MESSAGE_RATE_MED; + + switch(mAllowedMsgRate) + { + case BDNODE_MESSAGE_RATE_HIGH: + maxMsgs = BDNODE_HIGH_MSG_RATE; + break; + + case BDNODE_MESSAGE_RATE_MED: + maxMsgs = BDNODE_MED_MSG_RATE; + break; + + case BDNODE_MESSAGE_RATE_LOW: + maxMsgs = BDNODE_LOW_MSG_RATE; + break; + + case BDNODE_MESSAGE_RATE_TRICKLE: + maxMsgs = BDNODE_TRICKLE_MSG_RATE; + break; + + default: + break; + + } + + + + int allowedPings = 0.9 * maxMsgs; + int sentMsgs = 0; + int sentPings = 0; + +#if 0 int ilim = mLocalQueries.size() * 15; if (ilim < 20) { @@ -200,9 +252,9 @@ void bdNode::iteration() { ilim = 500; } +#endif - - while((mPotentialPeers.size() > 0) && (i < ilim)) + while((mPotentialPeers.size() > 0) && (sentMsgs < allowedPings)) { /* check history ... is we have pinged them already... * then simulate / pretend we have received a pong, @@ -211,6 +263,7 @@ void bdNode::iteration() bdId pid = mPotentialPeers.front(); mPotentialPeers.pop_front(); + /* don't send too many queries ... check history first */ #ifdef USE_HISTORY @@ -236,7 +289,9 @@ void bdNode::iteration() genNewTransId(&transId); //registerOutgoingMsg(&pid, &transId, BITDHT_MSG_TYPE_PING); msgout_ping(&pid, &transId); - i++; + + sentMsgs++; + sentPings++; #ifdef DEBUG_NODE_MSGS std::cerr << "bdNode::iteration() Pinging Potential Peer : "; @@ -248,11 +303,19 @@ void bdNode::iteration() } } - - for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++) + + /* allow each query to send up to one query... until maxMsgs has been reached */ + int numQueries = mLocalQueries.size(); + int sentQueries = 0; + int i = 0; + while((i < numQueries) && (sentMsgs < maxMsgs)) { + bdQuery *query = mLocalQueries.front(); + mLocalQueries.pop_front(); + mLocalQueries.push_back(query); + /* go through the possible queries */ - if (it->nextQuery(id, targetNodeId)) + if (query->nextQuery(id, targetNodeId)) { /* push out query */ bdToken transId; @@ -269,8 +332,17 @@ void bdNode::iteration() std::cerr << std::endl; #endif mCounterQueryNode++; + sentMsgs++; + sentQueries++; } + i++; } + + std::cerr << "bdNode::iteration() maxMsgs: " << maxMsgs << " sentPings: " << sentPings; + std::cerr << " / " << allowedPings; + std::cerr << " sentQueries: " << sentQueries; + std::cerr << " / " << numQueries; + std::cerr << std::endl; /* process remote query too */ processRemoteQuery(); @@ -409,10 +481,10 @@ void bdNode::checkPotentialPeer(bdId *id) { bool isWorthyPeer = false; /* also push to queries */ - std::list::iterator it; + std::list::iterator it; for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++) { - if (it->addPotentialPeer(id, 0)) + if ((*it)->addPotentialPeer(id, 0)) { isWorthyPeer = true; } @@ -444,10 +516,10 @@ void bdNode::addPeer(const bdId *id, uint32_t peerflags) #endif /* iterate through queries */ - std::list::iterator it; + std::list::iterator it; for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++) { - it->addPeer(id, peerflags); + (*it)->addPeer(id, peerflags); } mNodeSpace.add_peer(id, peerflags); @@ -511,19 +583,21 @@ void bdNode::addQuery(const bdNodeId *id, uint32_t qflags) startList.push_back(it->second); } - bdQuery query(id, startList, qflags, mFns); + bdQuery *query = new bdQuery(id, startList, qflags, mFns); mLocalQueries.push_back(query); } void bdNode::clearQuery(const bdNodeId *rmId) { - std::list::iterator it; + std::list::iterator it; for(it = mLocalQueries.begin(); it != mLocalQueries.end();) { - if (it->mId == *rmId) + if ((*it)->mId == *rmId) { + bdQuery *query = (*it); it = mLocalQueries.erase(it); + delete query; } else { @@ -534,14 +608,14 @@ void bdNode::clearQuery(const bdNodeId *rmId) void bdNode::QueryStatus(std::map &statusMap) { - std::list::iterator it; + std::list::iterator it; for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++) { bdQueryStatus status; - status.mStatus = it->mState; - status.mQFlags = it->mQueryFlags; - it->result(status.mResults); - statusMap[it->mId] = status; + status.mStatus = (*it)->mState; + status.mQFlags = (*it)->mQueryFlags; + (*it)->result(status.mResults); + statusMap[(*it)->mId] = status; } } diff --git a/libbitdht/src/bitdht/bdnode.h b/libbitdht/src/bitdht/bdnode.h index f55e8fe72..27d961a50 100644 --- a/libbitdht/src/bitdht/bdnode.h +++ b/libbitdht/src/bitdht/bdnode.h @@ -202,7 +202,7 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr); bdHistory mHistory; /* for understanding the DHT */ - std::list mLocalQueries; + std::list mLocalQueries; std::list mRemoteQueries; std::list mPotentialPeers; diff --git a/libbitdht/src/bitdht/bdpeer.cc b/libbitdht/src/bitdht/bdpeer.cc index 03a54a968..988f5e852 100644 --- a/libbitdht/src/bitdht/bdpeer.cc +++ b/libbitdht/src/bitdht/bdpeer.cc @@ -608,14 +608,16 @@ int bdSpace::printDHT() std::vector::iterator it; std::list::iterator eit; - fprintf(stderr, "bdSpace::printDHT()\n"); /* iterate through the buckets, and sort by distance */ int i = 0; + +#ifdef BITDHT_DEBUG + fprintf(stderr, "bdSpace::printDHT()\n"); for(it = buckets.begin(); it != buckets.end(); it++, i++) { if (it->entries.size() > 0) { - fprintf(stderr, "Bucket %d ----------------------------\n", i); + fprintf(stderr, "Bucket %d ----------------------------\n", i); } for(eit = it->entries.begin(); eit != it->entries.end(); eit++) @@ -631,8 +633,11 @@ int bdSpace::printDHT() fprintf(stderr, "\n"); } } +#endif + + fprintf(stderr, "--------------------------------------\n"); + fprintf(stderr, "DHT Table Summary --------------------\n"); fprintf(stderr, "--------------------------------------\n"); - fprintf(stderr, "Summary ------------------------------\n"); /* little summary */ unsigned long long sum = 0; diff --git a/libbitdht/src/bitdht/bdquery.cc b/libbitdht/src/bitdht/bdquery.cc index 8ec238245..bd5e10c0f 100644 --- a/libbitdht/src/bitdht/bdquery.cc +++ b/libbitdht/src/bitdht/bdquery.cc @@ -80,6 +80,7 @@ bdQuery::bdQuery(const bdNodeId *id, std::list &startList, uint32_t queryF mState = BITDHT_QUERY_QUERYING; mQueryFlags = queryFlags; mQueryTS = now; + mSearchTime = 0; mQueryIdlePeerRetryPeriod = QUERY_IDLE_RETRY_PEER_PERIOD; @@ -227,6 +228,11 @@ int bdQuery::nextQuery(bdId &id, bdNodeId &targetNodeId) #endif /* if we get here - query finished */ + if (mState == BITDHT_QUERY_QUERYING) + { + /* store query time */ + mSearchTime = now - mQueryTS; + } /* check if we found the node */ if (mClosest.size() > 0) @@ -544,15 +550,22 @@ int bdQuery::addPotentialPeer(const bdId *id, uint32_t mode) int bdQuery::printQuery() { - +#ifdef DEBUG_QUERY fprintf(stderr, "bdQuery::printQuery()\n"); +#endif + time_t ts = time(NULL); fprintf(stderr, "Query for: "); mFns->bdPrintNodeId(std::cerr, &mId); fprintf(stderr, " Query State: %d", mState); fprintf(stderr, " Query Age %ld secs", ts-mQueryTS); + if (mState >= BITDHT_QUERY_FAILURE) + { + fprintf(stderr, " Search Time: %d secs", mSearchTime); + } fprintf(stderr, "\n"); +#ifdef DEBUG_QUERY fprintf(stderr, "Closest Available Peers:\n"); std::multimap::iterator it; for(it = mClosest.begin(); it != mClosest.end(); it++) @@ -577,6 +590,32 @@ int bdQuery::printQuery() fprintf(stderr," LastRecv: %ld ago", ts-it->second.mLastRecvTime); fprintf(stderr, "\n"); } +#else + // shortened version. + fprintf(stderr, "Closest Available Peer: "); + std::multimap::iterator it = mClosest.begin(); + if (it != mClosest.end()) + { + mFns->bdPrintId(std::cerr, &(it->second.mPeerId)); + fprintf(stderr, " Bucket: %d ", mFns->bdBucketDistance(&(it->first))); + fprintf(stderr," Found: %ld ago", ts-it->second.mFoundTime); + fprintf(stderr," LastSent: %ld ago", ts-it->second.mLastSendTime); + fprintf(stderr," LastRecv: %ld ago", ts-it->second.mLastRecvTime); + } + fprintf(stderr, "\n"); + + fprintf(stderr, "Closest Potential Peer: "); + it = mPotentialClosest.begin(); + if (it != mPotentialClosest.end()) + { + mFns->bdPrintId(std::cerr, &(it->second.mPeerId)); + fprintf(stderr, " Bucket: %d ", mFns->bdBucketDistance(&(it->first))); + fprintf(stderr," Found: %ld ago", ts-it->second.mFoundTime); + fprintf(stderr," LastSent: %ld ago", ts-it->second.mLastSendTime); + fprintf(stderr," LastRecv: %ld ago", ts-it->second.mLastRecvTime); + } + fprintf(stderr, "\n"); +#endif return 1; } diff --git a/libbitdht/src/bitdht/bdquery.h b/libbitdht/src/bitdht/bdquery.h index 75b1adc7a..1c700032f 100644 --- a/libbitdht/src/bitdht/bdquery.h +++ b/libbitdht/src/bitdht/bdquery.h @@ -34,7 +34,7 @@ /* Query result flags are in bdiface.h */ #define BITDHT_MIN_QUERY_AGE 10 -#define BITDHT_MAX_QUERY_AGE 300 +#define BITDHT_MAX_QUERY_AGE 1800 /* 30 minutes */ class bdQuery { @@ -58,6 +58,7 @@ int printQuery(); uint32_t mState; time_t mQueryTS; uint32_t mQueryFlags; + int32_t mSearchTime; int32_t mQueryIdlePeerRetryPeriod; // seconds between retries.