Second stage of improved bitdht check-in.

* Added Port Restricted Udp Layer, to simulate firewalled peers locally.
 * Corrected LossyUdpLayer implementation as well (not tested).
 * added bdsockaddr_clear() function to bitdht::utils.
 * Added Parsing of incoming CONNECT msgs.
 * Corrected Connect stats.
 * added connection tick().
 * Fixed up basic Connection starting & Timeout cleanup.
 * Added lots of Connection Debug.
 * Added in missing transIds.
 * Added Check for "Similar" connections, to avoid duplicate attempts.
 * Filled in bdConnectionRequest functions.
 * Added beMsgGetBdId decode function.
 * Unified CONNECT_MODE flags.
 * Corrected bdPrintTransId bug (bad chars)
 * Tweaked bitdht startup parameters (NUM_PEERS & STARTUP TIME) for testing.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-peernet@4255 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-06-13 12:54:03 +00:00
parent 006a5f3011
commit 4032f8242c
14 changed files with 611 additions and 68 deletions

View File

@ -28,12 +28,16 @@
#include "bitdht/bdnode.h"
#include "bitdht/bdconnection.h"
#include "bitdht/bdmsgs.h"
#include "bitdht/bdstddht.h"
#include "util/bdnet.h"
#define DEBUG_NODE_CONNECTION 1
#if 0
#include "bitdht/bencode.h"
#include "bitdht/bdmsgs.h"
#include "util/bdnet.h"
#include <string.h>
#include <stdlib.h>
@ -73,7 +77,6 @@ std::string getConnectMsgType(int msgtype)
void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status)
{
#ifdef DEBUG_NODE_MSGOUT
std::cerr << "bdNode::msgout_connect_genmsg() Type: " << getConnectMsgType(msgtype);
std::cerr << " TransId: ";
bdPrintTransId(std::cerr, transId);
@ -86,6 +89,7 @@ void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId
std::cerr << " Mode: " << mode;
std::cerr << " Status: " << status;
std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGOUT
#endif
registerOutgoingMsg(id, transId, msgtype);
@ -105,7 +109,6 @@ void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
{
std::list<bdId>::iterator it;
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::msgin_connect_genmsg() Type: " << getConnectMsgType(msgtype);
std::cerr << " TransId: ";
bdPrintTransId(std::cerr, transId);
@ -118,6 +121,7 @@ void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
std::cerr << " Mode: " << mode;
std::cerr << " Status: " << status;
std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGS
#else
(void) transId;
#endif
@ -179,16 +183,16 @@ void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
* 1) Direct Endpoint.
* 2) Using a Proxy.
*/
int bdNode::requestConnection(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode)
{
/* check if connection obj already exists */
#ifdef DEBUG_NODE_CONNECTION
std::cerr << "bdNode::requestConnection() Mode: " << mode;
std::cerr << " Target: ";
mFns->bdPrintNodeId(std::cerr, id);
std::cerr << " Local NetAddress: ";
mFns->bdPrintAddr(std::cerr, laddr);
mFns->bdPrintNodeId(std::cerr, target);
std::cerr << " Local NetAddress: " << inet_ntoa(laddr->sin_addr);
std::cerr << ":" << ntohs(laddr->sin_port);
std::cerr << std::endl;
#endif
@ -244,6 +248,10 @@ int bdNode::requestConnection_direct(struct sockaddr_in *laddr, bdNodeId *target
continue;
}
#ifdef DEBUG_NODE_CONNECTION
std::cerr << "bdNode::requestConnection_direct() Found Matching Query";
std::cerr << std::endl;
#endif
/* matching query */
/* find any potential proxies (must be same DHT type XXX TODO) */
(*qit)->result(connreq.mPotentialProxies);
@ -292,6 +300,10 @@ int bdNode::requestConnection_proxy(struct sockaddr_in *laddr, bdNodeId *target,
continue;
}
#ifdef DEBUG_NODE_CONNECTION
std::cerr << "bdNode::requestConnection_proxy() Found Matching Query";
std::cerr << std::endl;
#endif
/* matching query */
/* find any potential proxies (must be same DHT type XXX TODO) */
(*qit)->proxies(connreq.mPotentialProxies);
@ -350,8 +362,21 @@ void bdNode::addPotentialConnectionProxy(bdId *srcId, bdId *target)
}
}
int bdNode::tickConnections()
{
iterateConnectionRequests();
iterateConnections();
}
void bdNode::iterateConnectionRequests()
{
time_t now = time(NULL);
std::list<bdNodeId> eraseList;
std::list<bdNodeId>::iterator eit;
std::map<bdNodeId, bdConnectionRequest>::iterator it;
for(it = mConnectionRequests.begin(); it != mConnectionRequests.end(); it++)
{
@ -359,9 +384,68 @@ void bdNode::iterateConnectionRequests()
if (it->second.mState == BITDHT_CONNREQUEST_INIT)
{
/* kick off the connection if possible */
//startConnectionAttempt(it->second);
startConnectionAttempt(&(it->second));
}
// Cleanup
if (now - it->second.mStateTS > BITDHT_CONNREQUEST_MAX_AGE)
{
std::cerr << "bdNode::iterateConnectionAttempt() Cleaning Old ConnReq: ";
std::cerr << std::endl;
std::cerr << it->second;
std::cerr << std::endl;
/* cleanup */
eraseList.push_back(it->first);
}
}
for(eit = eraseList.begin(); eit != eraseList.end(); eit++)
{
it = mConnectionRequests.find(*eit);
if (it != mConnectionRequests.end())
{
mConnectionRequests.erase(it);
}
}
}
int bdNode::startConnectionAttempt(bdConnectionRequest *req)
{
std::cerr << "bdNode::startConnectionAttempt() ConnReq: ";
std::cerr << std::endl;
std::cerr << *req;
std::cerr << std::endl;
if (req->mPotentialProxies.size() < 1)
{
std::cerr << "bdNode::startConnectionAttempt() No Potential Proxies... delaying attempt";
std::cerr << std::endl;
return 0;
}
bdId proxyId;
bdId srcConnAddr;
bdId destConnAddr;
int mode = req->mMode;
destConnAddr.id = req->mTarget;
bdsockaddr_clear(&(destConnAddr.addr));
srcConnAddr.id = mOwnId;
srcConnAddr.addr = req->mLocalAddr;
proxyId = req->mPotentialProxies.front();
req->mPotentialProxies.pop_front();
req->mCurrentAttempt = proxyId;
req->mPeersTried.push_back(proxyId);
req->mState = BITDHT_CONNREQUEST_INPROGRESS;
req->mStateTS = time(NULL);
return startConnectionAttempt(&proxyId, &srcConnAddr, &destConnAddr, mode);
}
@ -407,11 +491,13 @@ int bdNode::startConnectionAttempt(bdId *proxyId, bdId *srcConnAddr, bdId *destC
conn->ConnectionSetup(proxyId, srcConnAddr, destConnAddr, mode);
/* push off message */
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int msgtype = BITDHT_MSG_TYPE_CONNECT_REQUEST;
int status = BITDHT_CONNECT_ANSWER_OKAY;
msgout_connect_genmsg(&(conn->mProxyId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status);
msgout_connect_genmsg(&(conn->mProxyId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status);
return 1;
}
@ -475,10 +561,12 @@ void bdNode::AuthConnectionOk(bdId *srcId, bdId *proxyId, bdId *destId, int mode
conn->AuthoriseEndConnection(srcId, proxyId, destId, mode, loc);
/* we respond to the proxy which will finalise connection */
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY;
int status = BITDHT_CONNECT_ANSWER_OKAY;
msgout_connect_genmsg(&(conn->mProxyId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status);
msgout_connect_genmsg(&(conn->mProxyId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status);
return;
}
@ -497,10 +585,12 @@ void bdNode::AuthConnectionOk(bdId *srcId, bdId *proxyId, bdId *destId, int mode
conn->AuthoriseProxyConnection(srcId, proxyId, destId, mode, loc);
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int msgtype = BITDHT_MSG_TYPE_CONNECT_REQUEST;
int status = BITDHT_CONNECT_ANSWER_OKAY;
msgout_connect_genmsg(&(conn->mDestId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status);
msgout_connect_genmsg(&(conn->mDestId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), conn->mMode, status);
}
else
{
@ -536,10 +626,12 @@ void bdNode::AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode
if (mode == BITDHT_CONNECT_MODE_DIRECT)
{
/* we respond to the proxy which will finalise connection */
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int status = BITDHT_CONNECT_ANSWER_NOK;
int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY;
msgout_connect_genmsg(&(conn->mSrcId), transId, msgtype,
msgout_connect_genmsg(&(conn->mSrcId), &transId, msgtype,
&(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status);
cleanConnection(&(srcId->id), &(proxyId->id), &(destId->id));
@ -549,10 +641,12 @@ void bdNode::AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode
if (loc == BD_PROXY_CONNECTION_END_POINT)
{
/* we respond to the proxy which will finalise connection */
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int status = BITDHT_CONNECT_ANSWER_NOK;
int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY;
msgout_connect_genmsg(&(conn->mProxyId), transId, msgtype,
msgout_connect_genmsg(&(conn->mProxyId), &transId, msgtype,
&(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status);
cleanConnection(&(srcId->id), &(proxyId->id), &(destId->id));
@ -561,10 +655,12 @@ void bdNode::AuthConnectionNo(bdId *srcId, bdId *proxyId, bdId *destId, int mode
}
/* otherwise we are the proxy (for either), reply FAIL */
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int status = BITDHT_CONNECT_ANSWER_NOK;
int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY;
msgout_connect_genmsg(&(conn->mSrcId), transId, msgtype,
msgout_connect_genmsg(&(conn->mSrcId), &transId, msgtype,
&(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status);
cleanConnection(&(srcId->id), &(proxyId->id), &(destId->id));
@ -609,17 +705,21 @@ void bdNode::iterateConnections()
it->second.mRetryCount++;
if (!it->second.mSrcAck)
{
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int msgtype = BITDHT_MSG_TYPE_CONNECT_START;
msgout_connect_genmsg(&(it->second.mSrcId), transId, msgtype,
msgout_connect_genmsg(&(it->second.mSrcId), &transId, msgtype,
&(it->second.mSrcConnAddr), &(it->second.mDestConnAddr),
it->second.mMode, it->second.mBandwidth);
}
if (!it->second.mDestAck)
{
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int msgtype = BITDHT_MSG_TYPE_CONNECT_START;
msgout_connect_genmsg(&(it->second.mDestId), transId, msgtype,
msgout_connect_genmsg(&(it->second.mDestId), &transId, msgtype,
&(it->second.mSrcConnAddr), &(it->second.mDestConnAddr),
it->second.mMode, it->second.mBandwidth);
}
@ -723,6 +823,22 @@ bdConnection::bdConnection()
//time_t mCompletedTS;
}
/* heavy check, used to check for alternative connections, coming from other direction
* Caller must switch src/dest to use it properly (otherwise it'll find your connection!)
*/
bdConnection *bdNode::findSimilarConnection(bdNodeId *srcId, bdNodeId *destId)
{
std::map<bdProxyTuple, bdConnection>::iterator it;
for(it = mConnections.begin(); it != mConnections.end(); it++)
{
if ((it->first.srcId == *srcId) && (it->first.destId == *destId))
{
/* found similar connection */
return &(it->second);
}
}
return NULL;
}
bdConnection *bdNode::findExistingConnection(bdNodeId *srcId, bdNodeId *proxyId, bdNodeId *destId)
{
@ -872,6 +988,22 @@ int bdNode::recvedConnectionRequest(bdId *id, bdId *srcConnAddr, bdId *destConnA
return 0;
}
/* Switch the order of peers around to test for "opposite connections" */
if (NULL != findSimilarConnection(&(destConnAddr->id), &(srcConnAddr->id)))
{
std::cerr << "bdNode::recvedConnectionRequest() Found Similar Connection. Replying NO";
std::cerr << std::endl;
/* reply existing connection */
bdToken transId;
genNewTransId(&transId);
int status = BITDHT_CONNECT_ANSWER_NOK;
int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY;
msgout_connect_genmsg(id, &transId, msgtype, srcConnAddr, destConnAddr, mode, status);
return 0;
}
/* INSTALL a NEW CONNECTION */
conn = bdNode::newConnectionBySender(id, srcConnAddr, destConnAddr);
@ -994,9 +1126,11 @@ int bdNode::recvedConnectionReply(bdId *id, bdId *srcConnAddr, bdId *destConnAdd
conn->mMode, conn->mPoint, BITDHT_CONNECT_CB_FAILED);
/* send on message to SRC */
bdToken *transId;
bdToken transId;
genNewTransId(&transId);
int msgtype = BITDHT_MSG_TYPE_CONNECT_REPLY;
msgout_connect_genmsg(&(conn->mSrcId), transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status);
msgout_connect_genmsg(&(conn->mSrcId), &transId, msgtype, &(conn->mSrcConnAddr), &(conn->mDestConnAddr), mode, status);
/* connection is killed */
cleanConnectionBySender(id, srcConnAddr, destConnAddr);
@ -1018,6 +1152,9 @@ int bdNode::recvedConnectionReply(bdId *id, bdId *srcConnAddr, bdId *destConnAdd
int bdNode::recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAddr, int mode, int bandwidth)
{
std::cerr << "bdNode::recvedConnectionStart()";
std::cerr << std::endl;
/* retrieve existing connection data */
bdConnection *conn = findExistingConnectionBySender(id, srcConnAddr, destConnAddr);
if (!conn)
@ -1033,29 +1170,40 @@ int bdNode::recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAdd
if (conn->mPoint == BD_PROXY_CONNECTION_MID_POINT)
{
std::cerr << "bdNode::recvedConnectionStart() ERROR We Are Connection MID Point";
std::cerr << std::endl;
/* ERROR */
}
/* check state */
if ((conn->mState != BITDHT_CONNECTION_WAITING_START) || (conn->mState != BITDHT_CONNECTION_COMPLETED))
if ((conn->mState != BITDHT_CONNECTION_WAITING_START) && (conn->mState != BITDHT_CONNECTION_COMPLETED))
{
/* ERROR */
std::cerr << "bdNode::recvedConnectionStart() ERROR State != WAITING_START && != COMPLETED";
std::cerr << std::endl;
return 0;
}
/* ALL Okay, Send ACK */
std::cerr << "bdNode::recvedConnectionStart() Passed basic tests, Okay to send ACK";
std::cerr << std::endl;
bdToken transId;
genNewTransId(&transId);
bdToken *transId;
int msgtype = BITDHT_MSG_TYPE_CONNECT_ACK;
int status = BITDHT_CONNECT_ANSWER_OKAY;
msgout_connect_genmsg(id, transId, msgtype, &(conn->mSrcId), &(conn->mDestId), mode, status);
msgout_connect_genmsg(id, &transId, msgtype, &(conn->mSrcId), &(conn->mDestId), mode, status);
/* do complete Callback */
/* flag as completed */
if (conn->mState != BITDHT_CONNECTION_COMPLETED)
{
std::cerr << "bdNode::recvedConnectionStart() Switching State to COMPLETED, doing callback";
std::cerr << std::endl;
/* Store Final Addresses */
time_t now = time(NULL);
@ -1081,6 +1229,11 @@ int bdNode::recvedConnectionStart(bdId *id, bdId *srcConnAddr, bdId *destConnAdd
}
}
else
{
std::cerr << "bdNode::recvedConnectionStart() Just sent duplicate ACK";
std::cerr << std::endl;
}
/* don't delete, if ACK is lost, we want to be able to re-respond */
return 1;
@ -1358,19 +1511,71 @@ int bdConnection::upgradeProxyConnectionToFinish(bdId *id, bdId *srcConnAddr, bd
int bdConnectionRequest::setupDirectConnection(struct sockaddr_in *laddr, bdNodeId *target)
{
return 0;
mState = BITDHT_CONNREQUEST_INIT;
mStateTS = time(NULL);
mTarget = *target;
mLocalAddr = *laddr;
mMode = BITDHT_CONNECT_MODE_DIRECT;
return 1;
}
int bdConnectionRequest::setupProxyConnection(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode)
{
return 0;
mState = BITDHT_CONNREQUEST_INIT;
mStateTS = time(NULL);
mTarget = *target;
mLocalAddr = *laddr;
mMode = mode;
return 1;
}
int bdConnectionRequest::addPotentialProxy(bdId *srcId)
{
std::cerr << "bdConnectionRequest::addPotentialProxy() ";
bdStdPrintId(std::cerr, srcId);
std::cerr << std::endl;
std::list<bdId>::iterator it = std::find(mPeersTried.begin(), mPeersTried.end(), *srcId);
if (it == mPeersTried.end())
{
it = std::find(mPotentialProxies.begin(), mPotentialProxies.end(), *srcId);
if (it == mPotentialProxies.end())
{
mPotentialProxies.push_back(*srcId);
return 1;
}
else
{
std::cerr << "bdConnectionRequest::addPotentialProxy() Duplicate in mPotentialProxies List";
std::cerr << std::endl;
}
}
else
{
std::cerr << "bdConnectionRequest::addPotentialProxy() Already tried this peer";
std::cerr << std::endl;
}
return 0;
}
std::ostream &operator<<(std::ostream &out, const bdConnectionRequest &req)
{
out << "bdConnectionRequest: ";
out << "State: " << req.mState;
out << std::endl;
out << "PotentialProxies:";
out << std::endl;
std::list<bdId>::const_iterator it;
for(it = req.mPotentialProxies.begin(); it != req.mPotentialProxies.end(); it++)
{
out << "\t";
bdStdPrintId(out, &(*it));
out << std::endl;
}
return out;
}

View File

@ -38,6 +38,8 @@
#define BITDHT_CONNREQUEST_INPROGRESS 2
#define BITDHT_CONNREQUEST_DONE 3
#define BITDHT_CONNREQUEST_MAX_AGE 60
#define BITDHT_CONNECTION_WAITING_AUTH 1
#define BITDHT_CONNECTION_WAITING_REPLY 2
@ -132,13 +134,20 @@ class bdConnectionRequest
int addPotentialProxy(bdId *srcId);
std::list<bdId> mPotentialProxies;
bdNodeId mTarget;
struct sockaddr_in mLocalAddr;
int mMode;
int mState;
time_t mStateTS;
int stuff;
std::list<bdId> mPotentialProxies;
bdId mCurrentAttempt;
std::list<bdId> mPeersTried;
};
std::ostream &operator<<(std::ostream &out, const bdConnectionRequest &req);
#endif

View File

@ -257,9 +257,11 @@ void bdNodeManager::iteration()
* if, after 60 secs, we haven't reached MIN_OP_SPACE_SIZE, restart....
*/
#define MAX_FINDSELF_TIME 60
#define TRANSITION_OP_SPACE_SIZE 100 /* 1 query / sec, should take 12-15 secs */
#define MIN_OP_SPACE_SIZE 20
//#define MAX_FINDSELF_TIME 60
//#define MIN_OP_SPACE_SIZE 20
#define MAX_FINDSELF_TIME 10
#define MIN_OP_SPACE_SIZE 3 // for testing.
{
uint32_t nodeSpaceSize = mNodeSpace.calcSpaceSize();
@ -424,9 +426,9 @@ void bdNodeManager::QueryRandomLocalNet()
int bdNodeManager::status()
{
/* do status of bdNode */
//#ifdef DEBUG_MGR
#ifdef DEBUG_MGR
printState();
//#endif
#endif
checkStatus();
@ -435,10 +437,10 @@ int bdNodeManager::status()
mBdNetworkSize = mNodeSpace.calcNetworkSizeWithFlag(
BITDHT_PEER_STATUS_DHT_APPL);
//#ifdef DEBUG_MGR
#ifdef DEBUG_MGR
std::cerr << "BitDHT NetworkSize: " << mNetworkSize << std::endl;
std::cerr << "BitDHT App NetworkSize: " << mBdNetworkSize << std::endl;
//#endif
#endif
return 1;
}
@ -1050,11 +1052,17 @@ int bdDebugCallback::dhtValueCallback(const bdNodeId *id, std::string key, uint3
void bdNodeManager::ConnectionRequest(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode)
{
std::cerr << "bdNodeManager::ConnectionRequest()";
std::cerr << std::endl;
bdNode::requestConnection(laddr, target, mode);
}
void bdNodeManager::ConnectionAuth(bdId *srcId, bdId *proxyId, bdId *destId, uint32_t mode, uint32_t loc, uint32_t answer)
{
std::cerr << "bdNodeManager::ConnectionAuth()";
std::cerr << std::endl;
if (answer)
{
AuthConnectionOk(srcId, proxyId, destId, mode, loc);

View File

@ -576,6 +576,13 @@ uint32_t beMsgType(be_node *n)
#endif
return BITDHT_MSG_TYPE_POST_HASH;
}
else if (beMsgMatchString(query, "connect", 7))
{
#ifdef DEBUG_MSG_TYPE
std::cerr << "bsMsgType() QUERY:connect MSG TYPE" << std::endl;
#endif
return BITDHT_MSG_TYPE_CONNECT;
}
#ifdef DEBUG_MSG_TYPE
std::cerr << "bsMsgType() QUERY:UNKNOWN MSG TYPE, dumping dict" << std::endl;
/* dump answer */
@ -735,6 +742,28 @@ int beMsgGetListBdIds(be_node *n, std::list<bdId> &nodes)
return 1;
}
int beMsgGetBdId(be_node *n, bdId &id)
{
/* extract the string pointer, and size */
/* split into parts */
if (n->type != BE_STR)
{
return 0;
}
int len = be_str_len(n);
if (len < BITDHT_COMPACTNODEID_LEN)
{
return 0;
}
if (decodeCompactNodeId(&id, n->val.s, BITDHT_COMPACTNODEID_LEN))
{
return 1;
}
return 0;
}
std::string encodeCompactNodeId(bdId *id)
{
std::string enc;
@ -953,31 +982,30 @@ int bitdht_connect_genmsg(bdToken *tid, bdNodeId *id, int msgtype, bdId *src, bd
be_node *destnode = be_create_str_wlen(destEnc.c_str(), BITDHT_COMPACTNODEID_LEN);
be_node *typenode = be_create_int(msgtype);
be_node *statusnode = be_create_int(status);
be_node *modenode = be_create_int(mode);
be_node *tidnode = be_create_str_wlen((char *) tid->data, tid->len);
be_node *yqrnode = be_create_str("q");
be_node *cmdnode = be_create_str("connect");
#define CONNECT_MODE_DIRECT 0x0001
#define CONNECT_MODE_PROXY 0x0002
#define CONNECT_MODE_RELAY 0x0004
#if 0
be_node *modenode = NULL;
switch(mode)
{
case CONNECT_MODE_DIRECT:
case BITDHT_CONNECT_MODE_DIRECT:
modenode = be_create_str("d");
break;
case CONNECT_MODE_PROXY:
case BITDHT_CONNECT_MODE_PROXY:
modenode = be_create_str("p");
break;
case CONNECT_MODE_RELAY:
case BITDHT_CONNECT_MODE_RELAY:
modenode = be_create_str("r");
break;
default:
modenode = be_create_str("u");
break;
}
#endif
be_add_keypair(iddict, "id", idnode);
be_add_keypair(iddict, "src", srcnode);

View File

@ -45,7 +45,11 @@
#define BITDHT_MSG_TYPE_POST_HASH 8
#define BITDHT_MSG_TYPE_REPLY_POST 9
// THESE ARE EXTENSIONS
#define BITDHT_MSG_TYPE_CONNECT 20
// CONNECTIONS.
#define BITDHT_MSG_TYPE_CONNECT_REQUEST 101
#define BITDHT_MSG_TYPE_CONNECT_REPLY 102
@ -108,6 +112,7 @@ be_node *makeCompactNodeIdString(std::list<bdId> &nodes);
int beMsgGetToken(be_node *n, bdToken &token);
int beMsgGetNodeId(be_node *n, bdNodeId &nodeId);
int beMsgGetBdId(be_node *n, bdId &id);
int beMsgGetListBdIds(be_node *n, std::list<bdId> &nodes);
int beMsgGetListStrings(be_node *n, std::list<std::string> &values);

View File

@ -363,6 +363,11 @@ void bdNode::iteration()
//msgout_find_node(&id, &transId, &(id.id));
}
// Handle Connection loops.
tickConnections();
doStats();
//printStats(std::cerr);
@ -473,19 +478,15 @@ void bdNode::printStats(std::ostream &out)
out << std::endl;
out << " mLpfConnectRequest/sec : " << std::setw(10) << mLpfConnectRequest;
out << std::endl;
out << " mLpfConnectReply/sec : " << std::setw(10) << mLpfConnectReply;
out << std::endl;
out << " mLpfConnectStart/sec : " << std::setw(10) << mLpfConnectStart;
out << std::endl;
out << " mLpfConnectAck/sec : " << std::setw(10) << mLpfConnectAck;
out << std::endl;
out << " mLpfRecvConnectReq/sec : " << std::setw(10) << mLpfRecvConnectRequest;
out << std::endl;
out << " mLpfConnectReply/sec : " << std::setw(10) << mLpfConnectReply;
out << " mLpfRecvConnReply/sec : " << std::setw(10) << mLpfRecvConnectReply;
out << std::endl;
out << " mLpfConnectStart/sec : " << std::setw(10) << mLpfConnectStart;
out << " mLpfRecvConnStart/sec : " << std::setw(10) << mLpfRecvConnectStart;
out << std::endl;
out << " mLpfConnectAck/sec : " << std::setw(10) << mLpfConnectAck;
out << " mLpfRecvConnectAck/sec : " << std::setw(10) << mLpfRecvConnectAck;
out << std::endl;
out << std::endl;
@ -1365,6 +1366,108 @@ void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr)
beMsgGetUInt32(be_port, &port);
}
/****************** handle Connect (lots) ***************************/
bdId connSrcAddr;
bdId connDestAddr;
uint32_t connMode;
uint32_t connStatus;
uint32_t connType;
be_node *be_ConnSrcAddr = NULL;
be_node *be_ConnDestAddr = NULL;
be_node *be_ConnMode = NULL;
be_node *be_ConnStatus = NULL;
be_node *be_ConnType = NULL;
if (beType == BITDHT_MSG_TYPE_CONNECT)
{
/* SrcAddr */
be_ConnSrcAddr = beMsgGetDictNode(be_data, "src");
if (!be_ConnSrcAddr)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing SrcAddr. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* DestAddr */
be_ConnDestAddr = beMsgGetDictNode(be_data, "dest");
if (!be_ConnDestAddr)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing DestAddr. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Mode */
be_ConnMode = beMsgGetDictNode(be_data, "mode");
if (!be_ConnMode)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Mode. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Status */
be_ConnStatus = beMsgGetDictNode(be_data, "status");
if (!be_ConnStatus)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Status. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Type */
be_ConnType = beMsgGetDictNode(be_data, "type");
if (!be_ConnType)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Type. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_ConnSrcAddr)
{
beMsgGetBdId(be_ConnSrcAddr, connSrcAddr);
}
if (be_ConnDestAddr)
{
beMsgGetBdId(be_ConnDestAddr, connDestAddr);
}
if (be_ConnMode)
{
beMsgGetUInt32(be_ConnMode, &connMode);
}
if (be_ConnStatus)
{
beMsgGetUInt32(be_ConnStatus, &connStatus);
}
if (be_ConnType)
{
beMsgGetUInt32(be_ConnType, &connType);
}
/****************** Bits Parsed Ok. Process Msg ***********************/
/* Construct Source Id */
bdId srcId(id, addr);
@ -1477,6 +1580,18 @@ void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr)
msgin_reply_post(&srcId, &transId);
break;
}
case BITDHT_MSG_TYPE_CONNECT: /* a: id, src, dest, mode, status, type */
{
//#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() ConnectMsg from: ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
//#endif
msgin_connect_genmsg(&srcId, &transId, connType,
&connSrcAddr, &connDestAddr,
connMode, connStatus);
break;
}
default:
{
#ifdef DEBUG_NODE_MSGS

View File

@ -199,7 +199,7 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
void msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
void msgin_connect_genmsg(bdId *id, bdToken *transId, int type,
void msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
bdId *srcAddr, bdId *destAddr, int mode, int status);
/* Connections: Initiation */
@ -210,7 +210,10 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
int checkExistingConnectionAttempt(bdNodeId *target);
void addPotentialConnectionProxy(bdId *srcId, bdId *target);
int tickConnections();
void iterateConnectionRequests();
int startConnectionAttempt(bdConnectionRequest *req);
/* Connections: Outgoing */
@ -229,6 +232,7 @@ void recvPkt(char *msg, int len, struct sockaddr_in addr);
int determinePosition(bdNodeId *sender, bdNodeId *src, bdNodeId *dest);
int determineProxyId(bdNodeId *sender, bdNodeId *src, bdNodeId *dest, bdNodeId *proxyId);
bdConnection *findSimilarConnection(bdNodeId *srcId, bdNodeId *destId);
bdConnection *findExistingConnectionBySender(bdId *sender, bdId *src, bdId *dest);
bdConnection *newConnectionBySender(bdId *sender, bdId *src, bdId *dest);
int cleanConnectionBySender(bdId *sender, bdId *src, bdId *dest);

View File

@ -29,7 +29,8 @@
void bdPrintTransId(std::ostream &out, bdToken *transId)
{
out << transId->data;
//out << transId->data;
bdPrintToken(out, transId);
return;
}

View File

@ -525,28 +525,27 @@ LossyUdpLayer::~LossyUdpLayer() { return; }
int LossyUdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from)
{
double prob = (1.0 * (rand() / (RAND_MAX + 1.0)));
if (prob < lossFraction)
if (0 < UdpLayer::receiveUdpPacket(data, size, from))
{
/* but discard */
if (0 < UdpLayer::receiveUdpPacket(data, size, from))
double prob = (1.0 * (rand() / (RAND_MAX + 1.0)));
if (prob < lossFraction)
{
/* discard */
std::cerr << "LossyUdpLayer::receiveUdpPacket() Dropping packet!";
std::cerr << std::endl;
std::cerr << printPkt(data, *size);
std::cerr << std::endl;
std::cerr << "LossyUdpLayer::receiveUdpPacket() Packet Dropped!";
std::cerr << std::endl;
size = 0;
return -1;
}
size = 0;
return -1;
return *size;
}
// otherwise read normally;
return UdpLayer::receiveUdpPacket(data, size, from);
return -1;
}
int LossyUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to)
@ -571,3 +570,99 @@ int LossyUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in
return UdpLayer::sendUdpPacket(data, size, to);
}
/**************************** LossyUdpLayer - for Testing **************/
PortRange::PortRange() :lport(0), uport(0) { return; }
PortRange::PortRange(uint16_t lp, uint16_t up) :lport(lp), uport(up) { return; }
bool PortRange::inRange(uint16_t port)
{
if (port < lport)
{
return false;
}
if (port > uport)
{
return false;
}
return true;
}
RestrictedUdpLayer::RestrictedUdpLayer(UdpReceiver *udpr,
struct sockaddr_in &local)
:UdpLayer(udpr, local)
{
return;
}
RestrictedUdpLayer::~RestrictedUdpLayer() { return; }
void RestrictedUdpLayer::addRestrictedPortRange(int lp, int up)
{
PortRange pr(lp, up);
mLostPorts.push_back(pr);
}
int RestrictedUdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from)
{
if (0 < UdpLayer::receiveUdpPacket(data, size, from))
{
/* check the port against list */
uint16_t inPort = ntohs(from.sin_port);
std::list<PortRange>::iterator it;
for(it = mLostPorts.begin(); it != mLostPorts.end(); it++)
{
if (it->inRange(inPort))
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "RestrictedUdpLayer::receiveUdpPacket() Dropping packet";
std::cerr << ", Port(" << inPort << ") in restricted range!";
std::cerr << std::endl;
//std::cerr << printPkt(data, *size);
//std::cerr << std::endl;
#endif
size = 0;
return -1;
}
}
/* acceptable port */
return *size;
}
return -1;
}
int RestrictedUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to)
{
/* check the port against list */
uint16_t outPort = ntohs(to.sin_port);
std::list<PortRange>::iterator it;
for(it = mLostPorts.begin(); it != mLostPorts.end(); it++)
{
if (it->inRange(outPort))
{
/* drop */
#ifdef DEBUG_UDP_LAYER
std::cerr << "RestrictedUdpLayer::sendUdpPacket() Dropping packet";
std::cerr << ", Port(" << outPort << ") in restricted range!";
std::cerr << std::endl;
//std::cerr << printPkt(data, *size);
//std::cerr << std::endl;
#endif
return size;
}
}
// otherwise read normally;
return UdpLayer::sendUdpPacket(data, size, to);
}

View File

@ -134,5 +134,35 @@ virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to);
double lossFraction;
};
class PortRange
{
public:
PortRange();
PortRange(uint16_t lp, uint16_t up);
bool inRange(uint16_t port);
uint16_t lport;
uint16_t uport;
};
/* For Testing - drops packets */
class RestrictedUdpLayer: public UdpLayer
{
public:
RestrictedUdpLayer(UdpReceiver *udpr, struct sockaddr_in &local);
virtual ~RestrictedUdpLayer();
void addRestrictedPortRange(int lp, int up);
protected:
virtual int receiveUdpPacket(void *data, int *size, struct sockaddr_in &from);
virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to);
std::list<PortRange> mLostPorts;
};
#endif

View File

@ -47,6 +47,34 @@ UdpStack::UdpStack(struct sockaddr_in &local)
return;
}
UdpStack::UdpStack(int testmode, struct sockaddr_in &local)
:udpLayer(NULL), laddr(local)
{
std::cerr << "UdpStack::UdpStack() Evoked in TestMode" << std::endl;
if (testmode == UDP_TEST_LOSSY_LAYER)
{
std::cerr << "UdpStack::UdpStack() Installing LossyUdpLayer" << std::endl;
udpLayer = new LossyUdpLayer(this, laddr, UDP_TEST_LOSSY_FRAC);
}
else if (testmode == UDP_TEST_RESTRICTED_LAYER)
{
std::cerr << "UdpStack::UdpStack() Installing RestrictedUdpLayer" << std::endl;
udpLayer = new RestrictedUdpLayer(this, laddr);
}
else
{
std::cerr << "UdpStack::UdpStack() Installing Standard UdpLayer" << std::endl;
// standard layer
openSocket();
}
return;
}
UdpLayer *UdpStack::getUdpLayer() /* for testing only */
{
return udpLayer;
}
bool UdpStack::resetAddress(struct sockaddr_in &local)
{
std::cerr << "UdpStack::resetAddress(" << local << ")";

View File

@ -63,13 +63,21 @@ virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl)
};
#define UDP_TEST_LOSSY_LAYER 1
#define UDP_TEST_RESTRICTED_LAYER 2
#define UDP_TEST_LOSSY_FRAC (0.10)
class UdpStack: public UdpReceiver, public UdpPublisher
{
public:
UdpStack(struct sockaddr_in &local);
UdpStack(int testmode, struct sockaddr_in &local);
virtual ~UdpStack() { return; }
UdpLayer *getUdpLayer(); /* for testing only */
bool resetAddress(struct sockaddr_in &local);

View File

@ -346,3 +346,10 @@ ssize_t bdnet_sendto(int s, const void *buf, size_t len, int flags,
#endif
/********************************** WINDOWS/UNIX SPECIFIC PART ******************/
void bdsockaddr_clear(struct sockaddr_in *addr)
{
memset(addr, 0, sizeof(*addr));
}

View File

@ -103,7 +103,7 @@ int bdnet_inet_aton(const char *name, struct in_addr *addr);
/* check if we can modify the TTL on a UDP packet */
int bdnet_checkTTL(int fd);
void bdsockaddr_clear(struct sockaddr_in *addr);
/* Extra stuff to declare for windows error handling (mimics unix errno)
*/