Major reorganisation of bitdht library.

* Moved Local Query Logic into bdquerymgr
 * Moved Connection Logic into bdConnectManager.
 * bdnode now mainly contains the low level packet/message handling and routing.
 * added Peer Filtering. (at bdnode level).
 * added Filtering to bdStore (so we won't recontact them).
 * added new Accounting class.
 * added ExtraFlags to bdPeer (UNSTABLE | ATTACHED) at the moment.
 * used "Similar" functions in bdSpace to detect UNSTABLE peers.
 * changed the "Similar" functions signatures.
 * added "Attached" Mode if we are UNSTABLE.
 * change out_of_data_peer() => scanOutOfDatePeers() much more efficient!
 * probably other stuff too!
 * probably introduced some bugs too ;(



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-peernet@4348 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-06-28 11:05:35 +00:00
parent f55d8f9a88
commit 3a7f95812d
20 changed files with 1458 additions and 987 deletions

View File

@ -50,6 +50,9 @@ bdAccount::bdAccount()
mLabel[BDACCOUNT_MSG_REPLYFINDNODE] = "REPLYFINDNODE ";
mLabel[BDACCOUNT_MSG_REPLYQUERYHASH] = "REPLYQUERYHASH ";
mLabel[BDACCOUNT_MSG_POSTHASH] = "POSTHASH ";
mLabel[BDACCOUNT_MSG_REPLYPOSTHASH] = "REPLYPOSTHASH ";
mLabel[BDACCOUNT_MSG_CONNECTREQUEST] = "CONNECTREQUEST ";
mLabel[BDACCOUNT_MSG_CONNECTREPLY] = "CONNECTREPLY ";
mLabel[BDACCOUNT_MSG_CONNECTSTART] = "CONNECTSTART ";

View File

@ -38,12 +38,15 @@
#define BDACCOUNT_MSG_REPLYFINDNODE 5
#define BDACCOUNT_MSG_REPLYQUERYHASH 6
#define BDACCOUNT_MSG_CONNECTREQUEST 7
#define BDACCOUNT_MSG_CONNECTREPLY 8
#define BDACCOUNT_MSG_CONNECTSTART 9
#define BDACCOUNT_MSG_CONNECTACK 10
#define BDACCOUNT_MSG_POSTHASH 7
#define BDACCOUNT_MSG_REPLYPOSTHASH 8
#define BDACCOUNT_NUM_ENTRIES 11
#define BDACCOUNT_MSG_CONNECTREQUEST 9
#define BDACCOUNT_MSG_CONNECTREPLY 10
#define BDACCOUNT_MSG_CONNECTSTART 11
#define BDACCOUNT_MSG_CONNECTACK 12
#define BDACCOUNT_NUM_ENTRIES 13
class bdAccount
{

File diff suppressed because it is too large Load Diff

View File

@ -29,6 +29,8 @@
#include "bitdht/bdiface.h"
class bdQueryManager;
class bdNodePublisher;
/************************************************************************************************************
************************************** ProxyTuple + Connection State ****************************************
@ -167,5 +169,106 @@ class bdConnectionRequest
std::ostream &operator<<(std::ostream &out, const bdConnectionRequest &req);
std::ostream &operator<<(std::ostream &out, const bdConnection &conn);
#endif
/*********
* The Connection Management Class.
* this encapsulates all of the functionality..
* except for a couple of message in/outs + callback.
*/
class bdConnectManager
{
public:
bdConnectManager(bdNodeId *ownid, bdSpace *space, bdQueryManager *qmgr, bdDhtFunctions *fns, bdNodePublisher *pub);
/* connection functions */
void requestConnection(bdNodeId *id, uint32_t modes);
void allowConnection(bdNodeId *id, uint32_t modes);
/* high level */
void shutdownConnections();
void printConnections();
/* Connections: Configuration */
void defaultConnectionOptions();
virtual void setConnectionOptions(uint32_t allowedModes, uint32_t flags);
/* Connections: Initiation */
int requestConnection(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode, uint32_t start);
int requestConnection_direct(struct sockaddr_in *laddr, bdNodeId *target);
int requestConnection_proxy(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode);
int killConnectionRequest(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode);
int checkExistingConnectionAttempt(bdNodeId *target);
void addPotentialConnectionProxy(const bdId *srcId, const bdId *target);
void updatePotentialConnectionProxy(const bdId *id, uint32_t mode);
int checkPeerForFlag(const bdId *id, uint32_t with_flag);
int tickConnections();
void iterateConnectionRequests();
int startConnectionAttempt(bdConnectionRequest *req);
// internal Callback -> normally continues to callbackConnect().
void callbackConnectRequest(bdId *srcId, bdId *proxyId, bdId *destId,
int mode, int point, int cbtype, int errcode);
/* Connections: Outgoing */
int startConnectionAttempt(bdId *proxyId, bdId *srcConnAddr, bdId *destConnAddr, int mode);
void AuthConnectionOk(bdId *srcId, bdId *proxyId, bdId *destId, int mode, int loc);
void AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode, int loc, int errcode);
void iterateConnections();
/* Connections: Utility State */
bdConnection *findExistingConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId);
bdConnection *newConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId);
int cleanConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId);
int determinePosition(bdNodeId *sender, bdNodeId *src, bdNodeId *dest);
int determineProxyId(bdNodeId *sender, bdNodeId *src, bdNodeId *dest, bdNodeId *proxyId);
bdConnection *findSimilarConnection(bdNodeId *srcId, bdNodeId *destId);
bdConnection *findExistingConnectionBySender(bdId *sender, bdId *src, bdId *dest);
bdConnection *newConnectionBySender(bdId *sender, bdId *src, bdId *dest);
int cleanConnectionBySender(bdId *sender, bdId *src, bdId *dest);
// Overloaded Generalised Connection Callback.
virtual void callbackConnect(bdId *srcId, bdId *proxyId, bdId *destId,
int mode, int point, int cbtype, int errcode);
/* Connections: */
int recvedConnectionRequest(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode);
int recvedConnectionReply(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode, int status);
int recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode, int bandwidth);
int recvedConnectionAck(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode);
private:
std::map<bdProxyTuple, bdConnection> mConnections;
std::map<bdNodeId, bdConnectionRequest> mConnectionRequests;
uint32_t mConfigAllowedModes;
bool mConfigAutoProxy;
/****************************** Connection Code (in bdconnection.cc) ****************************/
private:
bdNodeId mOwnId;
bdSpace *mNodeSpace;
bdQueryManager *mQueryMgr;
bdDhtFunctions *mFns;
bdNodePublisher *mPub;
};
#endif // BITDHT_CONNECTION_H

View File

@ -60,6 +60,15 @@ bool bdFilter::filtered(std::list<bdFilteredPeer> &answer)
return (answer.size() > 0);
}
bool bdFilter::filteredIPs(std::list<struct sockaddr_in> &answer)
{
std::list<bdFilteredPeer>::iterator it;
for(it = mFiltered.begin(); it != mFiltered.end(); it++)
{
answer.push_back(it->mAddr);
}
return (answer.size() > 0);
}
int bdFilter::checkPeer(const bdId *id, uint32_t mode)
{
@ -111,11 +120,30 @@ int bdFilter::addPeerToFilter(const bdId *id, uint32_t flags)
fp.mLastSeen = now;
mFiltered.push_back(fp);
}
return found;
uint32_t saddr = id->addr.sin_addr.s_addr;
mIpsBanned.insert(saddr);
std::cerr << "Adding New Banned Ip Address: " << inet_ntoa(id->addr.sin_addr);
std::cerr << std::endl;
return true;
}
return false;
}
/* fast check if the addr is in the structure */
int bdFilter::addrOkay(struct sockaddr_in *addr)
{
std::set<uint32_t>::const_iterator it = mIpsBanned.find(addr->sin_addr.s_addr);
if (it == mIpsBanned.end())
{
return 1; // Address is Okay!
}
std::cerr << "Detected Packet From Banned Ip Address: " << inet_ntoa(addr->sin_addr);
std::cerr << std::endl;
return 0;
}
bool bdFilter::isOwnIdWithoutBitDhtFlags(const bdId *id, uint32_t peerFlags)

View File

@ -32,6 +32,7 @@
#include "bitdht/bdiface.h"
#include <set>
/* Query result flags are in bdiface.h */
@ -54,8 +55,12 @@ class bdFilter
// get the answer.
bool filtered(std::list<bdFilteredPeer> &answer);
bool filteredIPs(std::list<struct sockaddr_in> &answer);
int checkPeer(const bdId *id, uint32_t peerFlags);
int addrOkay(struct sockaddr_in *addr);
private:
int addPeerToFilter(const bdId *id, uint32_t flags);
@ -67,6 +72,9 @@ bool isOwnIdWithoutBitDhtFlags(const bdId *id, uint32_t peerFlags);
std::list<bdFilteredPeer> mFiltered;
bdDhtFunctions *mFns;
// = addr.sin_addr.s_addr (uint32_t) stored in network order.
std::set<uint32_t> mIpsBanned;
};

View File

