* Added new interface functions for start / stop DHT and to get stats.

* Implemented start/stop interface in udpbitdht and bemanager
 * added Restart of DHT if it fails to get going.
 * added start / stop functionality to bdnode and bdstore
 * added cleanup code to rest of bitdht classes.
 * reworked NetworkSize calc functions.
 * added thread debugging (prints at start / stop / join).
 * TESTS: added utest.h header file for automated unit testing (from libretroshare)
 * TESTS: bdmetric_test started conversion to automated tests
 * TESTS: udpbitdht_nettest. Added dht start / stop and network reset (thread join) functionality.
 * TESTS: fresh bdboot.txt



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@3678 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2010-10-17 20:55:32 +00:00
parent 884f0e7a22
commit 74961774cc
18 changed files with 1193 additions and 601 deletions

View file

@ -277,3 +277,11 @@ int bdHashSpace::cleanHashSpace(bdNodeId *min, bdNodeId *max, time_t maxAge)
return 1;
}
int bdHashSpace::clear()
{
mHashTable.clear();
return 1;
}

View file

@ -76,6 +76,7 @@ int modify(bdNodeId *id, std::string key, bdHashEntry *entry, uint32_t modFlags
int printHashSpace(std::ostream&);
int cleanHashSpace(bdNodeId *min, bdNodeId *max, time_t maxAge);
int clear();
private:

View file

@ -176,6 +176,12 @@ virtual void removeCallback(BitDhtCallback *cb) = 0;
virtual int getDhtPeerAddress(const bdNodeId *id, struct sockaddr_in &from) = 0;
virtual int getDhtValue(const bdNodeId *id, std::string key, std::string &value) = 0;
/* stats and Dht state */
virtual int startDht() = 0;
virtual int stopDht() = 0;
virtual int stateDht() = 0; /* STOPPED, STARTING, ACTIVE, FAILED */
virtual uint32_t statsNetworkSize() = 0;
virtual uint32_t statsBDVersionSize() = 0; /* same version as us! */
};
#endif

View file

