From 4032f8242ce8870205a70c9f8c4d1bf2883c6fd7 Mon Sep 17 00:00:00 2001 From: drbob Date: Mon, 13 Jun 2011 12:54:03 +0000 Subject: [PATCH] Second stage of improved bitdht check-in. * Added Port Restricted Udp Layer, to simulate firewalled peers locally. * Corrected LossyUdpLayer implementation as well (not tested). * added bdsockaddr_clear() function to bitdht::utils. * Added Parsing of incoming CONNECT msgs. * Corrected Connect stats. * added connection tick(). * Fixed up basic Connection starting & Timeout cleanup. * Added lots of Connection Debug. * Added in missing transIds. * Added Check for "Similar" connections, to avoid duplicate attempts. * Filled in bdConnectionRequest functions. * Added beMsgGetBdId decode function. * Unified CONNECT_MODE flags. * Corrected bdPrintTransId bug (bad chars) * Tweaked bitdht startup parameters (NUM_PEERS & STARTUP TIME) for testing. git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-peernet@4255 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libbitdht/src/bitdht/bdconnection.cc | 267 +++++++++++++++++++++++---- libbitdht/src/bitdht/bdconnection.h | 13 +- libbitdht/src/bitdht/bdmanager.cc | 20 +- libbitdht/src/bitdht/bdmsgs.cc | 44 ++++- libbitdht/src/bitdht/bdmsgs.h | 5 + libbitdht/src/bitdht/bdnode.cc | 129 ++++++++++++- libbitdht/src/bitdht/bdnode.h | 6 +- libbitdht/src/bitdht/bdobj.cc | 3 +- libbitdht/src/udp/udplayer.cc | 117 ++++++++++-- libbitdht/src/udp/udplayer.h | 30 +++ libbitdht/src/udp/udpstack.cc | 28 +++ libbitdht/src/udp/udpstack.h | 8 + libbitdht/src/util/bdnet.cc | 7 + libbitdht/src/util/bdnet.h | 2 +- 14 files changed, 611 insertions(+), 68 deletions(-) diff --git a/libbitdht/src/bitdht/bdconnection.cc b/libbitdht/src/bitdht/bdconnection.cc index 46cecaad9..e7b5e0d47 100644 --- a/libbitdht/src/bitdht/bdconnection.cc +++ b/libbitdht/src/bitdht/bdconnection.cc @@ -28,12 +28,16 @@ #include "bitdht/bdnode.h" #include "bitdht/bdconnection.h" #include "bitdht/bdmsgs.h" +#include "bitdht/bdstddht.h" +#include "util/bdnet.h" + +#define DEBUG_NODE_CONNECTION 1 #if 0 #include "bitdht/bencode.h" #include "bitdht/bdmsgs.h" -#include "util/bdnet.h" + #include #include @@ -73,7 +77,6 @@ std::string getConnectMsgType(int msgtype) void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status) { -#ifdef DEBUG_NODE_MSGOUT std::cerr << "bdNode::msgout_connect_genmsg() Type: " << getConnectMsgType(msgtype); std::cerr << " TransId: "; bdPrintTransId(std::cerr, transId); @@ -86,6 +89,7 @@ void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId std::cerr << " Mode: " << mode; std::cerr << " Status: " << status; std::cerr << std::endl; +#ifdef DEBUG_NODE_MSGOUT #endif registerOutgoingMsg(id, transId, msgtype); @@ -105,7 +109,6 @@ void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype, { std::list::iterator it; -#ifdef DEBUG_NODE_MSGS std::cerr << "bdNode::msgin_connect_genmsg() Type: " << getConnectMsgType(msgtype); std::cerr << " TransId: "; bdPrintTransId(std::cerr, transId); @@ -118,6 +121,7 @@ void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype, std::cerr << " Mode: " << mode; std::cerr << " Status: " << status; std::cerr << std::endl; +#ifdef DEBUG_NODE_MSGS #else (void) transId; #endif @@ -179,16 +183,16 @@ void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype, * 1) Direct Endpoint. * 2) Using a Proxy. */ - + int bdNode::requestConnection(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode) { /* check if connection obj already exists */ #ifdef DEBUG_NODE_CONNECTION std::cerr << "bdNode::requestConnection() Mode: " << mode; std::cerr << " Target: "; - mFns->bdPrintNodeId(std::cerr, id); - std::cerr << " Local NetAddress: "; - mFns->bdPrintAddr(std::cerr, laddr); + mFns->bdPrintNodeId(std::cerr, target); + std::cerr << " Local NetAddress: " << inet_ntoa(laddr->sin_addr); + std::cerr << ":" << ntohs(laddr->sin_port); std::cerr << std::endl; #endif @@ -244,6 +248,10 @@ int bdNode::requestConnection_direct(struct sockaddr_in *laddr, bdNodeId *target continue; } +#ifdef DEBUG_NODE_CONNECTION + std::cerr << "bdNode::requestConnection_direct() Found Matching Query"; + std::cerr << std::endl; +#endif /* matching query */ /* find any potential proxies (must be same DHT type XXX TODO) */ (*qit)->result(connreq.mPotentialProxies); @@ -292,6 +300,10 @@ int bdNode::requestConnection_proxy(struct sockaddr_in *laddr, bdNodeId *target, continue; } +#ifdef DEBUG_NODE_CONNECTION + std::cerr << "bdNode::requestConnection_proxy() Found Matching Query"; + std::cerr << std::endl; +#endif /* matching query */ /* find any potential proxies (must be same DHT type XXX TODO) */ (*qit)->proxies(connreq.mPotentialProxies); @@ -350,8 +362,21 @@ void bdNode::addPotentialConnectionProxy(bdId *srcId, bdId *target) } } + +int bdNode::tickConnections() +{ + iterateConnectionRequests(); + iterateConnections(); +} + + void bdNode::iterateConnectionRequests() { + time_t now = time(NULL); + + std::list eraseList; + std::list::iterator eit; + std::map::iterator it; for(it = mConnectionRequests.begin(); it != mConnectionRequests.end(); it++) { @@ -359,9 +384,68 @@ void bdNode::iterateConnectionRequests() if (it->second.mState == BITDHT_CONNREQUEST_INIT) { /* kick off the connection if possible */ - //startConnectionAttempt(it->second); + startConnectionAttempt(&(it->second)); + } + + // Cleanup + if (now - it->second.mStateTS > BITDHT_CONNREQUEST_MAX_AGE) + { + std::cerr << "bdNode::iterateConnectionAttempt() Cleaning Old ConnReq: "; + std::cerr << std::endl; + std::cerr << it->second; + std::cerr << std::endl; + + /* cleanup */ + eraseList.push_back(it->first); } } + + for(eit = eraseList.begin(); eit != eraseList.end(); eit++) + { + it = mConnectionRequests.find(*eit); + if (it != mConnectionRequests.end()) + { + mConnectionRequests.erase(it); + } + } +} + +int bdNode::startConnectionAttempt(bdConnectionRequest *req) +{ + std::cerr << "bdNode::startConnectionAttempt() ConnReq: "; + std::cerr << std::endl; + std::cerr << *req; + std::cerr << std::endl; + + if (req->mPotentialProxies.size() < 1) + { + std::cerr << "bdNode::startConnectionAttempt() No Potential Proxies... delaying attempt"; + std::cerr << std::endl; + return 0; + } + + bdId proxyId; + bdId srcConnAddr; + bdId destConnAddr; + + int mode = req->mMode; + + destConnAddr.id = req->mTarget; + bdsockaddr_clear(&(destConnAddr.addr)); + + srcConnAddr.id = mOwnId; + srcConnAddr.addr = req->mLocalAddr; + + proxyId = req->mPotentialProxies.front(); + req->mPotentialProxies.pop_front(); + + req->mCurrentAttempt = proxyId; + req->mPeersTried.push_back(proxyId); + + req->mState = BITDHT_CONNREQUEST_INPROGRESS; + req->mStateTS = time(NULL); + + return startConnectionAttempt(&proxyId, &srcConnAddr, &destConnAddr, mode); } @@ -407,11 +491,13 @@ int bdNode::startConnectionAttempt(bdId *proxyId, bdId *srcConnAddr, bdId *destC conn->ConnectionSetup(proxyId, srcConnAddr, destConnAddr, mode); /* push off message */ - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int msgtype = BITDHT_MSG_TYPE_CONNECT_REQUEST; int status = BITDHT_CONNECT_ANSWER_OKAY; - msgout_connect_genmsg(&(conn->mProxyId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status); + msgout_connect_genmsg(&(conn->mProxyId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status); return 1; } @@ -475,10 +561,12 @@ void bdNode::AuthConnectionOk(bdId *srcId, bdId *proxyId, bdId *destId, int mode conn->AuthoriseEndConnection(srcId, proxyId, destId, mode, loc); /* we respond to the proxy which will finalise connection */ - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY; int status = BITDHT_CONNECT_ANSWER_OKAY; - msgout_connect_genmsg(&(conn->mProxyId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status); + msgout_connect_genmsg(&(conn->mProxyId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status); return; } @@ -497,10 +585,12 @@ void bdNode::AuthConnectionOk(bdId *srcId, bdId *proxyId, bdId *destId, int mode conn->AuthoriseProxyConnection(srcId, proxyId, destId, mode, loc); - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int msgtype = BITDHT_MSG_TYPE_CONNECT_REQUEST; int status = BITDHT_CONNECT_ANSWER_OKAY; - msgout_connect_genmsg(&(conn->mDestId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status); + msgout_connect_genmsg(&(conn->mDestId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status); } else { @@ -536,10 +626,12 @@ void bdNode::AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode if (mode == BITDHT_CONNECT_MODE_DIRECT) { /* we respond to the proxy which will finalise connection */ - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int status = BITDHT_CONNECT_ANSWER_NOK; int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY; - msgout_connect_genmsg(&(conn->mSrcId), transId, msgtype, + msgout_connect_genmsg(&(conn->mSrcId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status); cleanConnection(&(srcId->id), &(proxyId->id), &(destId->id)); @@ -549,10 +641,12 @@ void bdNode::AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode if (loc == BD_PROXY_CONNECTION_END_POINT) { /* we respond to the proxy which will finalise connection */ - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int status = BITDHT_CONNECT_ANSWER_NOK; int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY; - msgout_connect_genmsg(&(conn->mProxyId), transId, msgtype, + msgout_connect_genmsg(&(conn->mProxyId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status); cleanConnection(&(srcId->id), &(proxyId->id), &(destId->id)); @@ -561,10 +655,12 @@ void bdNode::AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode } /* otherwise we are the proxy (for either), reply FAIL */ - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int status = BITDHT_CONNECT_ANSWER_NOK; int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY; - msgout_connect_genmsg(&(conn->mSrcId), transId, msgtype, + msgout_connect_genmsg(&(conn->mSrcId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status); cleanConnection(&(srcId->id), &(proxyId->id), &(destId->id)); @@ -609,17 +705,21 @@ void bdNode::iterateConnections() it->second.mRetryCount++; if (!it->second.mSrcAck) { - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int msgtype = BITDHT_MSG_TYPE_CONNECT_START; - msgout_connect_genmsg(&(it->second.mSrcId), transId, msgtype, + msgout_connect_genmsg(&(it->second.mSrcId), &transId, msgtype, &(it->second.mSrcConnAddr), &(it->second.mDestConnAddr), it->second.mMode, it->second.mBandwidth); } if (!it->second.mDestAck) { - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int msgtype = BITDHT_MSG_TYPE_CONNECT_START; - msgout_connect_genmsg(&(it->second.mDestId), transId, msgtype, + msgout_connect_genmsg(&(it->second.mDestId), &transId, msgtype, &(it->second.mSrcConnAddr), &(it->second.mDestConnAddr), it->second.mMode, it->second.mBandwidth); } @@ -723,6 +823,22 @@ bdConnection::bdConnection() //time_t mCompletedTS; } + /* heavy check, used to check for alternative connections, coming from other direction + * Caller must switch src/dest to use it properly (otherwise it'll find your connection!) + */ +bdConnection *bdNode::findSimilarConnection(bdNodeId *srcId, bdNodeId *destId) +{ + std::map::iterator it; + for(it = mConnections.begin(); it != mConnections.end(); it++) + { + if ((it->first.srcId == *srcId) && (it->first.destId == *destId)) + { + /* found similar connection */ + return &(it->second); + } + } + return NULL; +} bdConnection *bdNode::findExistingConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId) { @@ -872,6 +988,22 @@ int bdNode::recvedConnectionRequest(bdId *id, bdId *srcConnAddr, bdId *destConnA return 0; } + /* Switch the order of peers around to test for "opposite connections" */ + if (NULL != findSimilarConnection(&(destConnAddr->id), &(srcConnAddr->id))) + { + std::cerr << "bdNode::recvedConnectionRequest() Found Similar Connection. Replying NO"; + std::cerr << std::endl; + + /* reply existing connection */ + bdToken transId; + genNewTransId(&transId); + + int status = BITDHT_CONNECT_ANSWER_NOK; + int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY; + msgout_connect_genmsg(id, &transId, msgtype, srcConnAddr, destConnAddr, mode, status); + return 0; + } + /* INSTALL a NEW CONNECTION */ conn = bdNode::newConnectionBySender(id, srcConnAddr, destConnAddr); @@ -994,9 +1126,11 @@ int bdNode::recvedConnectionReply(bdId *id, bdId *srcConnAddr, bdId *destConnAdd conn->mMode, conn->mPoint, BITDHT_CONNECT_CB_FAILED); /* send on message to SRC */ - bdToken *transId; + bdToken transId; + genNewTransId(&transId); + int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY; - msgout_connect_genmsg(&(conn->mSrcId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status); + msgout_connect_genmsg(&(conn->mSrcId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status); /* connection is killed */ cleanConnectionBySender(id, srcConnAddr, destConnAddr); @@ -1018,6 +1152,9 @@ int bdNode::recvedConnectionReply(bdId *id, bdId *srcConnAddr, bdId *destConnAdd int bdNode::recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode, int bandwidth) { + std::cerr << "bdNode::recvedConnectionStart()"; + std::cerr << std::endl; + /* retrieve existing connection data */ bdConnection *conn = findExistingConnectionBySender(id, srcConnAddr, destConnAddr); if (!conn) @@ -1033,29 +1170,40 @@ int bdNode::recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAdd if (conn->mPoint == BD_PROXY_CONNECTION_MID_POINT) { + std::cerr << "bdNode::recvedConnectionStart() ERROR We Are Connection MID Point"; + std::cerr << std::endl; /* ERROR */ } /* check state */ - if ((conn->mState != BITDHT_CONNECTION_WAITING_START) || (conn->mState != BITDHT_CONNECTION_COMPLETED)) + if ((conn->mState != BITDHT_CONNECTION_WAITING_START) && (conn->mState != BITDHT_CONNECTION_COMPLETED)) { /* ERROR */ + std::cerr << "bdNode::recvedConnectionStart() ERROR State != WAITING_START && != COMPLETED"; + std::cerr << std::endl; return 0; } /* ALL Okay, Send ACK */ + std::cerr << "bdNode::recvedConnectionStart() Passed basic tests, Okay to send ACK"; + std::cerr << std::endl; + + bdToken transId; + genNewTransId(&transId); - bdToken *transId; int msgtype = BITDHT_MSG_TYPE_CONNECT_ACK; int status = BITDHT_CONNECT_ANSWER_OKAY; - msgout_connect_genmsg(id, transId, msgtype, &(conn->mSrcId), &(conn->mDestId), mode, status); + msgout_connect_genmsg(id, &transId, msgtype, &(conn->mSrcId), &(conn->mDestId), mode, status); /* do complete Callback */ /* flag as completed */ if (conn->mState != BITDHT_CONNECTION_COMPLETED) { + std::cerr << "bdNode::recvedConnectionStart() Switching State to COMPLETED, doing callback"; + std::cerr << std::endl; + /* Store Final Addresses */ time_t now = time(NULL); @@ -1081,6 +1229,11 @@ int bdNode::recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAdd } } + else + { + std::cerr << "bdNode::recvedConnectionStart() Just sent duplicate ACK"; + std::cerr << std::endl; + } /* don't delete, if ACK is lost, we want to be able to re-respond */ return 1; @@ -1358,19 +1511,71 @@ int bdConnection::upgradeProxyConnectionToFinish(bdId *id, bdId *srcConnAddr, bd int bdConnectionRequest::setupDirectConnection(struct sockaddr_in *laddr, bdNodeId *target) { - return 0; + mState = BITDHT_CONNREQUEST_INIT; + mStateTS = time(NULL); + mTarget = *target; + mLocalAddr = *laddr; + mMode = BITDHT_CONNECT_MODE_DIRECT; + + return 1; } int bdConnectionRequest::setupProxyConnection(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode) { - return 0; + mState = BITDHT_CONNREQUEST_INIT; + mStateTS = time(NULL); + mTarget = *target; + mLocalAddr = *laddr; + mMode = mode; + + return 1; } int bdConnectionRequest::addPotentialProxy(bdId *srcId) { + std::cerr << "bdConnectionRequest::addPotentialProxy() "; + bdStdPrintId(std::cerr, srcId); + std::cerr << std::endl; + + std::list::iterator it = std::find(mPeersTried.begin(), mPeersTried.end(), *srcId); + if (it == mPeersTried.end()) + { + it = std::find(mPotentialProxies.begin(), mPotentialProxies.end(), *srcId); + if (it == mPotentialProxies.end()) + { + mPotentialProxies.push_back(*srcId); + return 1; + } + else + { + std::cerr << "bdConnectionRequest::addPotentialProxy() Duplicate in mPotentialProxies List"; + std::cerr << std::endl; + } + } + else + { + std::cerr << "bdConnectionRequest::addPotentialProxy() Already tried this peer"; + std::cerr << std::endl; + } return 0; } +std::ostream &operator<<(std::ostream &out, const bdConnectionRequest &req) +{ + out << "bdConnectionRequest: "; + out << "State: " << req.mState; + out << std::endl; + out << "PotentialProxies:"; + out << std::endl; + std::list::const_iterator it; + for(it = req.mPotentialProxies.begin(); it != req.mPotentialProxies.end(); it++) + { + out << "\t"; + bdStdPrintId(out, &(*it)); + out << std::endl; + } + return out; +} diff --git a/libbitdht/src/bitdht/bdconnection.h b/libbitdht/src/bitdht/bdconnection.h index ff4667852..b7a6c7867 100644 --- a/libbitdht/src/bitdht/bdconnection.h +++ b/libbitdht/src/bitdht/bdconnection.h @@ -38,6 +38,8 @@ #define BITDHT_CONNREQUEST_INPROGRESS 2 #define BITDHT_CONNREQUEST_DONE 3 +#define BITDHT_CONNREQUEST_MAX_AGE 60 + #define BITDHT_CONNECTION_WAITING_AUTH 1 #define BITDHT_CONNECTION_WAITING_REPLY 2 @@ -132,13 +134,20 @@ class bdConnectionRequest int addPotentialProxy(bdId *srcId); - std::list mPotentialProxies; + bdNodeId mTarget; + struct sockaddr_in mLocalAddr; + int mMode; int mState; + time_t mStateTS; - int stuff; + std::list mPotentialProxies; + + bdId mCurrentAttempt; + std::list mPeersTried; }; +std::ostream &operator<<(std::ostream &out, const bdConnectionRequest &req); #endif diff --git a/libbitdht/src/bitdht/bdmanager.cc b/libbitdht/src/bitdht/bdmanager.cc index bcb76cfdf..f2f511260 100644 --- a/libbitdht/src/bitdht/bdmanager.cc +++ b/libbitdht/src/bitdht/bdmanager.cc @@ -257,9 +257,11 @@ void bdNodeManager::iteration() * if, after 60 secs, we haven't reached MIN_OP_SPACE_SIZE, restart.... */ -#define MAX_FINDSELF_TIME 60 #define TRANSITION_OP_SPACE_SIZE 100 /* 1 query / sec, should take 12-15 secs */ -#define MIN_OP_SPACE_SIZE 20 +//#define MAX_FINDSELF_TIME 60 +//#define MIN_OP_SPACE_SIZE 20 +#define MAX_FINDSELF_TIME 10 +#define MIN_OP_SPACE_SIZE 3 // for testing. { uint32_t nodeSpaceSize = mNodeSpace.calcSpaceSize(); @@ -424,9 +426,9 @@ void bdNodeManager::QueryRandomLocalNet() int bdNodeManager::status() { /* do status of bdNode */ -//#ifdef DEBUG_MGR +#ifdef DEBUG_MGR printState(); -//#endif +#endif checkStatus(); @@ -435,10 +437,10 @@ int bdNodeManager::status() mBdNetworkSize = mNodeSpace.calcNetworkSizeWithFlag( BITDHT_PEER_STATUS_DHT_APPL); -//#ifdef DEBUG_MGR +#ifdef DEBUG_MGR std::cerr << "BitDHT NetworkSize: " << mNetworkSize << std::endl; std::cerr << "BitDHT App NetworkSize: " << mBdNetworkSize << std::endl; -//#endif +#endif return 1; } @@ -1050,11 +1052,17 @@ int bdDebugCallback::dhtValueCallback(const bdNodeId *id, std::string key, uint3 void bdNodeManager::ConnectionRequest(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode) { + std::cerr << "bdNodeManager::ConnectionRequest()"; + std::cerr << std::endl; + bdNode::requestConnection(laddr, target, mode); } void bdNodeManager::ConnectionAuth(bdId *srcId, bdId *proxyId, bdId *destId, uint32_t mode, uint32_t loc, uint32_t answer) { + std::cerr << "bdNodeManager::ConnectionAuth()"; + std::cerr << std::endl; + if (answer) { AuthConnectionOk(srcId, proxyId, destId, mode, loc); diff --git a/libbitdht/src/bitdht/bdmsgs.cc b/libbitdht/src/bitdht/bdmsgs.cc index adee2062b..a11dead46 100644 --- a/libbitdht/src/bitdht/bdmsgs.cc +++ b/libbitdht/src/bitdht/bdmsgs.cc @@ -576,6 +576,13 @@ uint32_t beMsgType(be_node *n) #endif return BITDHT_MSG_TYPE_POST_HASH; } + else if (beMsgMatchString(query, "connect", 7)) + { +#ifdef DEBUG_MSG_TYPE + std::cerr << "bsMsgType() QUERY:connect MSG TYPE" << std::endl; +#endif + return BITDHT_MSG_TYPE_CONNECT; + } #ifdef DEBUG_MSG_TYPE std::cerr << "bsMsgType() QUERY:UNKNOWN MSG TYPE, dumping dict" << std::endl; /* dump answer */ @@ -735,6 +742,28 @@ int beMsgGetListBdIds(be_node *n, std::list &nodes) return 1; } +int beMsgGetBdId(be_node *n, bdId &id) +{ + /* extract the string pointer, and size */ + /* split into parts */ + + if (n->type != BE_STR) + { + return 0; + } + + int len = be_str_len(n); + if (len < BITDHT_COMPACTNODEID_LEN) + { + return 0; + } + if (decodeCompactNodeId(&id, n->val.s, BITDHT_COMPACTNODEID_LEN)) + { + return 1; + } + return 0; +} + std::string encodeCompactNodeId(bdId *id) { std::string enc; @@ -953,31 +982,30 @@ int bitdht_connect_genmsg(bdToken *tid, bdNodeId *id, int msgtype, bdId *src, bd be_node *destnode = be_create_str_wlen(destEnc.c_str(), BITDHT_COMPACTNODEID_LEN); be_node *typenode = be_create_int(msgtype); be_node *statusnode = be_create_int(status); + be_node *modenode = be_create_int(mode); be_node *tidnode = be_create_str_wlen((char *) tid->data, tid->len); be_node *yqrnode = be_create_str("q"); be_node *cmdnode = be_create_str("connect"); - -#define CONNECT_MODE_DIRECT 0x0001 -#define CONNECT_MODE_PROXY 0x0002 -#define CONNECT_MODE_RELAY 0x0004 - + +#if 0 be_node *modenode = NULL; switch(mode) { - case CONNECT_MODE_DIRECT: + case BITDHT_CONNECT_MODE_DIRECT: modenode = be_create_str("d"); break; - case CONNECT_MODE_PROXY: + case BITDHT_CONNECT_MODE_PROXY: modenode = be_create_str("p"); break; - case CONNECT_MODE_RELAY: + case BITDHT_CONNECT_MODE_RELAY: modenode = be_create_str("r"); break; default: modenode = be_create_str("u"); break; } +#endif be_add_keypair(iddict, "id", idnode); be_add_keypair(iddict, "src", srcnode); diff --git a/libbitdht/src/bitdht/bdmsgs.h b/libbitdht/src/bitdht/bdmsgs.h index fac17a0b2..738f0075d 100644 --- a/libbitdht/src/bitdht/bdmsgs.h +++ b/libbitdht/src/bitdht/bdmsgs.h @@ -45,7 +45,11 @@ #define BITDHT_MSG_TYPE_POST_HASH 8 #define BITDHT_MSG_TYPE_REPLY_POST 9 + + // THESE ARE EXTENSIONS +#define BITDHT_MSG_TYPE_CONNECT 20 + // CONNECTIONS. #define BITDHT_MSG_TYPE_CONNECT_REQUEST 101 #define BITDHT_MSG_TYPE_CONNECT_REPLY 102 @@ -108,6 +112,7 @@ be_node *makeCompactNodeIdString(std::list &nodes); int beMsgGetToken(be_node *n, bdToken &token); int beMsgGetNodeId(be_node *n, bdNodeId &nodeId); +int beMsgGetBdId(be_node *n, bdId &id); int beMsgGetListBdIds(be_node *n, std::list &nodes); int beMsgGetListStrings(be_node *n, std::list &values); diff --git a/libbitdht/src/bitdht/bdnode.cc b/libbitdht/src/bitdht/bdnode.cc index 0a359f9e4..5e377407d 100644 --- a/libbitdht/src/bitdht/bdnode.cc +++ b/libbitdht/src/bitdht/bdnode.cc @@ -363,6 +363,11 @@ void bdNode::iteration() //msgout_find_node(&id, &transId, &(id.id)); } + + // Handle Connection loops. + tickConnections(); + + doStats(); //printStats(std::cerr); @@ -473,19 +478,15 @@ void bdNode::printStats(std::ostream &out) out << std::endl; out << " mLpfConnectRequest/sec : " << std::setw(10) << mLpfConnectRequest; - out << std::endl; - out << " mLpfConnectReply/sec : " << std::setw(10) << mLpfConnectReply; - out << std::endl; - out << " mLpfConnectStart/sec : " << std::setw(10) << mLpfConnectStart; - out << std::endl; - out << " mLpfConnectAck/sec : " << std::setw(10) << mLpfConnectAck; - out << std::endl; out << " mLpfRecvConnectReq/sec : " << std::setw(10) << mLpfRecvConnectRequest; out << std::endl; + out << " mLpfConnectReply/sec : " << std::setw(10) << mLpfConnectReply; out << " mLpfRecvConnReply/sec : " << std::setw(10) << mLpfRecvConnectReply; out << std::endl; + out << " mLpfConnectStart/sec : " << std::setw(10) << mLpfConnectStart; out << " mLpfRecvConnStart/sec : " << std::setw(10) << mLpfRecvConnectStart; out << std::endl; + out << " mLpfConnectAck/sec : " << std::setw(10) << mLpfConnectAck; out << " mLpfRecvConnectAck/sec : " << std::setw(10) << mLpfRecvConnectAck; out << std::endl; out << std::endl; @@ -1365,6 +1366,108 @@ void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr) beMsgGetUInt32(be_port, &port); } + /****************** handle Connect (lots) ***************************/ + bdId connSrcAddr; + bdId connDestAddr; + uint32_t connMode; + uint32_t connStatus; + uint32_t connType; + + be_node *be_ConnSrcAddr = NULL; + be_node *be_ConnDestAddr = NULL; + be_node *be_ConnMode = NULL; + be_node *be_ConnStatus = NULL; + be_node *be_ConnType = NULL; + if (beType == BITDHT_MSG_TYPE_CONNECT) + { + /* SrcAddr */ + be_ConnSrcAddr = beMsgGetDictNode(be_data, "src"); + if (!be_ConnSrcAddr) + { +#ifdef DEBUG_NODE_PARSE + std::cerr << "bdNode::recvPkt() CONNECT Missing SrcAddr. Dropping Msg"; + std::cerr << std::endl; +#endif + be_free(node); + return; + } + + /* DestAddr */ + be_ConnDestAddr = beMsgGetDictNode(be_data, "dest"); + if (!be_ConnDestAddr) + { +#ifdef DEBUG_NODE_PARSE + std::cerr << "bdNode::recvPkt() CONNECT Missing DestAddr. Dropping Msg"; + std::cerr << std::endl; +#endif + be_free(node); + return; + } + + /* Mode */ + be_ConnMode = beMsgGetDictNode(be_data, "mode"); + if (!be_ConnMode) + { +#ifdef DEBUG_NODE_PARSE + std::cerr << "bdNode::recvPkt() CONNECT Missing Mode. Dropping Msg"; + std::cerr << std::endl; +#endif + be_free(node); + return; + } + + /* Status */ + be_ConnStatus = beMsgGetDictNode(be_data, "status"); + if (!be_ConnStatus) + { +#ifdef DEBUG_NODE_PARSE + std::cerr << "bdNode::recvPkt() CONNECT Missing Status. Dropping Msg"; + std::cerr << std::endl; +#endif + be_free(node); + return; + } + + /* Type */ + be_ConnType = beMsgGetDictNode(be_data, "type"); + if (!be_ConnType) + { +#ifdef DEBUG_NODE_PARSE + std::cerr << "bdNode::recvPkt() CONNECT Missing Type. Dropping Msg"; + std::cerr << std::endl; +#endif + be_free(node); + return; + } + } + + if (be_ConnSrcAddr) + { + beMsgGetBdId(be_ConnSrcAddr, connSrcAddr); + } + + if (be_ConnDestAddr) + { + beMsgGetBdId(be_ConnDestAddr, connDestAddr); + } + + if (be_ConnMode) + { + beMsgGetUInt32(be_ConnMode, &connMode); + } + + if (be_ConnStatus) + { + beMsgGetUInt32(be_ConnStatus, &connStatus); + } + + if (be_ConnType) + { + beMsgGetUInt32(be_ConnType, &connType); + } + + + /****************** Bits Parsed Ok. Process Msg ***********************/ /* Construct Source Id */ bdId srcId(id, addr); @@ -1477,6 +1580,18 @@ void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr) msgin_reply_post(&srcId, &transId); break; } + case BITDHT_MSG_TYPE_CONNECT: /* a: id, src, dest, mode, status, type */ + { +//#ifdef DEBUG_NODE_MSGS + std::cerr << "bdNode::recvPkt() ConnectMsg from: "; + mFns->bdPrintId(std::cerr, &srcId); + std::cerr << std::endl; +//#endif + msgin_connect_genmsg(&srcId, &transId, connType, + &connSrcAddr, &connDestAddr, + connMode, connStatus); + break; + } default: { #ifdef DEBUG_NODE_MSGS diff --git a/libbitdht/src/bitdht/bdnode.h b/libbitdht/src/bitdht/bdnode.h index 96f871f26..d2522ec2a 100644 --- a/libbitdht/src/bitdht/bdnode.h +++ b/libbitdht/src/bitdht/bdnode.h @@ -199,7 +199,7 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr); void msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status); - void msgin_connect_genmsg(bdId *id, bdToken *transId, int type, + void msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status); /* Connections: Initiation */ @@ -210,7 +210,10 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr); int checkExistingConnectionAttempt(bdNodeId *target); void addPotentialConnectionProxy(bdId *srcId, bdId *target); + + int tickConnections(); void iterateConnectionRequests(); + int startConnectionAttempt(bdConnectionRequest *req); /* Connections: Outgoing */ @@ -229,6 +232,7 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr); int determinePosition(bdNodeId *sender, bdNodeId *src, bdNodeId *dest); int determineProxyId(bdNodeId *sender, bdNodeId *src, bdNodeId *dest, bdNodeId *proxyId); + bdConnection *findSimilarConnection(bdNodeId *srcId, bdNodeId *destId); bdConnection *findExistingConnectionBySender(bdId *sender, bdId *src, bdId *dest); bdConnection *newConnectionBySender(bdId *sender, bdId *src, bdId *dest); int cleanConnectionBySender(bdId *sender, bdId *src, bdId *dest); diff --git a/libbitdht/src/bitdht/bdobj.cc b/libbitdht/src/bitdht/bdobj.cc index c9c6925d9..df22686cc 100644 --- a/libbitdht/src/bitdht/bdobj.cc +++ b/libbitdht/src/bitdht/bdobj.cc @@ -29,7 +29,8 @@ void bdPrintTransId(std::ostream &out, bdToken *transId) { - out << transId->data; + //out << transId->data; + bdPrintToken(out, transId); return; } diff --git a/libbitdht/src/udp/udplayer.cc b/libbitdht/src/udp/udplayer.cc index d21ca078c..66e4e0691 100644 --- a/libbitdht/src/udp/udplayer.cc +++ b/libbitdht/src/udp/udplayer.cc @@ -525,28 +525,27 @@ LossyUdpLayer::~LossyUdpLayer() { return; } int LossyUdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from) { - double prob = (1.0 * (rand() / (RAND_MAX + 1.0))); - - if (prob < lossFraction) + if (0 < UdpLayer::receiveUdpPacket(data, size, from)) { - /* but discard */ - if (0 < UdpLayer::receiveUdpPacket(data, size, from)) + double prob = (1.0 * (rand() / (RAND_MAX + 1.0))); + + if (prob < lossFraction) { + /* discard */ std::cerr << "LossyUdpLayer::receiveUdpPacket() Dropping packet!"; std::cerr << std::endl; std::cerr << printPkt(data, *size); std::cerr << std::endl; std::cerr << "LossyUdpLayer::receiveUdpPacket() Packet Dropped!"; std::cerr << std::endl; + + size = 0; + return -1; } - size = 0; - return -1; - + return *size; } - - // otherwise read normally; - return UdpLayer::receiveUdpPacket(data, size, from); + return -1; } int LossyUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to) @@ -571,3 +570,99 @@ int LossyUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in return UdpLayer::sendUdpPacket(data, size, to); } +/**************************** LossyUdpLayer - for Testing **************/ + +PortRange::PortRange() :lport(0), uport(0) { return; } +PortRange::PortRange(uint16_t lp, uint16_t up) :lport(lp), uport(up) { return; } + +bool PortRange::inRange(uint16_t port) +{ + if (port < lport) + { + return false; + } + + if (port > uport) + { + return false; + } + return true; +} + + + +RestrictedUdpLayer::RestrictedUdpLayer(UdpReceiver *udpr, + struct sockaddr_in &local) + :UdpLayer(udpr, local) +{ + return; +} +RestrictedUdpLayer::~RestrictedUdpLayer() { return; } + +void RestrictedUdpLayer::addRestrictedPortRange(int lp, int up) +{ + PortRange pr(lp, up); + mLostPorts.push_back(pr); +} + +int RestrictedUdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from) +{ + if (0 < UdpLayer::receiveUdpPacket(data, size, from)) + { + /* check the port against list */ + uint16_t inPort = ntohs(from.sin_port); + + std::list::iterator it; + for(it = mLostPorts.begin(); it != mLostPorts.end(); it++) + { + if (it->inRange(inPort)) + { +#ifdef DEBUG_UDP_LAYER + std::cerr << "RestrictedUdpLayer::receiveUdpPacket() Dropping packet"; + std::cerr << ", Port(" << inPort << ") in restricted range!"; + std::cerr << std::endl; + //std::cerr << printPkt(data, *size); + //std::cerr << std::endl; +#endif + + size = 0; + return -1; + + } + + } + /* acceptable port */ + return *size; + } + return -1; +} + +int RestrictedUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to) +{ + /* check the port against list */ + uint16_t outPort = ntohs(to.sin_port); + + std::list::iterator it; + for(it = mLostPorts.begin(); it != mLostPorts.end(); it++) + { + if (it->inRange(outPort)) + { + /* drop */ +#ifdef DEBUG_UDP_LAYER + std::cerr << "RestrictedUdpLayer::sendUdpPacket() Dropping packet"; + std::cerr << ", Port(" << outPort << ") in restricted range!"; + std::cerr << std::endl; + //std::cerr << printPkt(data, *size); + //std::cerr << std::endl; +#endif + + return size; + } + + + } + + // otherwise read normally; + return UdpLayer::sendUdpPacket(data, size, to); +} + diff --git a/libbitdht/src/udp/udplayer.h b/libbitdht/src/udp/udplayer.h index 8686940f1..21fcc5cf1 100644 --- a/libbitdht/src/udp/udplayer.h +++ b/libbitdht/src/udp/udplayer.h @@ -134,5 +134,35 @@ virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to); double lossFraction; }; +class PortRange +{ + public: + PortRange(); + PortRange(uint16_t lp, uint16_t up); + + bool inRange(uint16_t port); + + uint16_t lport; + uint16_t uport; +}; + + +/* For Testing - drops packets */ +class RestrictedUdpLayer: public UdpLayer +{ + public: + RestrictedUdpLayer(UdpReceiver *udpr, struct sockaddr_in &local); +virtual ~RestrictedUdpLayer(); + +void addRestrictedPortRange(int lp, int up); + + protected: + +virtual int receiveUdpPacket(void *data, int *size, struct sockaddr_in &from); +virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to); + + std::list mLostPorts; +}; + #endif diff --git a/libbitdht/src/udp/udpstack.cc b/libbitdht/src/udp/udpstack.cc index 98af8d3db..2dc5b1675 100644 --- a/libbitdht/src/udp/udpstack.cc +++ b/libbitdht/src/udp/udpstack.cc @@ -47,6 +47,34 @@ UdpStack::UdpStack(struct sockaddr_in &local) return; } +UdpStack::UdpStack(int testmode, struct sockaddr_in &local) + :udpLayer(NULL), laddr(local) +{ + std::cerr << "UdpStack::UdpStack() Evoked in TestMode" << std::endl; + if (testmode == UDP_TEST_LOSSY_LAYER) + { + std::cerr << "UdpStack::UdpStack() Installing LossyUdpLayer" << std::endl; + udpLayer = new LossyUdpLayer(this, laddr, UDP_TEST_LOSSY_FRAC); + } + else if (testmode == UDP_TEST_RESTRICTED_LAYER) + { + std::cerr << "UdpStack::UdpStack() Installing RestrictedUdpLayer" << std::endl; + udpLayer = new RestrictedUdpLayer(this, laddr); + } + else + { + std::cerr << "UdpStack::UdpStack() Installing Standard UdpLayer" << std::endl; + // standard layer + openSocket(); + } + return; +} + +UdpLayer *UdpStack::getUdpLayer() /* for testing only */ +{ + return udpLayer; +} + bool UdpStack::resetAddress(struct sockaddr_in &local) { std::cerr << "UdpStack::resetAddress(" << local << ")"; diff --git a/libbitdht/src/udp/udpstack.h b/libbitdht/src/udp/udpstack.h index 0408bcbf7..7ce5d26b5 100644 --- a/libbitdht/src/udp/udpstack.h +++ b/libbitdht/src/udp/udpstack.h @@ -63,13 +63,21 @@ virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl) }; +#define UDP_TEST_LOSSY_LAYER 1 +#define UDP_TEST_RESTRICTED_LAYER 2 + +#define UDP_TEST_LOSSY_FRAC (0.10) + class UdpStack: public UdpReceiver, public UdpPublisher { public: UdpStack(struct sockaddr_in &local); + UdpStack(int testmode, struct sockaddr_in &local); virtual ~UdpStack() { return; } +UdpLayer *getUdpLayer(); /* for testing only */ + bool resetAddress(struct sockaddr_in &local); diff --git a/libbitdht/src/util/bdnet.cc b/libbitdht/src/util/bdnet.cc index 1333b21b4..c855143e0 100644 --- a/libbitdht/src/util/bdnet.cc +++ b/libbitdht/src/util/bdnet.cc @@ -346,3 +346,10 @@ ssize_t bdnet_sendto(int s, const void *buf, size_t len, int flags, #endif /********************************** WINDOWS/UNIX SPECIFIC PART ******************/ + +void bdsockaddr_clear(struct sockaddr_in *addr) +{ + memset(addr, 0, sizeof(*addr)); +} + + diff --git a/libbitdht/src/util/bdnet.h b/libbitdht/src/util/bdnet.h index 684900310..2fc5f4a50 100644 --- a/libbitdht/src/util/bdnet.h +++ b/libbitdht/src/util/bdnet.h @@ -103,7 +103,7 @@ int bdnet_inet_aton(const char *name, struct in_addr *addr); /* check if we can modify the TTL on a UDP packet */ int bdnet_checkTTL(int fd); - +void bdsockaddr_clear(struct sockaddr_in *addr); /* Extra stuff to declare for windows error handling (mimics unix errno) */