@ -95,8 +95,8 @@ virtual int bdDistance(const bdNodeId *n1, const bdNodeId *n2, bdMetric *metric)
virtual int bdBucketDistance(const bdNodeId *n1, const bdNodeId *n2) = 0;
virtual int bdBucketDistance(const bdMetric *metric) = 0;
virtual uint32_t bdSimilarId(const bdId *id1, const bdId *id2) = 0;
virtual void bdUpdateSimilarId(bdId *dest, const bdId *src) = 0;
virtual bool bdSimilarId(const bdId *id1, const bdId *id2) = 0;
virtual bool bdUpdateSimilarId(bdId *dest, const bdId *src) = 0;
virtual void bdRandomMidId(const bdNodeId *target, const bdNodeId *other, bdNodeId *mid) = 0;
@ -107,11 +107,15 @@ virtual void bdPrintNodeId(std::ostream &out, const bdNodeId *a) = 0;
/* NODE OPTIONS */
#define BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT 0x00000001
/* peer flags
* order is important!
* higher bits = more priority.
* BITDHT_PEER_STATUS_RECVPING
* BITDHT_PEER_STATUS_RECVPONG
* BITDHT_PEER_STATUS_RECVNODES
* BITDHT_PEER_STATUS_RECVHASHES
@ -125,10 +129,11 @@ virtual void bdPrintNodeId(std::ostream &out, const bdNodeId *a) = 0;
#define BITDHT_PEER_STATUS_MASK_DHT 0x0000ff00
#define BITDHT_PEER_STATUS_MASK_KNOWN 0x00ff0000
#define BITDHT_PEER_STATUS_RECV_PONG 0x00000001
#define BITDHT_PEER_STATUS_RECV_NODES 0x00000002
#define BITDHT_PEER_STATUS_RECV_HASHES 0x00000004
#define BITDHT_PEER_STATUS_RECV_CONNECT_MSG 0x00000008
#define BITDHT_PEER_STATUS_RECV_PING 0x00000001
#define BITDHT_PEER_STATUS_RECV_PONG 0x00000002
#define BITDHT_PEER_STATUS_RECV_NODES 0x00000004
#define BITDHT_PEER_STATUS_RECV_HASHES 0x00000008
#define BITDHT_PEER_STATUS_RECV_CONNECT_MSG 0x00000010
#define BITDHT_PEER_STATUS_DHT_ENGINE 0x00000100
#define BITDHT_PEER_STATUS_DHT_ENGINE_VERSION 0x00000200
@ -140,6 +145,15 @@ virtual void bdPrintNodeId(std::ostream &out, const bdNodeId *a) = 0;
#define BITDHT_PEER_STATUS_DHT_FRIEND 0x00040000
// EXTRA FLAGS are our internal thoughts about the peer.
#define BITDHT_PEER_EXFLAG_MASK_BASIC 0x000000ff
#define BITDHT_PEER_EXFLAG_UNSTABLE 0x00000001 // Port changes.
#define BITDHT_PEER_EXFLAG_ATTACHED 0x00000002 // We will ping in heavily. (if unstable)
#define BITDHT_CONNECT_MODE_DIRECT 0x00000001
#define BITDHT_CONNECT_MODE_PROXY 0x00000002
@ -195,11 +209,15 @@ virtual void bdPrintNodeId(std::ostream &out, const bdNodeId *a) = 0;
class bdPeer
{
public:
bdPeer():mPeerFlags(0), mLastSendTime(0), mLastRecvTime(0), mFoundTime(0), mExtraFlags(0) { return; }
bdId mPeerId;
uint32_t mPeerFlags;
time_t mLastSendTime;
time_t mLastRecvTime;
time_t mFoundTime; /* time stamp that peer was found */
uint32_t mExtraFlags;
};
class bdBucket

View File

@ -42,6 +42,8 @@
#include "bitdht/bdmanager.h"
#include "bitdht/bdmsgs.h"
#include "bitdht/bencode.h"
#include "bitdht/bdquerymgr.h"
#include <algorithm>
#include <sstream>
@ -72,7 +74,6 @@ bdNodeManager::bdNodeManager(bdNodeId *id, std::string dhtVersion, std::string b
mNetworkSize = 0;
mBdNetworkSize = 0;
/* setup a query for self */
#ifdef DEBUG_MGR
std::cerr << "bdNodeManager::bdNodeManager() ID: ";
mFns->bdPrintNodeId(std::cerr, id);
@ -190,7 +191,7 @@ void bdNodeManager::startQueries()
it->second.mStatus = BITDHT_QUERY_QUERYING;
uint32_t qflags = it->second.mQFlags | BITDHT_QFLAGS_DISGUISE;
addQuery(&(it->first), qflags);
mQueryMgr->addQuery(&(it->first), qflags);
// add all queries at the same time!
//return;
@ -215,7 +216,7 @@ void bdNodeManager::removeFindNode(bdNodeId *id)
}
/* cleanup any actions */
clearQuery(&(it->first));
mQueryMgr->clearQuery(&(it->first));
//clearPing(&(it->first));
/* remove from map */
@ -250,7 +251,7 @@ void bdNodeManager::iteration()
#endif
bdNodeId id;
getOwnId(&id);
addQuery(&id, BITDHT_QFLAGS_DO_IDLE | BITDHT_QFLAGS_DISGUISE);
mQueryMgr->addQuery(&id, BITDHT_QFLAGS_DO_IDLE | BITDHT_QFLAGS_DISGUISE);
mMode = BITDHT_MGR_STATE_FINDSELF;
mModeTS = now;
@ -407,24 +408,22 @@ void bdNodeManager::QueryRandomLocalNet()
/* do standard find_peer message */
bdToken transId;
genNewTransId(&transId);
msgout_find_node(&id, &transId, &targetNodeId);
send_query(&id, &targetNodeId);
//#ifdef DEBUG_NODE_MSGS
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNodeManager::QueryRandomLocalNet() Querying : ";
mFns->bdPrintId(std::cerr, &id);
std::cerr << " searching for : ";
mFns->bdPrintNodeId(std::cerr, &targetNodeId);
std::cerr << std::endl;
//#endif
#endif
}
else
{
//#ifdef DEBUG_NODE_MSGS
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNodeManager::QueryRandomLocalNet() No LocalNet Peer Found";
std::cerr << std::endl;
//#endif
#endif
}
}
@ -464,7 +463,7 @@ int bdNodeManager::checkStatus()
std::map<bdNodeId, bdQueryStatus> queryStatus;
QueryStatus(queryStatus);
mQueryMgr->QueryStatus(queryStatus);
for(it = queryStatus.begin(); it != queryStatus.end(); it++)
{
@ -561,7 +560,7 @@ int bdNodeManager::checkStatus()
mFns->bdPrintNodeId(std::cerr, &(it->first));
std::cerr << std::endl;
#endif
clearQuery(&(it->first));
mQueryMgr->clearQuery(&(it->first));
}
/* FIND in activePeers */
@ -810,13 +809,13 @@ int bdNodeManager::getDhtBucket(const int idx, bdBucket &bucket)
int bdNodeManager::getDhtQueries(std::map<bdNodeId, bdQueryStatus> &queries)
{
bdNode::QueryStatus(queries);
mQueryMgr->QueryStatus(queries);
return 1;
}
int bdNodeManager::getDhtQueryStatus(const bdNodeId *id, bdQuerySummary &query)
{
return bdNode::QuerySummary(id, query);
return mQueryMgr->QuerySummary(id, query);
}
@ -1071,7 +1070,7 @@ void bdNodeManager::ConnectionRequest(struct sockaddr_in *laddr, bdNodeId *targe
std::cerr << "bdNodeManager::ConnectionRequest()";
std::cerr << std::endl;
bdNode::requestConnection(laddr, target, mode, start);
mConnMgr->requestConnection(laddr, target, mode, start);
}
void bdNodeManager::ConnectionAuth(bdId *srcId, bdId *proxyId, bdId *destId, uint32_t mode, uint32_t loc, uint32_t answer)
@ -1081,17 +1080,17 @@ void bdNodeManager::ConnectionAuth(bdId *srcId, bdId *proxyId, bdId *destId, uin
if (answer == BITDHT_CONNECT_ANSWER_OKAY)
{
AuthConnectionOk(srcId, proxyId, destId, mode, loc);
mConnMgr->AuthConnectionOk(srcId, proxyId, destId, mode, loc);
}
else
{
AuthConnectionNo(srcId, proxyId, destId, mode, loc, answer);
mConnMgr->AuthConnectionNo(srcId, proxyId, destId, mode, loc, answer);
}
}
void bdNodeManager::ConnectionOptions(uint32_t allowedModes, uint32_t flags)
{
bdNode::setConnectionOptions(allowedModes, flags);
mConnMgr->setConnectionOptions(allowedModes, flags);
}

View File