@ -54,13 +54,17 @@
* #define DEBUG_MGR_PKT 1
***/
bdNodeManager::bdNodeManager(bdNodeId *id, std::string dhtVersion, std::string bootfile, bdDhtFunctions *fns)
:bdNode(id, dhtVersion, bootfile, fns)
{
mMode = BITDHT_MGR_STATE_STARTUP;
mMode = BITDHT_MGR_STATE_OFF;
mFns = fns;
mModeTS = 0 ;
mNetworkSize = 0;
mBdNetworkSize = 0;
/* setup a query for self */
#ifdef DEBUG_MGR
std::cerr << "bdNodeManager::bdNodeManager() ID: ";
@ -69,6 +73,57 @@ bdNodeManager::bdNodeManager(bdNodeId *id, std::string dhtVersion, std::string b
#endif
}
int bdNodeManager::stopDht()
{
time_t now = time(NULL);
/* clean up node */
shutdownNode();
/* flag queries as inactive */
/* check if exists already */
std::map<bdNodeId, bdQueryPeer>::iterator it;
for(it = mActivePeers.begin(); it != mActivePeers.end(); it++)
{
it->second.mStatus = BITDHT_QUERY_READY;
}
/* set state flag */
mMode = BITDHT_MGR_STATE_OFF;
mModeTS = now;
return 1;
}
int bdNodeManager::startDht()
{
time_t now = time(NULL);
/* set startup mode */
restartNode();
mMode = BITDHT_MGR_STATE_STARTUP;
mModeTS = now;
return 1;
}
/* STOPPED, STARTING, ACTIVE, FAILED */
int bdNodeManager::stateDht()
{
return mMode;
}
uint32_t bdNodeManager::statsNetworkSize()
{
return mNetworkSize;
}
/* same version as us! */
uint32_t bdNodeManager::statsBDVersionSize()
{
return mBdNetworkSize;
}
void bdNodeManager::addFindNode(bdNodeId *id, uint32_t qflags)
@ -166,6 +221,15 @@ void bdNodeManager::iteration()
time_t modeAge = now - mModeTS;
switch(mMode)
{
case BITDHT_MGR_STATE_OFF:
{
#ifdef DEBUG_MGR
std::cerr << "bdNodeManager::iteration(): OFF";
std::cerr << std::endl;
#endif
}
break;
case BITDHT_MGR_STATE_STARTUP:
/* 10 seconds startup .... then switch to ACTIVE */
@ -189,17 +253,26 @@ void bdNodeManager::iteration()
#define MAX_FINDSELF_TIME 60
#define MIN_OP_SPACE_SIZE 100
{
uint32_t nodeSpaceSize = mNodeSpace.calcSpaceSize();
#ifdef DEBUG_MGR
std::cerr << "bdNodeManager::iteration() Finding Oneself: NodeSpace Size:" << mNodeSpace.size();
std::cerr << "bdNodeManager::iteration() Finding Oneself: ";
std::cerr << "NodeSpace Size:" << nodeSpaceSize;
std::cerr << std::endl;
#endif
if ((modeAge > MAX_FINDSELF_TIME) ||
(mNodeSpace.size() > MIN_OP_SPACE_SIZE))
{
//mMode = BITDHT_MGR_STATE_ACTIVE;
mMode = BITDHT_MGR_STATE_REFRESH;
mModeTS = now;
if (nodeSpaceSize > MIN_OP_SPACE_SIZE)
{
mMode = BITDHT_MGR_STATE_REFRESH;
mModeTS = now;
}
if (modeAge > MAX_FINDSELF_TIME)
{
mMode = BITDHT_MGR_STATE_FAILED;
mModeTS = now;
}
}
break;
@ -253,15 +326,37 @@ void bdNodeManager::iteration()
case BITDHT_MGR_STATE_QUIET:
{
#ifdef DEBUG_MGR
std::cerr << "bdNodeManager::iteration(): QUIET";
std::cerr << std::endl;
#endif
}
break;
default:
case BITDHT_MGR_STATE_FAILED:
{
#ifdef DEBUG_MGR
std::cerr << "bdNodeManager::iteration(): FAILED ==> STARTUP";
std::cerr << std::endl;
#endif
stopDht();
startDht();
}
break;
}
/* tick parent */
bdNode::iteration();
if (mMode == BITDHT_MGR_STATE_OFF)
{
bdNode::iterationOff();
}
else
{
/* tick parent */
bdNode::iteration();
}
}
@ -271,6 +366,11 @@ int bdNodeManager::status()
printState();
checkStatus();
/* update the network numbers */
mNetworkSize = mNodeSpace.calcNetworkSize();
mBdNetworkSize = mNodeSpace.calcNetworkSizeWithFlag(
BITDHT_PEER_STATUS_DHT_APPL);
return 1;

View file

@ -70,11 +70,13 @@ class bdQueryPeer
};
#define BITDHT_MGR_STATE_OFF 0
#define BITDHT_MGR_STATE_STARTUP 1
#define BITDHT_MGR_STATE_FINDSELF 2
#define BITDHT_MGR_STATE_ACTIVE 3
#define BITDHT_MGR_STATE_REFRESH 4
#define BITDHT_MGR_STATE_QUIET 5
#define BITDHT_MGR_STATE_FAILED 6
#define MAX_STARTUP_TIME 10
#define MAX_REFRESH_TIME 10
@ -112,6 +114,12 @@ virtual void removeCallback(BitDhtCallback *cb);
virtual int getDhtPeerAddress(const bdNodeId *id, struct sockaddr_in &from);
virtual int getDhtValue(const bdNodeId *id, std::string key, std::string &value);
/* stats and Dht state */
virtual int startDht();
virtual int stopDht();
virtual int stateDht(); /* STOPPED, STARTING, ACTIVE, FAILED */
virtual uint32_t statsNetworkSize();
virtual uint32_t statsBDVersionSize(); /* same version as us! */
/******************* Internals *************************/
// Overloaded from bdnode for external node callback.
@ -140,6 +148,9 @@ void startQueries();
bdDhtFunctions *mFns;
uint32_t mNetworkSize;
uint32_t mBdNetworkSize;
/* future node functions */
//addPeerPing(foundId);
//clearPing(it->first);

View file