@ -28,9 +28,13 @@
#include "bitdht/bencode.h"
#include "bitdht/bdmsgs.h"
#include "bitdht/bdquerymgr.h"
#include "bitdht/bdfilter.h"
#include "util/bdnet.h"
#include "util/bdrandom.h"
#include <string.h>
#include <stdlib.h>
@ -60,12 +64,40 @@
bdNode::bdNode(bdNodeId *ownId, std::string dhtVersion, std::string bootfile, bdDhtFunctions *fns)
:mOwnId(*ownId), mNodeSpace(ownId, fns), mStore(bootfile, fns), mDhtVersion(dhtVersion), mFns(fns)
:mOwnId(*ownId), mNodeSpace(ownId, fns), mStore(bootfile, fns), mDhtVersion(dhtVersion), mFns(fns),
mQueryMgr(NULL), mConnMgr(NULL), mFilterPeers(NULL)
{
resetStats();
defaultConnectionOptions();
init(); /* (uses this pointers) stuff it - do it here! */
}
void bdNode::init()
{
mQueryMgr = new bdQueryManager(&mNodeSpace, mFns, this);
mConnMgr = new bdConnectManager(&mOwnId, &mNodeSpace, mQueryMgr, mFns, this);
std::list<bdFilteredPeer> emptyList;
mFilterPeers = new bdFilter(&mOwnId, emptyList, BITDHT_FILTER_REASON_OWNID, mFns);
setNodeOptions(BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT);
}
#define ATTACH_NUMBER 10
void bdNode::setNodeOptions(uint32_t optFlags)
{
mNodeOptionFlags = optFlags;
if (optFlags & BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT)
{
mNodeSpace.setAttachedFlag(BITDHT_PEER_STATUS_DHT_ENGINE, ATTACH_NUMBER);
}
else
{
mNodeSpace.setAttachedFlag(BITDHT_PEER_STATUS_DHT_ENGINE, 0);
}
}
void bdNode::getOwnId(bdNodeId *id)
{
*id = mOwnId;
@ -74,7 +106,7 @@ void bdNode::getOwnId(bdNodeId *id)
/***** Startup / Shutdown ******/
void bdNode::restartNode()
{
resetStats();
mAccount.resetStats();
mStore.reloadFromStore();
@ -90,7 +122,9 @@ void bdNode::restartNode()
void bdNode::shutdownNode()
{
/* clear the queries */
mLocalQueries.clear();
mQueryMgr->shutdownQueries();
mConnMgr->shutdownConnections();
mRemoteQueries.clear();
/* clear the space */
@ -127,32 +161,16 @@ void bdNode::printState()
mNodeSpace.printDHT();
printQueries();
mQueryMgr->printQueries();
mConnMgr->printConnections();
#ifdef USE_HISTORY
mHistory.printMsgs();
#endif
printStats(std::cerr);
mAccount.printStats(std::cerr);
}
void bdNode::printQueries()
{
std::cerr << "bdNode::printQueries() for Peer: ";
mFns->bdPrintNodeId(std::cerr, &mOwnId);
std::cerr << std::endl;
int i = 0;
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++, i++)
{
fprintf(stderr, "Query #%d:\n", i);
(*it)->printQuery();
fprintf(stderr, "\n");
}
}
void bdNode::iterationOff()
{
/* clean up any incoming messages */
@ -244,18 +262,6 @@ void bdNode::iteration()
int sentMsgs = 0;
int sentPings = 0;
#if 0
int ilim = mLocalQueries.size() * 15;
if (ilim < 20)
{
ilim = 20;
}
if (ilim > 500)
{
ilim = 500;
}
#endif
while((mPotentialPeers.size() > 0) && (sentMsgs < allowedPings))
{
/* check history ... is we have pinged them already...
@ -285,12 +291,7 @@ void bdNode::iteration()
/**** TEMP ****/
{
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&pid, &transId, BITDHT_MSG_TYPE_PING);
msgout_ping(&pid, &transId);
send_ping(&pid);
sentMsgs++;
sentPings++;
@ -301,82 +302,62 @@ void bdNode::iteration()
std::cerr << std::endl;
#endif
mCounterPings++;
}
}
/* allow each query to send up to one query... until maxMsgs has been reached */
int numQueries = mLocalQueries.size();
int sentQueries = 0;
int i = 0;
while((i < numQueries) && (sentMsgs < maxMsgs))
{
bdQuery *query = mLocalQueries.front();
mLocalQueries.pop_front();
mLocalQueries.push_back(query);
/* go through the possible queries */
if (query->nextQuery(id, targetNodeId))
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::iteration() send_query(";
mFns->bdPrintId(std::cerr, &id);
std::cerr << ",";
mFns->bdPrintNodeId(std::cerr, &targetNodeId);
std::cerr << ")";
std::cerr << std::endl;
#endif
send_query(&id, &targetNodeId);
sentMsgs++;
sentQueries++;
}
i++;
}
int sentQueries = mQueryMgr->iterateQueries(maxMsgs-sentMsgs);
sentMsgs += sentQueries;
#ifdef DEBUG_NODE_ACTIONS
std::cerr << "bdNode::iteration() maxMsgs: " << maxMsgs << " sentPings: " << sentPings;
std::cerr << " / " << allowedPings;
std::cerr << " sentQueries: " << sentQueries;
std::cerr << " / " << numQueries;
std::cerr << std::endl;
#endif
/* process remote query too */
processRemoteQuery();
while(mNodeSpace.out_of_date_peer(id))
std::list<bdId> peerIds;
std::list<bdId>::iterator oit;
mNodeSpace.scanOutOfDatePeers(peerIds);
for(oit = peerIds.begin(); oit != peerIds.end(); oit++)
{
/* push out ping */
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_PING);
msgout_ping(&id, &transId);
send_ping(&(*oit));
mAccount.incCounter(BDACCOUNT_MSG_OUTOFDATEPING, true);
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::iteration() Pinging Out-Of-Date Peer: ";
mFns->bdPrintId(std::cerr, &id);
mFns->bdPrintId(std::cerr, *oit);
std::cerr << std::endl;
#endif
mCounterOutOfDatePing++;
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_FIND_NODE);
//msgout_find_node(&id, &transId, &(id.id));
}
// Handle Connection loops.
tickConnections();
mConnMgr->tickConnections();
doStats();
//printStats(std::cerr);
//printQueries();
mAccount.doStats();
}
/***************************************************************************************
***************************************************************************************
***************************************************************************************/
void bdNode::send_ping(bdId *id)
{
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_PING);
msgout_ping(id, &transId);
}
void bdNode::send_query(bdId *id, bdNodeId *targetNodeId)
{
/* push out query */
@ -393,168 +374,32 @@ void bdNode::send_query(bdId *id, bdNodeId *targetNodeId)
mFns->bdPrintNodeId(std::cerr, &targetNodeId);
std::cerr << std::endl;
#endif
mCounterQueryNode++;
}
#define LPF_FACTOR (0.90)
void bdNode::doStats()
void bdNode::send_connect_msg(bdId *id, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status)
{
mLpfOutOfDatePing *= (LPF_FACTOR) ;
mLpfOutOfDatePing += (1.0 - LPF_FACTOR) * mCounterOutOfDatePing;
mLpfPings *= (LPF_FACTOR);
mLpfPings += (1.0 - LPF_FACTOR) * mCounterPings;
mLpfPongs *= (LPF_FACTOR);
mLpfPongs += (1.0 - LPF_FACTOR) * mCounterPongs;
mLpfQueryNode *= (LPF_FACTOR);
mLpfQueryNode += (1.0 - LPF_FACTOR) * mCounterQueryNode;
mLpfQueryHash *= (LPF_FACTOR);
mLpfQueryHash += (1.0 - LPF_FACTOR) * mCounterQueryHash;
mLpfReplyFindNode *= (LPF_FACTOR);
mLpfReplyFindNode += (1.0 - LPF_FACTOR) * mCounterReplyFindNode;
mLpfReplyQueryHash *= (LPF_FACTOR);
mLpfReplyQueryHash += (1.0 - LPF_FACTOR) * mCounterReplyQueryHash;
/* push out query */
bdToken transId;
genNewTransId(&transId);
//registerOutgoingMsg(&id, &transId, BITDHT_MSG_TYPE_FIND_NODE);
mLpfRecvPing *= (LPF_FACTOR);
mLpfRecvPing += (1.0 - LPF_FACTOR) * mCounterRecvPing;
mLpfRecvPong *= (LPF_FACTOR);
mLpfRecvPong += (1.0 - LPF_FACTOR) * mCounterRecvPong;
mLpfRecvQueryNode *= (LPF_FACTOR);
mLpfRecvQueryNode += (1.0 - LPF_FACTOR) * mCounterRecvQueryNode;
mLpfRecvQueryHash *= (LPF_FACTOR);
mLpfRecvQueryHash += (1.0 - LPF_FACTOR) * mCounterRecvQueryHash;
mLpfRecvReplyFindNode *= (LPF_FACTOR);
mLpfRecvReplyFindNode += (1.0 - LPF_FACTOR) * mCounterRecvReplyFindNode;
mLpfRecvReplyQueryHash *= (LPF_FACTOR);
mLpfRecvReplyQueryHash += (1.0 - LPF_FACTOR) * mCounterRecvReplyQueryHash;
msgout_connect_genmsg(id, &transId, msgtype, srcAddr, destAddr, mode, status);
// connection stats.
mLpfConnectRequest *= (LPF_FACTOR);
mLpfConnectRequest += (1.0 - LPF_FACTOR) * mCounterConnectRequest;
mLpfConnectReply *= (LPF_FACTOR);
mLpfConnectReply += (1.0 - LPF_FACTOR) * mCounterConnectReply;
mLpfConnectStart *= (LPF_FACTOR);
mLpfConnectStart += (1.0 - LPF_FACTOR) * mCounterConnectStart;
mLpfConnectAck *= (LPF_FACTOR);
mLpfConnectAck += (1.0 - LPF_FACTOR) * mCounterConnectAck;
mLpfRecvConnectRequest *= (LPF_FACTOR);
mLpfRecvConnectRequest += (1.0 - LPF_FACTOR) * mCounterRecvConnectRequest;
mLpfRecvConnectReply *= (LPF_FACTOR);
mLpfRecvConnectReply += (1.0 - LPF_FACTOR) * mCounterRecvConnectReply;
mLpfRecvConnectStart *= (LPF_FACTOR);
mLpfRecvConnectStart += (1.0 - LPF_FACTOR) * mCounterRecvConnectStart;
mLpfRecvConnectAck *= (LPF_FACTOR);
mLpfRecvConnectAck += (1.0 - LPF_FACTOR) * mCounterRecvConnectAck;
resetCounters();
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::send_connect_msg() to: ";
mFns->bdPrintId(std::cerr, &id);
std::cerr << std::endl;
#endif
}
void bdNode::printStats(std::ostream &out)
{
out << "bdNode::printStats()" << std::endl;
out << " Send Recv: ";
out << std::endl;
out << " mLpfOutOfDatePing : " << std::setw(10) << mLpfOutOfDatePing;
out << std::endl;
out << " mLpfPings : " << std::setw(10) << mLpfPings;
out << " mLpfRecvPongs : " << std::setw(10) << mLpfRecvPong;
out << std::endl;
out << " mLpfPongs : " << std::setw(10) << mLpfPongs;
out << " mLpfRecvPings : " << std::setw(10) << mLpfRecvPing;
out << std::endl;
out << " mLpfQueryNode : " << std::setw(10) << mLpfQueryNode;
out << " mLpfRecvReplyFindNode : " << std::setw(10) << mLpfRecvReplyFindNode;
out << std::endl;
out << " mLpfQueryHash : " << std::setw(10) << mLpfQueryHash;
out << " mLpfRecvReplyQueryHash : " << std::setw(10) << mLpfRecvReplyQueryHash;
out << std::endl;
out << " mLpfReplyFindNode : " << std::setw(10) << mLpfReplyFindNode;
out << " mLpfRecvQueryNode : " << std::setw(10) << mLpfRecvQueryNode;
out << std::endl;
out << " mLpfReplyQueryHash/sec : " << std::setw(10) << mLpfReplyQueryHash;
out << " mLpfRecvQueryHash/sec : " << std::setw(10) << mLpfRecvQueryHash;
out << std::endl;
out << std::endl;
out << " mLpfConnectRequest/sec : " << std::setw(10) << mLpfConnectRequest;
out << " mLpfRecvConnectReq/sec : " << std::setw(10) << mLpfRecvConnectRequest;
out << std::endl;
out << " mLpfConnectReply/sec : " << std::setw(10) << mLpfConnectReply;
out << " mLpfRecvConnReply/sec : " << std::setw(10) << mLpfRecvConnectReply;
out << std::endl;
out << " mLpfConnectStart/sec : " << std::setw(10) << mLpfConnectStart;
out << " mLpfRecvConnStart/sec : " << std::setw(10) << mLpfRecvConnectStart;
out << std::endl;
out << " mLpfConnectAck/sec : " << std::setw(10) << mLpfConnectAck;
out << " mLpfRecvConnectAck/sec : " << std::setw(10) << mLpfRecvConnectAck;
out << std::endl;
out << std::endl;
}
void bdNode::resetCounters()
{
mCounterOutOfDatePing = 0;
mCounterPings = 0;
mCounterPongs = 0;
mCounterQueryNode = 0;
mCounterQueryHash = 0;
mCounterReplyFindNode = 0;
mCounterReplyQueryHash = 0;
mCounterRecvPing = 0;
mCounterRecvPong = 0;
mCounterRecvQueryNode = 0;
mCounterRecvQueryHash = 0;
mCounterRecvReplyFindNode = 0;
mCounterRecvReplyQueryHash = 0;
mCounterConnectRequest = 0;
mCounterConnectReply = 0;
mCounterConnectStart = 0;
mCounterConnectAck = 0;
mCounterRecvConnectRequest = 0;
mCounterRecvConnectReply = 0;
mCounterRecvConnectStart = 0;
mCounterRecvConnectAck = 0;
}
void bdNode::resetStats()
{
mLpfOutOfDatePing = 0;
mLpfPings = 0;
mLpfPongs = 0;
mLpfQueryNode = 0;
mLpfQueryHash = 0;
mLpfReplyFindNode = 0;
mLpfReplyQueryHash = 0;
mLpfRecvPing = 0;
mLpfRecvPong = 0;
mLpfRecvQueryNode = 0;
mLpfRecvQueryHash = 0;
mLpfRecvReplyFindNode = 0;
mLpfRecvReplyQueryHash = 0;
resetCounters();
}
void bdNode::checkPotentialPeer(bdId *id, bdId *src)
{
bool isWorthyPeer = false;
/* also push to queries */
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
if ((*it)->addPotentialPeer(id, src, 0))
{
isWorthyPeer = true;
}
}
bool isWorthyPeer = mQueryMgr->checkPotentialPeer(id, src);
if (isWorthyPeer)
{
@ -564,7 +409,7 @@ void bdNode::checkPotentialPeer(bdId *id, bdId *src)
if (src) // src can be NULL!
{
bdNode::addPotentialConnectionProxy(src, id); // CAUTION: Order switched!
mConnMgr->addPotentialConnectionProxy(src, id); // CAUTION: Order switched!
}
}
@ -588,13 +433,22 @@ void bdNode::addPeer(const bdId *id, uint32_t peerflags)
fprintf(stderr, ")\n");
#endif
/* iterate through queries */
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
/* first check the filters */
if (mFilterPeers->checkPeer(id, peerflags))
{
(*it)->addPeer(id, peerflags);
std::cerr << "bdNode::addPeer(";
mFns->bdPrintId(std::cerr, id);
std::cerr << ", " << std::hex << peerflags << std::dec;
std::cerr << ") FAILED the BAD PEER FILTER!!!! DISCARDING MSG";
std::list<struct sockaddr_in> filteredIPs;
mFilterPeers->filteredIPs(filteredIPs);
mStore.filterIpList(filteredIPs);
return;
}
mQueryMgr->addPeer(id, peerflags);
mNodeSpace.add_peer(id, peerflags);
bdPeer peer;
@ -604,126 +458,10 @@ void bdNode::addPeer(const bdId *id, uint32_t peerflags)
mStore.addStore(&peer);
// Finally we pass to connections for them to use.
bdNode::updatePotentialConnectionProxy(id, peerflags);
mConnMgr->updatePotentialConnectionProxy(id, peerflags);
}
#if 0
// virtual so manager can do callback.
// peer flags defined in bdiface.h
void bdNode::PeerResponse(const bdId *id, const bdNodeId *target, uint32_t peerflags)
{
#ifdef DEBUG_NODE_ACTIONS
std::cerr << "bdNode::PeerResponse(";
mFns->bdPrintId(std::cerr, id);
std::cerr << ", target: ";
mFns->bdPrintNodeId(std::cerr, target);
fprintf(stderr, ")\n");
#endif
/* iterate through queries */
std::list<bdQuery>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
it->PeerResponse(id, target, peerflags);
}
mNodeSpace.add_peer(id, peerflags);
bdPeer peer;
peer.mPeerId = *id;
peer.mPeerFlags = peerflags;
peer.mLastRecvTime = time(NULL);
mStore.addStore(&peer);
}
#endif
/************************************ Query Details *************************/
void bdNode::addQuery(const bdNodeId *id, uint32_t qflags)
{
std::list<bdId> startList;
std::multimap<bdMetric, bdId> nearest;
std::multimap<bdMetric, bdId>::iterator it;
//mNodeSpace.find_nearest_nodes(id, BITDHT_QUERY_START_PEERS, startList, nearest, 0);
mNodeSpace.find_nearest_nodes(id, BITDHT_QUERY_START_PEERS, nearest);
fprintf(stderr, "bdNode::addQuery(");
mFns->bdPrintNodeId(std::cerr, id);
fprintf(stderr, ")\n");
for(it = nearest.begin(); it != nearest.end(); it++)
{
startList.push_back(it->second);
}
bdQuery *query = new bdQuery(id, startList, qflags, mFns);
mLocalQueries.push_back(query);
}
void bdNode::clearQuery(const bdNodeId *rmId)
{
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end();)
{
if ((*it)->mId == *rmId)
{
bdQuery *query = (*it);
it = mLocalQueries.erase(it);
delete query;
}
else
{
it++;
}
}
}
void bdNode::QueryStatus(std::map<bdNodeId, bdQueryStatus> &statusMap)
{
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
bdQueryStatus status;
status.mStatus = (*it)->mState;
status.mQFlags = (*it)->mQueryFlags;
(*it)->result(status.mResults);
statusMap[(*it)->mId] = status;
}
}
int bdNode::QuerySummary(const bdNodeId *id, bdQuerySummary &query)
{
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
if ((*it)->mId == *id)
{
query.mId = (*it)->mId;
query.mLimit = (*it)->mLimit;
query.mState = (*it)->mState;
query.mQueryTS = (*it)->mQueryTS;
query.mQueryFlags = (*it)->mQueryFlags;
query.mSearchTime = (*it)->mSearchTime;
query.mClosest = (*it)->mClosest;
query.mPotentialPeers = (*it)->mPotentialPeers;
query.mProxiesUnknown = (*it)->mProxiesUnknown;
query.mProxiesFlagged = (*it)->mProxiesFlagged;
return 1;
}
}
return 0;
}
/************************************ Process Remote Query *************************/
void bdNode::processRemoteQuery()
{
@ -770,8 +508,6 @@ void bdNode::processRemoteQuery()
std::cerr << ", found " << nearest.size() << " nodes ";
std::cerr << std::endl;
#endif
mCounterReplyFindNode++;
break;
}
@ -783,8 +519,6 @@ void bdNode::processRemoteQuery()
std::cerr << " TODO";
std::cerr << std::endl;
#endif
mCounterReplyQueryHash++;
/* TODO */
break;
@ -859,8 +593,17 @@ int bdNode::outgoingMsg(struct sockaddr_in *addr, char *msg, int *len)
void bdNode::incomingMsg(struct sockaddr_in *addr, char *msg, int len)
{
bdNodeNetMsg *bdmsg = new bdNodeNetMsg(msg, len, addr);
mIncomingMsgs.push_back(bdmsg);
/* check against the filter */
if (mFilterPeers->addrOkay(addr))
{
bdNodeNetMsg *bdmsg = new bdNodeNetMsg(msg, len, addr);
mIncomingMsgs.push_back(bdmsg);
}
else
{
std::cerr << "bdNode::incomingMsg() Incoming Packet Filtered";
std::cerr << std::endl;
}
}
/************************************ Message Handling *****************************/
@ -887,6 +630,7 @@ void bdNode::msgout_ping(bdId *id, bdToken *transId)
int blen = bitdht_create_ping_msg(transId, &(mOwnId), msg, avail-1);
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_PING, true);
}
@ -920,6 +664,8 @@ void bdNode::msgout_pong(bdId *id, bdToken *transId)
int blen = bitdht_response_ping_msg(transId, &(mOwnId), &vid, msg, avail-1);
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_PONG, true);
}
@ -947,6 +693,7 @@ void bdNode::msgout_find_node(bdId *id, bdToken *transId, bdNodeId *query)
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_QUERYNODE, true);
}
@ -957,6 +704,7 @@ void bdNode::msgout_reply_find_node(bdId *id, bdToken *transId, std::list<bdId>
registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_NODE);
mAccount.incCounter(BDACCOUNT_MSG_REPLYFINDNODE, true);
int blen = bitdht_resp_node_msg(transId, &(mOwnId), peers, msg, avail-1);
@ -1005,6 +753,7 @@ void bdNode::msgout_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_QUERYHASH, true);
}
@ -1037,6 +786,7 @@ void bdNode::msgout_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, true);
}
@ -1070,6 +820,8 @@ void bdNode::msgout_reply_nearest(bdId *id, bdToken *transId, bdToken *token, st
int blen = bitdht_peers_reply_closest_msg(transId, &(mOwnId), token, nodes, msg, avail-1);
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, true);
}
@ -1097,6 +849,8 @@ void bdNode::msgout_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash, u
int blen = bitdht_announce_peers_msg(transId,&(mOwnId),info_hash,port,token,msg,avail-1);
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_POSTHASH, true);
}
@ -1111,14 +865,15 @@ void bdNode::msgout_reply_post(bdId *id, bdToken *transId)
#endif
/* generate message, send to udp */
char msg[10240];
int avail = 10240;
char msg[10240];
int avail = 10240;
registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_POST);
int blen = bitdht_reply_announce_msg(transId, &(mOwnId), msg, avail-1);
int blen = bitdht_reply_announce_msg(transId, &(mOwnId), msg, avail-1);
sendPkt(msg, blen, id->addr);
sendPkt(msg, blen, id->addr);
mAccount.incCounter(BDACCOUNT_MSG_REPLYPOSTHASH, true);
}
@ -1128,10 +883,19 @@ void bdNode::sendPkt(char *msg, int len, struct sockaddr_in addr)
//fprintf(stderr, "bdNode::sendPkt(%d) to %s:%d\n",
// len, inet_ntoa(addr.sin_addr), htons(addr.sin_port));
bdNodeNetMsg *bdmsg = new bdNodeNetMsg(msg, len, &addr);
//bdmsg->print(std::cerr);
mOutgoingMsgs.push_back(bdmsg);
//bdmsg->print(std::cerr);
/* filter outgoing packets */
if (mFilterPeers->addrOkay(&addr))
{
bdNodeNetMsg *bdmsg = new bdNodeNetMsg(msg, len, &addr);
//bdmsg->print(std::cerr);
mOutgoingMsgs.push_back(bdmsg);
//bdmsg->print(std::cerr);
}
else
{
std::cerr << "bdNode::sendPkt() Outgoing Packet Filtered";
std::cerr << std::endl;
}
return;
}
@ -1658,11 +1422,11 @@ void bdNode::msgin_ping(bdId *id, bdToken *transId)
mFns->bdPrintId(std::cerr, id);
std::cerr << std::endl;
#endif
mCounterRecvPing++;
mCounterPongs++;
mAccount.incCounter(BDACCOUNT_MSG_PING, false);
/* peer is alive */
uint32_t peerflags = 0; /* no id typically, so cant get version */
uint32_t peerflags = BITDHT_PEER_STATUS_RECV_PING; /* no id typically, so cant get version */
addPeer(id, peerflags);
/* reply */
@ -1687,7 +1451,8 @@ void bdNode::msgin_pong(bdId *id, bdToken *transId, bdToken *versionId)
(void) transId;
#endif
mCounterRecvPong++;
mAccount.incCounter(BDACCOUNT_MSG_PONG, false);
/* recv pong, and peer is alive. add to DHT */
//uint32_t vId = 0; // TODO XXX convertBdVersionToVID(versionId);
@ -1804,7 +1569,8 @@ void bdNode::msgin_find_node(bdId *id, bdToken *transId, bdNodeId *query)
std::cerr << std::endl;
#endif
mCounterRecvQueryNode++;
mAccount.incCounter(BDACCOUNT_MSG_QUERYNODE, false);
/* store query... */
queueQuery(id, query, transId, BD_QUERY_NEIGHBOURS);
@ -1833,8 +1599,8 @@ void bdNode::msgin_reply_find_node(bdId *id, bdToken *transId, std::list<bdId> &
#else
(void) transId;
#endif
mCounterRecvReplyFindNode++;
mAccount.incCounter(BDACCOUNT_MSG_REPLYFINDNODE, false);
/* add neighbours to the potential list */
for(it = nodes.begin(); it != nodes.end(); it++)
@ -1853,6 +1619,8 @@ void bdNode::msgin_reply_find_node(bdId *id, bdToken *transId, std::list<bdId> &
void bdNode::msgin_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
{
#ifdef DEBUG_NODE_MSGIN
std::cerr << "bdNode::msgin_get_hash() TransId: ";
bdPrintTransId(std::cerr, transId);
@ -1863,7 +1631,8 @@ void bdNode::msgin_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
std::cerr << std::endl;
#endif
mCounterRecvQueryHash++;
mAccount.incCounter(BDACCOUNT_MSG_QUERYHASH, false);
/* generate message, send to udp */
queueQuery(id, info_hash, transId, BD_QUERY_HASH);
@ -1872,7 +1641,7 @@ void bdNode::msgin_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
void bdNode::msgin_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::list<std::string> &values)
{
mCounterRecvReplyQueryHash++;
mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, false);
#ifdef DEBUG_NODE_MSGIN
std::cerr << "bdNode::msgin_reply_hash() TransId: ";
@ -1900,7 +1669,7 @@ void bdNode::msgin_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::l
void bdNode::msgin_reply_nearest(bdId *id, bdToken *transId, bdToken *token, std::list<bdId> &nodes)
{
//mCounterRecvReplyNearestHash++;
mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, false);
#ifdef DEBUG_NODE_MSGIN
std::cerr << "bdNode::msgin_reply_nearest() TransId: ";
@ -1930,7 +1699,8 @@ void bdNode::msgin_reply_nearest(bdId *id, bdToken *transId, bdToken *token, std
void bdNode::msgin_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash, uint32_t port, bdToken *token)
{
//mCounterRecvPostHash++;
mAccount.incCounter(BDACCOUNT_MSG_POSTHASH, false);
#ifdef DEBUG_NODE_MSGIN
std::cerr << "bdNode::msgin_post_hash() TransId: ";
@ -1957,7 +1727,7 @@ void bdNode::msgin_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash,
void bdNode::msgin_reply_post(bdId *id, bdToken *transId)
{
/* generate message, send to udp */
//mCounterRecvReplyPostHash++;
mAccount.incCounter(BDACCOUNT_MSG_REPLYPOSTHASH, false);
#ifdef DEBUG_NODE_MSGIN
std::cerr << "bdNode::msgin_reply_post() TransId: ";
@ -1972,6 +1742,153 @@ void bdNode::msgin_reply_post(bdId *id, bdToken *transId)
}
/************************************************************************************************************
******************************************** Message Interface **********************************************
************************************************************************************************************/
/* Outgoing Messages */
std::string getConnectMsgType(int msgtype)
{
switch(msgtype)
{
case BITDHT_MSG_TYPE_CONNECT_REQUEST:
return "ConnectRequest";
break;
case BITDHT_MSG_TYPE_CONNECT_REPLY:
return "ConnectReply";
break;
case BITDHT_MSG_TYPE_CONNECT_START:
return "ConnectStart";
break;
case BITDHT_MSG_TYPE_CONNECT_ACK:
return "ConnectAck";
break;
default:
return "ConnectUnknown";
break;
}
}
void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status)
{
std::cerr << "bdConnectManager::msgout_connect_genmsg() Type: " << getConnectMsgType(msgtype);
std::cerr << " TransId: ";
bdPrintTransId(std::cerr, transId);
std::cerr << " To: ";
mFns->bdPrintId(std::cerr, id);
std::cerr << " SrcAddr: ";
mFns->bdPrintId(std::cerr, srcAddr);
std::cerr << " DestAddr: ";
mFns->bdPrintId(std::cerr, destAddr);
std::cerr << " Mode: " << mode;
std::cerr << " Status: " << status;
std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGOUT
#endif
switch(msgtype)
{
default:
case BITDHT_MSG_TYPE_CONNECT_REQUEST:
mAccount.incCounter(BDACCOUNT_MSG_CONNECTREQUEST, true);
break;
case BITDHT_MSG_TYPE_CONNECT_REPLY:
mAccount.incCounter(BDACCOUNT_MSG_CONNECTREPLY, true);
break;
case BITDHT_MSG_TYPE_CONNECT_START:
mAccount.incCounter(BDACCOUNT_MSG_CONNECTSTART, true);
break;
case BITDHT_MSG_TYPE_CONNECT_ACK:
mAccount.incCounter(BDACCOUNT_MSG_CONNECTACK, true);
break;
}
registerOutgoingMsg(id, transId, msgtype);
/* create string */
char msg[10240];
int avail = 10240;
int blen = bitdht_connect_genmsg(transId, &(mOwnId), msgtype, srcAddr, destAddr, mode, status, msg, avail-1);
sendPkt(msg, blen, id->addr);
}
void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status)
{
std::list<bdId>::iterator it;
std::cerr << "bdConnectManager::msgin_connect_genmsg() Type: " << getConnectMsgType(msgtype);
std::cerr << " TransId: ";
bdPrintTransId(std::cerr, transId);
std::cerr << " From: ";
mFns->bdPrintId(std::cerr, id);
std::cerr << " SrcAddr: ";
mFns->bdPrintId(std::cerr, srcAddr);
std::cerr << " DestAddr: ";
mFns->bdPrintId(std::cerr, destAddr);
std::cerr << " Mode: " << mode;
std::cerr << " Status: " << status;
std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGS
#else
(void) transId;
#endif
/* switch to actual work functions */
uint32_t peerflags = 0;
switch(msgtype)
{
case BITDHT_MSG_TYPE_CONNECT_REQUEST:
peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
mAccount.incCounter(BDACCOUNT_MSG_CONNECTREQUEST, false);
mConnMgr->recvedConnectionRequest(id, srcAddr, destAddr, mode);
break;
case BITDHT_MSG_TYPE_CONNECT_REPLY:
peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
mAccount.incCounter(BDACCOUNT_MSG_CONNECTREPLY, false);
mConnMgr->recvedConnectionReply(id, srcAddr, destAddr, mode, status);
break;
case BITDHT_MSG_TYPE_CONNECT_START:
peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
mAccount.incCounter(BDACCOUNT_MSG_CONNECTSTART, false);
mConnMgr->recvedConnectionStart(id, srcAddr, destAddr, mode, status);
break;
case BITDHT_MSG_TYPE_CONNECT_ACK:
peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
mAccount.incCounter(BDACCOUNT_MSG_CONNECTACK, false);
mConnMgr->recvedConnectionAck(id, srcAddr, destAddr, mode);
break;
default:
break;
}
/* received message - so peer must be good */
addPeer(id, peerflags);
}
/****************** Other Functions ******************/
void bdNode::genNewToken(bdToken *token)

View File

@ -35,6 +35,9 @@
#include "bitdht/bdhistory.h"
#include "bitdht/bdconnection.h"
#include "bitdht/bdaccount.h"
class bdFilter;
#define BD_QUERY_NEIGHBOURS 1
@ -92,17 +95,39 @@ class bdNodeNetMsg
};
class bdNode
class bdNodePublisher
{
public:
/* simplified outgoing msg functions (for the managers) */
virtual void send_ping(bdId *id) = 0; /* message out */
virtual void send_query(bdId *id, bdNodeId *targetNodeId) = 0; /* message out */
virtual void send_connect_msg(bdId *id, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status) = 0;
// internal Callback -> normally continues to callbackConnect().
virtual void callbackConnect(bdId *srcId, bdId *proxyId, bdId *destId,
int mode, int point, int cbtype, int errcode) = 0;
};
class bdNode: public bdNodePublisher
{
public:
bdNode(bdNodeId *id, std::string dhtVersion, std::string bootfile,
bdDhtFunctions *fns);
void init(); /* sets up the self referential classes (mQueryMgr & mConnMgr) */
void setNodeOptions(uint32_t optFlags);
/* startup / shutdown node */
void restartNode();
void shutdownNode();
void getOwnId(bdNodeId *id);
// virtual so manager can do callback.
// peer flags defined in bdiface.h
virtual void addPeer(const bdId *id, uint32_t peerflags);
@ -111,31 +136,33 @@ class bdNode
void checkPotentialPeer(bdId *id, bdId *src);
void addPotentialPeer(bdId *id, bdId *src);
void addQuery(const bdNodeId *id, uint32_t qflags);
void clearQuery(const bdNodeId *id);
void QueryStatus(std::map<bdNodeId, bdQueryStatus> &statusMap);
int QuerySummary(const bdNodeId *id, bdQuerySummary &query);
/* connection functions */
void requestConnection(bdNodeId *id, uint32_t modes);
void allowConnection(bdNodeId *id, uint32_t modes);
void iterationOff();
void iteration();
void processRemoteQuery();
void updateStore();
/* simplified outgoing msg functions (for the managers) */
virtual void send_ping(bdId *id); /* message out */
virtual void send_query(bdId *id, bdNodeId *targetNodeId); /* message out */
virtual void send_connect_msg(bdId *id, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
/* interaction with outside world */
// This is implemented in bdManager.
// virtual void callbackConnect(bdId *srcId, bdId *proxyId, bdId *destId,
// int mode, int point, int cbtype, int errcode);
/* interaction with outside world (Accessed by controller to deliver us msgs) */
int outgoingMsg(struct sockaddr_in *addr, char *msg, int *len);
void incomingMsg(struct sockaddr_in *addr, char *msg, int len);
// Below is internal Management of incoming / outgoing messages.
private:
/* internal interaction with network */
void sendPkt(char *msg, int len, struct sockaddr_in addr);
void recvPkt(char *msg, int len, struct sockaddr_in addr);
/* internal assistance functions */
void send_query(bdId *id, bdNodeId *targetNodeId); /* message out */
/* output functions (send msg) */
void msgout_ping(bdId *id, bdToken *transId);
@ -172,6 +199,11 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
bdNodeId *info_hash, uint32_t port, bdToken *token);
void msgin_reply_post(bdId *id, bdToken *transId);
void msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
void msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
/* token handling */
@ -184,171 +216,43 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
uint32_t checkIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType);
void cleanupTransIdRegister();
void getOwnId(bdNodeId *id);
void doStats();
void printStats(std::ostream &out);
void printQueries();
void resetCounters();
void resetStats();
/****************************** Connection Code (in bdconnection.cc) ****************************/
/* Connections: Configuration */
void defaultConnectionOptions();
virtual void setConnectionOptions(uint32_t allowedModes, uint32_t flags);
/* Connections: Messages */
void msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
void msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
/* Connections: Initiation */
int requestConnection(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode, uint32_t start);
int requestConnection_direct(struct sockaddr_in *laddr, bdNodeId *target);
int requestConnection_proxy(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode);
int killConnectionRequest(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode);
int checkExistingConnectionAttempt(bdNodeId *target);
void addPotentialConnectionProxy(const bdId *srcId, const bdId *target);
void updatePotentialConnectionProxy(const bdId *id, uint32_t mode);
int checkPeerForFlag(const bdId *id, uint32_t with_flag);
int tickConnections();
void iterateConnectionRequests();
int startConnectionAttempt(bdConnectionRequest *req);
// internal Callback -> normally continues to callbackConnect().
void callbackConnectRequest(bdId *srcId, bdId *proxyId, bdId *destId,
int mode, int point, int cbtype, int errcode);
/* Connections: Outgoing */
int startConnectionAttempt(bdId *proxyId, bdId *srcConnAddr, bdId *destConnAddr, int mode);
void AuthConnectionOk(bdId *srcId, bdId *proxyId, bdId *destId, int mode, int loc);
void AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode, int loc, int errcode);
void iterateConnections();
/* Connections: Utility State */
bdConnection *findExistingConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId);
bdConnection *newConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId);
int cleanConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId);
int determinePosition(bdNodeId *sender, bdNodeId *src, bdNodeId *dest);
int determineProxyId(bdNodeId *sender, bdNodeId *src, bdNodeId *dest, bdNodeId *proxyId);
bdConnection *findSimilarConnection(bdNodeId *srcId, bdNodeId *destId);
bdConnection *findExistingConnectionBySender(bdId *sender, bdId *src, bdId *dest);
bdConnection *newConnectionBySender(bdId *sender, bdId *src, bdId *dest);
int cleanConnectionBySender(bdId *sender, bdId *src, bdId *dest);
// Overloaded Generalised Connection Callback.
virtual void callbackConnect(bdId *srcId, bdId *proxyId, bdId *destId,
int mode, int point, int cbtype, int errcode);
/* Connections: */
int recvedConnectionRequest(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode);
int recvedConnectionReply(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode, int status);
int recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode, int bandwidth);
int recvedConnectionAck(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode);
/********** Variables **********/
private:
std::map<bdProxyTuple, bdConnection> mConnections;
std::map<bdNodeId, bdConnectionRequest> mConnectionRequests;
uint32_t mConfigAllowedModes;
bool mConfigAutoProxy;
/****************************** Connection Code (in bdconnection.cc) ****************************/
/**** Some Variables are Protected to allow inherited classes to use *****/
protected:
bdSpace mNodeSpace;
bdQueryManager *mQueryMgr;
bdConnectManager *mConnMgr;
bdFilter *mFilterPeers;
bdNodeId mOwnId;
bdId mLikelyOwnId; // Try to workout own id address.
bdSpace mNodeSpace;
std::string mDhtVersion;
bdAccount mAccount;
bdStore mStore;
bdDhtFunctions *mFns;
bdHashSpace mHashSpace;
private:
bdStore mStore;
std::string mDhtVersion;
uint32_t mNodeOptionFlags;
bdDhtFunctions *mFns;
bdHashSpace mHashSpace;
bdHistory mHistory; /* for understanding the DHT */
std::list<bdQuery *> mLocalQueries;
std::list<bdRemoteQuery> mRemoteQueries;
std::list<bdId> mPotentialPeers;
std::list<bdNodeNetMsg *> mOutgoingMsgs;
std::list<bdNodeNetMsg *> mIncomingMsgs;
// Statistics.
double mCounterOutOfDatePing;
double mCounterPings;
double mCounterPongs;
double mCounterQueryNode;
double mCounterQueryHash;
double mCounterReplyFindNode;
double mCounterReplyQueryHash;
// connection stats.
double mCounterConnectRequest;
double mCounterConnectReply;
double mCounterConnectStart;
double mCounterConnectAck;
double mCounterRecvPing;
double mCounterRecvPong;
double mCounterRecvQueryNode;
double mCounterRecvQueryHash;
double mCounterRecvReplyFindNode;
double mCounterRecvReplyQueryHash;
// connection stats.
double mCounterRecvConnectRequest;
double mCounterRecvConnectReply;
double mCounterRecvConnectStart;
double mCounterRecvConnectAck;
double mLpfOutOfDatePing;
double mLpfPings;
double mLpfPongs;
double mLpfQueryNode;
double mLpfQueryHash;
double mLpfReplyFindNode;
double mLpfReplyQueryHash;
// connection stats.
double mLpfConnectRequest;
double mLpfConnectReply;
double mLpfConnectStart;
double mLpfConnectAck;
double mLpfRecvPing;
double mLpfRecvPong;
double mLpfRecvQueryNode;
double mLpfRecvQueryHash;
double mLpfRecvReplyFindNode;
double mLpfRecvReplyQueryHash;
// connection stats.
double mLpfRecvConnectRequest;
double mLpfRecvConnectReply;
double mLpfRecvConnectStart;
double mLpfRecvConnectAck;
};