@ -61,41 +61,7 @@
bdNode::bdNode(bdNodeId *ownId, std::string dhtVersion, std::string bootfile, bdDhtFunctions *fns)
:mOwnId(*ownId), mNodeSpace(ownId, fns), mStore(bootfile, fns), mDhtVersion(dhtVersion), mFns(fns)
{
/* setup */
bdPeer peer;
while(mStore.getPeer(&peer))
{
addPotentialPeer(&(peer.mPeerId));
}
mCounterOutOfDatePing = 0;
mCounterPings = 0;
mCounterPongs = 0;
mCounterQueryNode = 0;
mCounterQueryHash = 0;
mCounterReplyFindNode = 0;
mCounterReplyQueryHash = 0;
mCounterRecvPing = 0;
mCounterRecvPong = 0;
mCounterRecvQueryNode = 0;
mCounterRecvQueryHash = 0;
mCounterRecvReplyFindNode = 0;
mCounterRecvReplyQueryHash = 0;
mLpfOutOfDatePing = 0;
mLpfPings = 0;
mLpfPongs = 0;
mLpfQueryNode = 0;
mLpfQueryHash = 0;
mLpfReplyFindNode = 0;
mLpfReplyQueryHash = 0;
mLpfRecvPing = 0;
mLpfRecvPong = 0;
mLpfRecvQueryNode = 0;
mLpfRecvQueryHash = 0;
mLpfRecvReplyFindNode = 0;
mLpfRecvReplyQueryHash = 0;
resetStats();
}
void bdNode::getOwnId(bdNodeId *id)
@ -103,19 +69,47 @@ void bdNode::getOwnId(bdNodeId *id)
*id = mOwnId;
}
#if 0
void bdNode::run()
/***** Startup / Shutdown ******/
void bdNode::restartNode()
{
resetStats();
mStore.reloadFromStore();
/* setup */
while(1)
bdPeer peer;
while(mStore.getPeer(&peer))
{
iteration();
sleep(1);
addPotentialPeer(&(peer.mPeerId));
}
}
#endif
void bdNode::shutdownNode()
{
/* clear the queries */
mLocalQueries.clear();
mRemoteQueries.clear();
/* clear the space */
mNodeSpace.clear();
mHashSpace.clear();
/* clear other stuff */
mPotentialPeers.clear();
mStore.clear();
/* clean up any outgoing messages */
while(mOutgoingMsgs.size() > 0)
{
bdNodeNetMsg *msg = mOutgoingMsgs.front();
mOutgoingMsgs.pop_front();
/* cleanup message */
delete msg;
}
}
/* Crappy initial store... use bdspace as answer */
void bdNode::updateStore()
@ -156,6 +150,20 @@ void bdNode::printQueries()
}
}
void bdNode::iterationOff()
{
/* clean up any incoming messages */
while(mIncomingMsgs.size() > 0)
{
bdNodeNetMsg *msg = mIncomingMsgs.front();
mIncomingMsgs.pop_front();
/* cleanup message */
delete msg;
}
}
void bdNode::iteration()
{
#ifdef DEBUG_NODE_MULTIPEER
@ -313,14 +321,6 @@ void bdNode::doStats()
mLpfReplyQueryHash *= (LPF_FACTOR);
mLpfReplyQueryHash += (1.0 - LPF_FACTOR) * mCounterReplyQueryHash;
mCounterOutOfDatePing = 0;
mCounterPings = 0;
mCounterPongs = 0;
mCounterQueryNode = 0;
mCounterQueryHash = 0;
mCounterReplyFindNode = 0;
mCounterReplyQueryHash = 0;
mLpfRecvPing *= (LPF_FACTOR);
mLpfRecvPing += (1.0 - LPF_FACTOR) * mCounterRecvPing;
mLpfRecvPong *= (LPF_FACTOR);
@ -334,12 +334,8 @@ void bdNode::doStats()
mLpfRecvReplyQueryHash *= (LPF_FACTOR);
mLpfRecvReplyQueryHash += (1.0 - LPF_FACTOR) * mCounterRecvReplyQueryHash;
mCounterRecvPing = 0;
mCounterRecvPong = 0;
mCounterRecvQueryNode = 0;
mCounterRecvQueryHash = 0;
mCounterRecvReplyFindNode = 0;
mCounterRecvReplyQueryHash = 0;
resetCounters();
}
void bdNode::printStats(std::ostream &out)
@ -370,8 +366,43 @@ void bdNode::printStats(std::ostream &out)
out << std::endl;
}
void bdNode::resetCounters()
{
mCounterOutOfDatePing = 0;
mCounterPings = 0;
mCounterPongs = 0;
mCounterQueryNode = 0;
mCounterQueryHash = 0;
mCounterReplyFindNode = 0;
mCounterReplyQueryHash = 0;
mCounterRecvPing = 0;
mCounterRecvPong = 0;
mCounterRecvQueryNode = 0;
mCounterRecvQueryHash = 0;
mCounterRecvReplyFindNode = 0;
mCounterRecvReplyQueryHash = 0;
}
void bdNode::resetStats()
{
mLpfOutOfDatePing = 0;
mLpfPings = 0;
mLpfPongs = 0;
mLpfQueryNode = 0;
mLpfQueryHash = 0;
mLpfReplyFindNode = 0;
mLpfReplyQueryHash = 0;
mLpfRecvPing = 0;
mLpfRecvPong = 0;
mLpfRecvQueryNode = 0;
mLpfRecvQueryHash = 0;
mLpfRecvReplyFindNode = 0;
mLpfRecvReplyQueryHash = 0;
resetCounters();
}
void bdNode::checkPotentialPeer(bdId *id)
@ -694,7 +725,7 @@ void bdNode::msgout_pong(bdId *id, bdToken *transId)
/* generate message, send to udp */
bdToken vid;
int vlen = BITDHT_TOKEN_MAX_LEN;
uint32_t vlen = BITDHT_TOKEN_MAX_LEN;
if (mDhtVersion.size() < vlen)
{
vlen = mDhtVersion.size();

View file

@ -97,6 +97,10 @@ class bdNode
bdNode(bdNodeId *id, std::string dhtVersion, std::string bootfile,
bdDhtFunctions *fns);
/* startup / shutdown node */
void restartNode();
void shutdownNode();
// virtual so manager can do callback.
// peer flags defined in bdiface.h
virtual void addPeer(const bdId *id, uint32_t peerflags);
@ -109,6 +113,7 @@ class bdNode
void clearQuery(const bdNodeId *id);
void QueryStatus(std::map<bdNodeId, bdQueryStatus> &statusMap);
void iterationOff();
void iteration();
void processRemoteQuery();
void updateStore();
@ -176,6 +181,8 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
void printStats(std::ostream &out);
void printQueries();
void resetCounters();
void resetStats();
protected:

View file

@ -313,6 +313,19 @@ bdSpace::bdSpace(bdNodeId *ownId, bdDhtFunctions *fns)
return;
}
/* empty the buckets */
int bdSpace::clear()
{
std::vector<bdBucket>::iterator it;
/* iterate through the buckets, and sort by distance */
for(it = buckets.begin(); it != buckets.end(); it++)
{
it->entries.clear();
}
return 1;
}
int bdSpace::find_nearest_nodes(const bdNodeId *id, int number, std::list<bdId> /*excluding*/, std::multimap<bdMetric, bdId> &nearest)
{
std::multimap<bdMetric, bdId> closest;
@ -735,7 +748,84 @@ int bdSpace::printDHT()
}
int bdSpace::calcSizes()
uint32_t bdSpace::calcNetworkSize()
{
std::vector<bdBucket>::iterator it;
/* little summary */
unsigned long long sum = 0;
unsigned long long no_peers = 0;
uint32_t count = 0;
bool doPrint = false;
bool doAvg = false;
int i = 0;
for(it = buckets.begin(); it != buckets.end(); it++, i++)
{
int size = it->entries.size();
int shift = BITDHT_KEY_BITLEN - i;
bool toBig = false;
if (shift > BITDHT_ULLONG_BITS - mFns->bdBucketBitSize() - 1)
{
toBig = true;
shift = BITDHT_ULLONG_BITS - mFns->bdBucketBitSize() - 1;
}
unsigned long long no_nets = ((unsigned long long) 1 << shift);
/* use doPrint so it acts as a single switch */
if (size && !doAvg && !doPrint)
{
doAvg = true;
}
if (size && !doPrint)
{
doPrint = true;
}
if (size == 0)
{
/* reset counters - if empty slot - to discount outliers in average */
sum = 0;
no_peers = 0;
count = 0;
}
if (!toBig)
{
no_peers = no_nets * size;
}
if (doPrint && doAvg && !toBig)
{
if (size == mFns->bdNodesPerBucket())
{
/* last average */
doAvg = false;
}
if (no_peers != 0)
{
sum += no_peers;
count++;
}
}
}
uint32_t NetSize = 0;
if (count != 0)
{
NetSize = sum / count;
}
//std::cerr << "bdSpace::calcNetworkSize() : " << NetSize;
//std::cerr << std::endl;
return NetSize;
}
uint32_t bdSpace::calcNetworkSizeWithFlag(uint32_t withFlag)
{
std::vector<bdBucket>::iterator it;
@ -750,7 +840,16 @@ int bdSpace::calcSizes()
int i = 0;
for(it = buckets.begin(); it != buckets.end(); it++, i++)
{
int size = it->entries.size();
int size = 0;
std::list<bdPeer>::iterator lit;
for(lit = it->entries.begin(); lit != it->entries.end(); lit++)
{
if (withFlag & lit->mPeerFlags)
{
size++;
}
}
totalcount += size;
int shift = BITDHT_KEY_BITLEN - i;
@ -804,31 +903,29 @@ int bdSpace::calcSizes()
}
mLastSize = totalcount;
if (count == 0)
uint32_t NetSize = 0;
if (count != 0)
{
mLastNetSize = 0;
}
else
{
mLastNetSize = sum / count;
NetSize = sum / count;
}
return 1;
//std::cerr << "bdSpace::calcNetworkSize() : " << NetSize;
//std::cerr << std::endl;
return NetSize;
}
uint32_t bdSpace::size()
uint32_t bdSpace::calcSpaceSize()
{
calcSizes();
return mLastSize;
std::vector<bdBucket>::iterator it;
/* little summary */
uint32_t totalcount = 0;
for(it = buckets.begin(); it != buckets.end(); it++)
{
totalcount += it->entries.size();
}
return totalcount;
}
uint32_t bdSpace::netSize()
{
calcSizes();
return mLastNetSize;
}

View file

@ -146,6 +146,8 @@ class bdSpace
bdSpace(bdNodeId *ownId, bdDhtFunctions *fns);
int clear();
/* accessors */
int find_nearest_nodes(const bdNodeId *id, int number,
std::list<bdId> excluding, std::multimap<bdMetric, bdId> &nearest);
@ -154,9 +156,9 @@ int out_of_date_peer(bdId &id); // side-effect updates, send flag on peer.
int add_peer(const bdId *id, uint32_t mode);
int printDHT();
int calcSizes();
uint32_t size();
uint32_t netSize();
uint32_t calcNetworkSize();
uint32_t calcNetworkSizeWithFlag(uint32_t withFlag);
uint32_t calcSpaceSize();
/* to add later */
int updateOwnId(bdNodeId *newOwnId);
@ -166,9 +168,6 @@ int updateOwnId(bdNodeId *newOwnId);
std::vector<bdBucket> buckets;
bdNodeId mOwnId;
bdDhtFunctions *mFns;
uint32_t mLastSize;
uint32_t mLastNetSize;
};

View file

@ -41,14 +41,27 @@ bdStore::bdStore(std::string file, bdDhtFunctions *fns)
#endif
/* read data from file */
mIndex = 0;
mStoreFile = file;
FILE *fd = fopen(file.c_str(), "r");
reloadFromStore();
}
int bdStore::clear()
{
mIndex = 0;
store.clear();
return 1;
}
int bdStore::reloadFromStore()
{
clear();
FILE *fd = fopen(mStoreFile.c_str(), "r");
if (!fd)
{
fprintf(stderr, "Failed to Open File: %s ... No Peers\n", file.c_str());
return;
fprintf(stderr, "Failed to Open File: %s ... No Peers\n", mStoreFile.c_str());
return 0;
}
@ -84,6 +97,8 @@ bdStore::bdStore(std::string file, bdDhtFunctions *fns)
fprintf(stderr, "Read %ld Peers\n", (long) store.size());
#endif
return 1;
}
int bdStore::getPeer(bdPeer *peer)

View file

@ -36,6 +36,10 @@ class bdStore
public:
bdStore(std::string file, bdDhtFunctions *fns);
int reloadFromStore(); /* for restarts */
int clear();
int getPeer(bdPeer *peer);
void addStore(bdPeer *peer);
void writeStore(std::string file);