View File

@ -192,6 +192,11 @@ bdSpace::bdSpace(bdNodeId *ownId, bdDhtFunctions *fns)
{
/* make some space for data */
buckets.resize(mFns->bdNumBuckets());
mAttachTS = 0;
mAttachedFlags = 0;
mAttachedCount = 0;
return;
}
@ -207,11 +212,17 @@ int bdSpace::clear()
return 1;
}
int bdSpace::setAttachedFlag(uint32_t withflags, int count)
{
mAttachedFlags = withflags;
mAttachedCount = count;
mAttachTS = 0;
return 1;
}
int bdSpace::find_nearest_nodes_with_flags(const bdNodeId *id, int number,
std::list<bdId> /* excluding */,
std::multimap<bdMetric, bdId> &nearest, uint32_t with_flag)
std::multimap<bdMetric, bdId> &nearest, uint32_t with_flags)
{
std::multimap<bdMetric, bdId> closest;
std::multimap<bdMetric, bdId>::iterator mit;
@ -237,7 +248,7 @@ int bdSpace::find_nearest_nodes_with_flags(const bdNodeId *id, int number,
{
for(eit = it->entries.begin(); eit != it->entries.end(); eit++)
{
if ((!with_flag) || (with_flag & eit->mPeerFlags))
if ((!with_flags) || ((with_flags & eit->mPeerFlags) == with_flags))
{
mFns->bdDistance(id, &(eit->mPeerId.id), &dist);
closest.insert(std::pair<bdMetric, bdId>(dist, eit->mPeerId));
@ -319,7 +330,7 @@ int bdSpace::find_nearest_nodes(const bdNodeId *id, int number,
/* This is much cheaper than find nearest... we only look in the one bucket
*/
int bdSpace::find_node(const bdNodeId *id, int number, std::list<bdId> &matchIds, uint32_t with_flag)
int bdSpace::find_node(const bdNodeId *id, int number, std::list<bdId> &matchIds, uint32_t with_flags)
{
bdMetric dist;
mFns->bdDistance(id, &(mOwnId), &dist);
@ -346,7 +357,7 @@ int bdSpace::find_node(const bdNodeId *id, int number, std::list<bdId> &matchIds
std::cerr << " withFlags: " << eit->mPeerFlags;
std::cerr << std::endl;
if ((!with_flag) || (with_flag & eit->mPeerFlags))
if ((!with_flags) || ((with_flags & eit->mPeerFlags) == with_flags))
{
if (*id == eit->mPeerId.id)
{
@ -423,7 +434,7 @@ int bdSpace::find_exactnode(const bdId *id, bdPeer &peer)
}
#if 0
int bdSpace::out_of_date_peer(bdId &id)
{
/*
@ -492,6 +503,177 @@ int bdSpace::out_of_date_peer(bdId &id)
return 0;
}
#endif
#define BITDHT_ATTACHED_SEND_PERIOD 17
int bdSpace::scanOutOfDatePeers(std::list<bdId> &peerIds)
{
/*
*
*/
bool doAttached = (mAttachedCount > 0);
int attachedCount = 0;
std::map<bdMetric, bdId> closest;
std::map<bdMetric, bdId>::iterator mit;
std::vector<bdBucket>::iterator it;
std::list<bdPeer>::iterator eit;
time_t ts = time(NULL);
/* iterate through the buckets, and sort by distance */
for(it = buckets.begin(); it != buckets.end(); it++)
{
for(eit = it->entries.begin(); eit != it->entries.end(); )
{
bool added = false;
if (doAttached)
{
if (eit->mExtraFlags & BITDHT_PEER_EXFLAG_ATTACHED)
{
/* add to send list, if we haven't pinged recently */
if ((ts - eit->mLastSendTime > BITDHT_ATTACHED_SEND_PERIOD ) &&
(ts - eit->mLastRecvTime > BITDHT_ATTACHED_SEND_PERIOD ))
{
peerIds.push_back(eit->mPeerId);
eit->mLastSendTime = ts;
added = true;
}
attachedCount++;
}
}
/* timeout on last send time! */
if ((!added) && (ts - eit->mLastSendTime > BITDHT_MAX_SEND_PERIOD ))
{
/* We want to ping a peer iff:
* 1) They are out-of-date: mLastRecvTime is too old.
* 2) They don't have 0x0001 flag (we haven't received a PONG) and never sent.
*/
if ((ts - eit->mLastRecvTime > BITDHT_MAX_SEND_PERIOD ) ||
!(eit->mPeerFlags & BITDHT_PEER_STATUS_RECV_PONG))
{
peerIds.push_back(eit->mPeerId);
eit->mLastSendTime = ts;
}
}
/* we also want to remove very old entries (should it happen here?)
* which are not pushed out by newer entries (will happen in for closer buckets)
*/
bool discard = false;
/* discard very old entries */
if (ts - eit->mLastRecvTime > BITDHT_DISCARD_PERIOD)
{
discard = true;
}
/* discard peers which have not responded to anything (ie have no flags set) */
/* changed into have not id'ed themselves, as we've added ping to list of flags. */
if ((ts - eit->mFoundTime > BITDHT_MAX_RESPONSE_PERIOD ) &&
!(eit->mPeerFlags & BITDHT_PEER_STATUS_RECV_PONG))
{
discard = true;
}
/* INCREMENT */
if (discard)
{
eit = it->entries.erase(eit);
}
else
{
eit++;
}
}
}
#define ATTACH_UPDATE_PERIOD 600
if ((ts - mAttachTS > ATTACH_UPDATE_PERIOD) || (attachedCount != mAttachedCount))
{
std::cerr << "Updating ATTACH Stuff";
std::cerr << std::endl;
updateAttachedPeers(); /* XXX TEMP HACK to look at stability */
mAttachTS = ts;
}
return (peerIds.size());
}
int bdSpace::updateAttachedPeers()
{
/*
*
*/
bool doAttached = (mAttachedCount > 0);
int attachedCount = 0;
if (!doAttached)
{
return 0;
}
std::map<bdMetric, bdId> closest;
std::map<bdMetric, bdId>::iterator mit;
std::vector<bdBucket>::iterator it;
std::list<bdPeer>::reverse_iterator eit;
/* skip the first bucket, as we don't want to ping ourselves! */
it = buckets.begin();
if (it != buckets.end())
{
it++;
}
/* iterate through the buckets (sorted by distance) */
for(; it != buckets.end(); it++)
{
/* start from the back, as these are the most recently seen (and more likely to be the old ATTACHED) */
for(eit = it->entries.rbegin(); eit != it->entries.rend(); eit++)
{
bool added = false;
if (doAttached)
{
if ((eit->mPeerFlags & mAttachedFlags) == mAttachedFlags)
{
/* flag as attached */
eit->mExtraFlags |= BITDHT_PEER_EXFLAG_ATTACHED;
/* inc count, and cancel search if we've found them */
attachedCount++;
if (attachedCount >= mAttachedCount)
{
doAttached = false;
}
}
else
{
eit->mExtraFlags &= ~BITDHT_PEER_EXFLAG_ATTACHED;
}
}
else
{
eit->mExtraFlags &= ~BITDHT_PEER_EXFLAG_ATTACHED;
}
}
}
}
/* Called to add or update peer.
* sorts bucket lists by lastRecvTime.
* updates requested node.
@ -500,6 +682,7 @@ int bdSpace::out_of_date_peer(bdId &id)
/* peer flags
* order is important!
* higher bits = more priority.
* BITDHT_PEER_STATUS_RECVPING
* BITDHT_PEER_STATUS_RECVPONG
* BITDHT_PEER_STATUS_RECVNODES
* BITDHT_PEER_STATUS_RECVHASHES
@ -543,15 +726,22 @@ int bdSpace::add_peer(const bdId *id, uint32_t peerflags)
/* loop through ids, to find it */
for(it = buck.entries.begin(); it != buck.entries.end(); it++)
{
if (*id == it->mPeerId)
// should check addr too!
{
/* similar id check */
if (mFns->bdSimilarId(id, &(it->mPeerId)))
{
bdPeer peer = *it;
it = buck.entries.erase(it);
peer.mLastRecvTime = ts;
peer.mPeerFlags |= peerflags; /* must be cumulative ... so can do online, replynodes, etc */
/* also update port from incoming id, as we have definitely recved from it */
if (mFns->bdUpdateSimilarId(&(peer.mPeerId), id))
{
/* updated it... must be Unstable */
peer.mExtraFlags |= BITDHT_PEER_EXFLAG_UNSTABLE;
}
buck.entries.push_back(peer);
#ifdef DEBUG_BD_SPACE
@ -625,6 +815,7 @@ int bdSpace::add_peer(const bdId *id, uint32_t peerflags)
newPeer.mLastSendTime = 0; // ts; //????
newPeer.mFoundTime = ts;
newPeer.mPeerFlags = peerflags;
newPeer.mExtraFlags = 0;
buck.entries.push_back(newPeer);
@ -1047,10 +1238,11 @@ bool bdSpace::findRandomPeerWithFlag(bdId &id, uint32_t withFlag)
{
if (i == rnd)
{
#ifdef BITDHT_DEBUG
std::cerr << "bdSpace::findRandomPeerWithFlag() found #" << i;
std::cerr << " in bucket #" << buck;
std::cerr << std::endl;
#endif
/* found */
id = lit->mPeerId;
return true;
@ -1061,6 +1253,8 @@ bool bdSpace::findRandomPeerWithFlag(bdId &id, uint32_t withFlag)
}
std::cerr << "bdSpace::findRandomPeerWithFlag() failed to find " << rnd << " / " << totalcount;
std::cerr << std::endl;
#ifdef BITDHT_DEBUG
#endif
return false;
}

View File

@ -159,6 +159,8 @@ class bdSpace
int clear();
int setAttachedFlag(uint32_t withflags, int count);
/* accessors */
int find_nearest_nodes(const bdNodeId *id, int number,
std::multimap<bdMetric, bdId> &nearest);
@ -171,7 +173,11 @@ int find_node(const bdNodeId *id, int number,
std::list<bdId> &matchIds, uint32_t with_flag);
int find_exactnode(const bdId *id, bdPeer &peer);
int out_of_date_peer(bdId &id); // side-effect updates, send flag on peer.
// switched to more efficient single sweep.
//int out_of_date_peer(bdId &id); // side-effect updates, send flag on peer.
int scanOutOfDatePeers(std::list<bdId> &peerIds);
int updateAttachedPeers();
int add_peer(const bdId *id, uint32_t mode);
int printDHT();
int getDhtBucket(const int idx, bdBucket &bucket);
@ -192,6 +198,10 @@ int updateOwnId(bdNodeId *newOwnId);
std::vector<bdBucket> buckets;
bdNodeId mOwnId;
bdDhtFunctions *mFns;
uint32_t mAttachedFlags;
uint32_t mAttachedCount;
time_t mAttachTS;
};

View File

@ -316,7 +316,11 @@ int bdQuery::addClosestPeer(const bdId *id, uint32_t mode)
if (mode)
{
/* also update port from incoming id, as we have definitely recved from it */
mFns->bdUpdateSimilarId(&(it->second.mPeerId), id);
if (mFns->bdUpdateSimilarId(&(it->second.mPeerId), id))
{
/* updated it... must be Unstable */
it->second.mExtraFlags |= BITDHT_PEER_EXFLAG_UNSTABLE;
}
}
if (mode & BITDHT_PEER_STATUS_RECV_NODES)
{
@ -567,7 +571,11 @@ int bdQuery::updatePotentialPeer(const bdId *id, uint32_t mode, uint32_t addType
else if (mode)
{
/* also update port from incoming id, as we have definitely recved from it */
mFns->bdUpdateSimilarId(&(it->second.mPeerId), id);
if (mFns->bdUpdateSimilarId(&(it->second.mPeerId), id))
{
/* updated it... must be Unstable */
it->second.mExtraFlags |= BITDHT_PEER_EXFLAG_UNSTABLE;
}
}
return 0;
}
@ -818,7 +826,11 @@ int bdQuery::updateProxyList(const bdId *id, uint32_t mode, std::list<bdPeer> &s
if (mode)
{
/* also update port from incoming id, as we have definitely recved from it */
mFns->bdUpdateSimilarId(&(it->mPeerId), id);
if (mFns->bdUpdateSimilarId(&(it->mPeerId), id))
{
/* updated it... must be Unstable */
it->mExtraFlags |= BITDHT_PEER_EXFLAG_UNSTABLE;
}
}
it->mPeerFlags |= mode;
it->mLastRecvTime = now;

View File

@ -0,0 +1,310 @@
/*
* bitdht/bdnode.cc
*
* BitDHT: An Flexible DHT library.
*
* Copyright 2010 by Robert Fernie
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 3 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "bitdht@lunamutt.com".
*
*/
#include "bitdht/bdquerymgr.h"
#include "bitdht/bdnode.h"
#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <iomanip>
#include <sstream>
#define BITDHT_QUERY_START_PEERS 10
#define BITDHT_QUERY_NEIGHBOUR_PEERS 8
#define BITDHT_MAX_REMOTE_QUERY_AGE 10
/****
* #define DEBUG_NODE_MULTIPEER 1
* #define DEBUG_NODE_MSGS 1
* #define DEBUG_NODE_ACTIONS 1
* #define DEBUG_NODE_MSGIN 1
* #define DEBUG_NODE_MSGOUT 1
***/
//#define DEBUG_NODE_MSGS 1
bdQueryManager::bdQueryManager(bdSpace *space, bdDhtFunctions *fns, bdNodePublisher *pub)
:mNodeSpace(space), mFns(fns), mPub(pub)
{
}
/***** Startup / Shutdown ******/
void bdQueryManager::shutdownQueries()
{
/* clear the queries */
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end();it++)
{
delete (*it);
}
mLocalQueries.clear();
}
void bdQueryManager::printQueries()
{
std::cerr << "bdQueryManager::printQueries()";
std::cerr << std::endl;
int i = 0;
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++, i++)
{
fprintf(stderr, "Query #%d:\n", i);
(*it)->printQuery();
fprintf(stderr, "\n");
}
}
int bdQueryManager::iterateQueries(int maxQueries)
{
#ifdef DEBUG_NODE_MULTIPEER
std::cerr << "bdQueryManager::iterateQueries() of Peer: ";
mFns->bdPrintNodeId(std::cerr, &mOwnId);
std::cerr << std::endl;
#endif
/* allow each query to send up to one query... until maxMsgs has been reached */
int numQueries = mLocalQueries.size();
int sentQueries = 0;
int i = 0;
bdId id;
bdNodeId targetNodeId;
while((i < numQueries) && (sentQueries < maxQueries))
{
bdQuery *query = mLocalQueries.front();
mLocalQueries.pop_front();
mLocalQueries.push_back(query);
/* go through the possible queries */
if (query->nextQuery(id, targetNodeId))
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdQueryManager::iteration() send_query(";
mFns->bdPrintId(std::cerr, &id);
std::cerr << ",";
mFns->bdPrintNodeId(std::cerr, &targetNodeId);
std::cerr << ")";
std::cerr << std::endl;
#endif
mPub->send_query(&id, &targetNodeId);
sentQueries++;
}
i++;
}
#ifdef DEBUG_NODE_ACTIONS
std::cerr << "bdQueryManager::iteration() maxMsgs: " << maxMsgs << " sentPings: " << sentPings;
std::cerr << " / " << allowedPings;
std::cerr << " sentQueries: " << sentQueries;
std::cerr << " / " << numQueries;
std::cerr << std::endl;
#endif
//printQueries();
return sentQueries;
}
bool bdQueryManager::checkPotentialPeer(bdId *id, bdId *src)
{
bool isWorthyPeer = false;
/* also push to queries */
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
if ((*it)->addPotentialPeer(id, src, 0))
{
isWorthyPeer = true;
}
}
return isWorthyPeer;
}
void bdQueryManager::addPeer(const bdId *id, uint32_t peerflags)
{
#ifdef DEBUG_NODE_ACTIONS
fprintf(stderr, "bdQueryManager::addPeer(");
mFns->bdPrintId(std::cerr, id);
fprintf(stderr, ")\n");
#endif
/* iterate through queries */
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
(*it)->addPeer(id, peerflags);
}
}
/************************************ Query Details *************************/
void bdQueryManager::addQuery(const bdNodeId *id, uint32_t qflags)
{
std::list<bdId> startList;
std::multimap<bdMetric, bdId> nearest;
std::multimap<bdMetric, bdId>::iterator it;
mNodeSpace->find_nearest_nodes(id, BITDHT_QUERY_START_PEERS, nearest);
fprintf(stderr, "bdQueryManager::addQuery(");
mFns->bdPrintNodeId(std::cerr, id);
fprintf(stderr, ")\n");
for(it = nearest.begin(); it != nearest.end(); it++)
{
startList.push_back(it->second);
}
bdQuery *query = new bdQuery(id, startList, qflags, mFns);
mLocalQueries.push_back(query);
}
void bdQueryManager::clearQuery(const bdNodeId *rmId)
{
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end();)
{
if ((*it)->mId == *rmId)
{
bdQuery *query = (*it);
it = mLocalQueries.erase(it);
delete query;
}
else
{
it++;
}
}
}
void bdQueryManager::QueryStatus(std::map<bdNodeId, bdQueryStatus> &statusMap)
{
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
bdQueryStatus status;
status.mStatus = (*it)->mState;
status.mQFlags = (*it)->mQueryFlags;
(*it)->result(status.mResults);
statusMap[(*it)->mId] = status;
}
}
int bdQueryManager::QuerySummary(const bdNodeId *id, bdQuerySummary &query)
{
std::list<bdQuery *>::iterator it;
for(it = mLocalQueries.begin(); it != mLocalQueries.end(); it++)
{
if ((*it)->mId == *id)
{
query.mId = (*it)->mId;
query.mLimit = (*it)->mLimit;
query.mState = (*it)->mState;
query.mQueryTS = (*it)->mQueryTS;
query.mQueryFlags = (*it)->mQueryFlags;
query.mSearchTime = (*it)->mSearchTime;
query.mClosest = (*it)->mClosest;
query.mPotentialPeers = (*it)->mPotentialPeers;
query.mProxiesUnknown = (*it)->mProxiesUnknown;
query.mProxiesFlagged = (*it)->mProxiesFlagged;
return 1;
}
}
return 0;
}
/* Extract Results from Peer Queries */
#define BDQRYMGR_RESULTS 1
#define BDQRYMGR_PROXIES 2
#define BDQRYMGR_POTPROXIES 3
int bdQueryManager::getResults(bdNodeId *target, std::list<bdId> &answer, int querytype)
{
/* grab any peers from any existing query */
int results = 0;
std::list<bdQuery *>::iterator qit;
for(qit = mLocalQueries.begin(); qit != mLocalQueries.end(); qit++)
{
if (!((*qit)->mId == (*target)))
{
continue;
}
#ifdef DEBUG_NODE_CONNECTION
std::cerr << "bdQueryManager::getResults() Found Matching Query";
std::cerr << std::endl;
#endif
switch(querytype)
{
default:
case BDQRYMGR_RESULTS:
results = (*qit)->result(answer);
break;
case BDQRYMGR_PROXIES:
results = (*qit)->proxies(answer);
break;
case BDQRYMGR_POTPROXIES:
results = (*qit)->potentialProxies(answer);
break;
}
/* will only be one matching query.. so end loop */
return results;
}
}
int bdQueryManager::result(bdNodeId *target, std::list<bdId> &answer)
{
return getResults(target, answer, BDQRYMGR_RESULTS);
}
int bdQueryManager::proxies(bdNodeId *target, std::list<bdId> &answer)
{
return getResults(target, answer, BDQRYMGR_PROXIES);
}
int bdQueryManager::potentialProxies(bdNodeId *target, std::list<bdId> &answer)
{
return getResults(target, answer, BDQRYMGR_POTPROXIES);
}

View File

@ -0,0 +1,73 @@
#ifndef BITDHT_QUERY_MANAGER_H
#define BITDHT_QUERY_MANAGER_H
/*
* bitdht/bdquerymgr.h
*
* BitDHT: An Flexible DHT library.
*
* Copyright 2011 by Robert Fernie
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 3 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "bitdht@lunamutt.com".
*
*/
#include "bitdht/bdquery.h"
class bdNodePublisher;
class bdQueryManager
{
public:
bdQueryManager(bdSpace *space, bdDhtFunctions *fns, bdNodePublisher *pub);
void shutdownQueries();
void printQueries();
int iterateQueries(int maxqueries);
bool checkPotentialPeer(bdId *id, bdId *src);
void addPeer(const bdId *id, uint32_t peerflags);
void addQuery(const bdNodeId *id, uint32_t qflags);
void clearQuery(const bdNodeId *id);
void QueryStatus(std::map<bdNodeId, bdQueryStatus> &statusMap);
int QuerySummary(const bdNodeId *id, bdQuerySummary &query);
int result(bdNodeId *target, std::list<bdId> &answer);
int proxies(bdNodeId *target, std::list<bdId> &answer);
int potentialProxies(bdNodeId *target, std::list<bdId> &answer);
private:
int getResults(bdNodeId *target, std::list<bdId> &answer, int querytype);
/* NB: No Mutex Protection... Single threaded, Mutex at higher level!
*/
bdSpace *mNodeSpace;
bdDhtFunctions *mFns;
bdNodePublisher *mPub;
std::list<bdQuery *> mLocalQueries;
};
#endif // BITDHT_QUERY_MANAGER_H

View File

@ -75,22 +75,29 @@ void bdStdZeroNodeId(bdNodeId *id)
// can could end-up with the wrong port.
// However this only matters with firewalled peers anyway.
// So not too serious.
uint32_t bdStdSimilarId(const bdId *n1, const bdId *n2)
bool bdStdSimilarId(const bdId *n1, const bdId *n2)
{
if (n1->id == n2->id)
{
if (n1->addr.sin_addr.s_addr == n2->addr.sin_addr.s_addr)
{
return 1;
return true;
}
}
return 0;
return false;
}
void bdStdUpdateSimilarId(bdId *dest, const bdId *src)
bool bdStdUpdateSimilarId(bdId *dest, const bdId *src)
{
/* only difference that's currently allowed */
if (dest->addr.sin_port == src->addr.sin_port)
{
/* no update required */
return false;
}
dest->addr.sin_port = src->addr.sin_port;
return true;
}
@ -274,13 +281,13 @@ int bdStdDht::bdBucketDistance(const bdMetric *metric)
}
uint32_t bdStdDht::bdSimilarId(const bdId *id1, const bdId *id2)
bool bdStdDht::bdSimilarId(const bdId *id1, const bdId *id2)
{
return bdStdSimilarId(id1, id2);
}
void bdStdDht::bdUpdateSimilarId(bdId *dest, const bdId *src)
bool bdStdDht::bdUpdateSimilarId(bdId *dest, const bdId *src)
{
return bdStdUpdateSimilarId(dest, src);
}

View File

@ -76,8 +76,8 @@ virtual int bdDistance(const bdNodeId *n1, const bdNodeId *n2, bdMetric *metric)
virtual int bdBucketDistance(const bdNodeId *n1, const bdNodeId *n2);
virtual int bdBucketDistance(const bdMetric *metric);
virtual uint32_t bdSimilarId(const bdId *id1, const bdId *id2);
virtual void bdUpdateSimilarId(bdId *dest, const bdId *src);
virtual bool bdSimilarId(const bdId *id1, const bdId *id2);
virtual bool bdUpdateSimilarId(bdId *dest, const bdId *src); /* returns true if update was necessary */
virtual void bdRandomMidId(const bdNodeId *target, const bdNodeId *other, bdNodeId *mid);

View File

@ -101,6 +101,7 @@ int bdStore::reloadFromStore()
}
// This is a very ugly function!
int bdStore::getPeer(bdPeer *peer)
{
#ifdef DEBUG_STORE
@ -119,6 +120,34 @@ int bdStore::getPeer(bdPeer *peer)
return 0;
}
int bdStore::filterIpList(const std::list<struct sockaddr_in> &filteredIPs)
{
// Nasty O(n^2) iteration over 500 entries!!!.
// hope its not used to often.
std::list<struct sockaddr_in>::const_iterator it;
for(it = filteredIPs.begin(); it != filteredIPs.end(); it++)
{
std::list<bdPeer>::iterator sit;
for(sit = store.begin(); sit != store.end();)
{
if (it->sin_addr.s_addr == sit->mPeerId.addr.sin_addr.s_addr)
{
std::cerr << "bdStore::filterIpList() Found Bad entry in Store. Erasing!";
std::cerr << std::endl;
sit = store.erase(sit);
}
else
{
sit++;
}
}
}
}
#define MAX_ENTRIES 500
/* maintain a sorted list */
@ -219,3 +248,5 @@ void bdStore::writeStore()
return writeStore(mStoreFile);
}

View File

@ -38,6 +38,7 @@ class bdStore
bdStore(std::string file, bdDhtFunctions *fns);
int reloadFromStore(); /* for restarts */
int filterIpList(const std::list<struct sockaddr_in> &filteredIPs);
int clear();
int getPeer(bdPeer *peer);

View File

@ -107,6 +107,7 @@ HEADERS += \
bitdht/bdconnection.h \
bitdht/bdfilter.h \
bitdht/bdaccount.h \
bitdht/bdquerymgr.h \
SOURCES += \
bitdht/bencode.c \
@ -129,5 +130,6 @@ SOURCES += \
bitdht/bdconnection.cc \
bitdht/bdfilter.cc \
bitdht/bdaccount.cc \
bitdht/bdquerymgr.cc \