RetroShare/libbitdht/src/bitdht/bdnode.cc
drbob d6938721b0 Reduced timeout for connections to 20 secs. because they should be quick.
Made the query more robust, by only saying a query is SUCCESSFUL if we have recvd a reply from peer.
NB: These changes are yet to be tested!



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-netupgrade@4439 b45a01b8-16f6-495d-af2f-9b41ad6348cc
2011-07-13 11:37:47 +00:00

2070 lines
47 KiB
C++

/*
* bitdht/bdnode.cc
*
* BitDHT: A Flexible DHT library.
*
* Copyright 2010 by Robert Fernie
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 3 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "bitdht@lunamutt.com".
*
*/
#include "bitdht/bdnode.h"
#include "bitdht/bencode.h"
#include "bitdht/bdmsgs.h"
#include "bitdht/bdquerymgr.h"
#include "bitdht/bdfilter.h"
#include "util/bdnet.h"
#include "util/bdrandom.h"
#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <iomanip>
#include <sstream>
#define BITDHT_QUERY_START_PEERS 10
#define BITDHT_QUERY_NEIGHBOUR_PEERS 8
#define BITDHT_MAX_REMOTE_QUERY_AGE 10
#define MAX_REMOTE_PROCESS_PER_CYCLE 5
/****
* #define USE_HISTORY 1
*
* #define DEBUG_NODE_MULTIPEER 1
* #define DEBUG_NODE_PARSE 1
* #define DEBUG_NODE_MSGS 1
* #define DEBUG_NODE_ACTIONS 1
* #define DEBUG_NODE_MSGIN 1
* #define DEBUG_NODE_MSGOUT 1
***/
//#define DEBUG_NODE_MSGS 1
/* Construct a node with the given id, version string and bootstrap file.
 * The manager / filter pointers start as NULL and are filled in by init().
 */
bdNode::bdNode(bdNodeId *ownId, std::string dhtVersion, std::string bootfile, bdDhtFunctions *fns)
    : mOwnId(*ownId),
      mNodeSpace(ownId, fns),
      mStore(bootfile, fns),
      mDhtVersion(dhtVersion),
      mFns(fns),
      mQueryMgr(NULL),
      mConnMgr(NULL),
      mFilterPeers(NULL)
{
    /* init() passes `this` to the objects it creates, so it runs here
     * in the body rather than in the initializer list */
    init();
}
void bdNode::init()
{
mQueryMgr = new bdQueryManager(&mNodeSpace, mFns, this);
mConnMgr = new bdConnectManager(&mOwnId, &mNodeSpace, mQueryMgr, mFns, this);
std::list<bdFilteredPeer> emptyList;
mFilterPeers = new bdFilter(&mOwnId, emptyList, BITDHT_FILTER_REASON_OWNID, mFns);
//setNodeOptions(BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT);
setNodeOptions(0);
}
#define ATTACH_NUMBER 5
/* Store the option flags and update the node space's attached-flag setting:
 * ATTACH_NUMBER when BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT is set, 0 otherwise.
 */
void bdNode::setNodeOptions(uint32_t optFlags)
{
    mNodeOptionFlags = optFlags;

    const bool maintainUnstablePort = (optFlags & BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT) != 0;
    mNodeSpace.setAttachedFlag(BITDHT_PEER_STATUS_DHT_ENGINE,
            maintainUnstablePort ? ATTACH_NUMBER : 0);
}
/* Copy this node's id into *id. */
void bdNode::getOwnId(bdNodeId *id)
{
    bdNodeId &out = *id;
    out = mOwnId;
}
/***** Startup / Shutdown ******/
void bdNode::restartNode()
{
mAccount.resetStats();
mStore.reloadFromStore();
/* setup */
bdPeer peer;
while(mStore.getPeer(&peer))
{
addPotentialPeer(&(peer.mPeerId), NULL);
}
}
/* Stop queries and connections, clear all peer state, and free any
 * outgoing messages that were never collected.
 */
void bdNode::shutdownNode()
{
    /* queries, connections and pending remote queries */
    mQueryMgr->shutdownQueries();
    mConnMgr->shutdownConnections();
    mRemoteQueries.clear();

    /* node / hash spaces */
    mNodeSpace.clear();
    mHashSpace.clear();

    /* potential peers and the store */
    mPotentialPeers.clear();
    mStore.clear();

    /* the queue holds owning raw pointers: free each message */
    while (!mOutgoingMsgs.empty())
    {
        delete mOutgoingMsgs.front();
        mOutgoingMsgs.pop_front();
    }
}
/* Crude initial store... use bdspace as answer.
 * Asks the peer store to write itself out (mStore.writeStore()).
 */
void bdNode::updateStore()
{
mStore.writeStore();
}
/* Dump the node's state to std::cerr: own id, DHT space, queries,
 * connections, (optionally) message history, and statistics.
 */
void bdNode::printState()
{
    std::ostream &out = std::cerr;

    out << "bdNode::printState() for Peer: ";
    mFns->bdPrintNodeId(out, &mOwnId);
    out << std::endl;

    mNodeSpace.printDHT();
    mQueryMgr->printQueries();
    mConnMgr->printConnections();

#ifdef USE_HISTORY
    mHistory.printMsgs();
#endif

    mAccount.printStats(out);
}
/* Discard (and free) every queued incoming message without processing it. */
void bdNode::iterationOff()
{
    while (!mIncomingMsgs.empty())
    {
        /* the queue holds owning raw pointers */
        delete mIncomingMsgs.front();
        mIncomingMsgs.pop_front();
    }
}
/* One tick of the node:
 *  1) parse every queued incoming message,
 *  2) ping queued potential peers (up to 90% of the message budget),
 *  3) let the query manager use the remaining budget,
 *  4) answer queued remote queries,
 *  5) ping peers the node space reports as out-of-date,
 *  6) tick the connection manager and the statistics.
 */
void bdNode::iteration()
{
#ifdef DEBUG_NODE_MULTIPEER
    std::cerr << "bdNode::iteration() of Peer: ";
    mFns->bdPrintNodeId(std::cerr, &mOwnId);
    std::cerr << std::endl;
#endif

    /* process incoming msgs */
    while(mIncomingMsgs.size() > 0)
    {
        bdNodeNetMsg *msg = mIncomingMsgs.front();
        mIncomingMsgs.pop_front();

        recvPkt(msg->data, msg->mSize, msg->addr);

        /* cleanup message */
        delete msg;
    }

    /* assume that this is called once per second... limit the messages
     * in theory, a query can generate up to 10 peers (which will all require a ping!).
     * we want to handle all the pings we can... so we don't hold up the process.
     * but we also want enough queries to keep things moving.
     * so allow up to 90% of messages to be pings.
     *
     * ignore responses to other peers... as the number is very small generally
     */
#define BDNODE_MESSAGE_RATE_HIGH 1
#define BDNODE_MESSAGE_RATE_MED 2
#define BDNODE_MESSAGE_RATE_LOW 3
#define BDNODE_MESSAGE_RATE_TRICKLE 4
#define BDNODE_HIGH_MSG_RATE 100
#define BDNODE_MED_MSG_RATE 50
#define BDNODE_LOW_MSG_RATE 20
#define BDNODE_TRICKLE_MSG_RATE 5

    int maxMsgs = BDNODE_MED_MSG_RATE;
    /* NB: a local (not a member) despite the 'm' prefix; currently fixed at MED */
    int mAllowedMsgRate = BDNODE_MESSAGE_RATE_MED;
    switch(mAllowedMsgRate)
    {
        case BDNODE_MESSAGE_RATE_HIGH:
            maxMsgs = BDNODE_HIGH_MSG_RATE;
            break;
        case BDNODE_MESSAGE_RATE_MED:
            maxMsgs = BDNODE_MED_MSG_RATE;
            break;
        case BDNODE_MESSAGE_RATE_LOW:
            maxMsgs = BDNODE_LOW_MSG_RATE;
            break;
        case BDNODE_MESSAGE_RATE_TRICKLE:
            maxMsgs = BDNODE_TRICKLE_MSG_RATE;
            break;
        default:
            break;
    }

    int allowedPings = 0.9 * maxMsgs; /* truncated to an int */
    int sentMsgs = 0;
    int sentPings = 0;

    while((mPotentialPeers.size() > 0) && (sentMsgs < allowedPings))
    {
        /* check history ... if we have pinged them already...
         * then simulate / pretend we have received a pong,
         * and don't bother sending another ping.
         */
        bdId pid = mPotentialPeers.front();
        mPotentialPeers.pop_front();

        /* don't send too many queries ... check history first */
#ifdef USE_HISTORY
        if (mHistory.validPeer(&pid))
        {
            /* just add as peer */
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::iteration() Pinging Known Potential Peer : ";
            mFns->bdPrintId(std::cerr, &pid);
            std::cerr << std::endl;
#endif
        }
#endif

        /**** TEMP ****/
        {
            send_ping(&pid);
            sentMsgs++;
            sentPings++;

#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::iteration() Pinging Potential Peer : ";
            mFns->bdPrintId(std::cerr, &pid);
            std::cerr << std::endl;
#endif
        }
    }

    /* allow each query to send up to one query... until maxMsgs has been reached */
    int sentQueries = mQueryMgr->iterateQueries(maxMsgs-sentMsgs);
    sentMsgs += sentQueries;

#ifdef DEBUG_NODE_ACTIONS
    std::cerr << "bdNode::iteration() maxMsgs: " << maxMsgs << " sentPings: " << sentPings;
    std::cerr << " / " << allowedPings;
    std::cerr << " sentQueries: " << sentQueries;
    std::cerr << std::endl;
#endif

    /* process remote query too */
    processRemoteQuery();

    /* ping peers that the node space reports as out-of-date
     * (these pings are not counted against maxMsgs) */
    std::list<bdId> peerIds;
    std::list<bdId>::iterator oit;
    mNodeSpace.scanOutOfDatePeers(peerIds);

    for(oit = peerIds.begin(); oit != peerIds.end(); oit++)
    {
        send_ping(&(*oit));
        mAccount.incCounter(BDACCOUNT_MSG_OUTOFDATEPING, true);

#ifdef DEBUG_NODE_MSGS
        std::cerr << "bdNode::iteration() Pinging Out-Of-Date Peer: ";
        /* bdPrintId takes a pointer, as in the other calls in this function */
        mFns->bdPrintId(std::cerr, &(*oit));
        std::cerr << std::endl;
#endif
    }

    // Handle Connection loops.
    mConnMgr->tickConnections();

    mAccount.doStats();
}
/***************************************************************************************
***************************************************************************************
***************************************************************************************/
/* Generate a fresh transaction id and send a ping to the peer.
 * The outgoing message is registered inside msgout_ping().
 */
void bdNode::send_ping(bdId *id)
{
    bdToken pingTransId;
    genNewTransId(&pingTransId);

    msgout_ping(id, &pingTransId);
}
/* Send a find_node request for targetNodeId to the peer `id`,
 * using a freshly generated transaction id.
 * The outgoing message is registered inside msgout_find_node().
 */
void bdNode::send_query(bdId *id, bdNodeId *targetNodeId)
{
    /* push out query */
    bdToken transId;
    genNewTransId(&transId);

    msgout_find_node(id, &transId, targetNodeId);

#ifdef DEBUG_NODE_MSGS
    std::cerr << "bdNode::send_query() Find Node Req for : ";
    /* id / targetNodeId are already pointers: pass them as-is */
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " searching for : ";
    mFns->bdPrintNodeId(std::cerr, targetNodeId);
    std::cerr << std::endl;
#endif
}
/* Send a connect message of the given msgtype to the peer `id`,
 * using a freshly generated transaction id. srcAddr, destAddr, mode and
 * status are passed through unchanged to msgout_connect_genmsg().
 */
void bdNode::send_connect_msg(bdId *id, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status)
{
    /* push out query */
    bdToken transId;
    genNewTransId(&transId);

    msgout_connect_genmsg(id, &transId, msgtype, srcAddr, destAddr, mode, status);

#ifdef DEBUG_NODE_MSGS
    std::cerr << "bdNode::send_connect_msg() to: ";
    /* id is already a pointer: pass it as-is */
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif
}
/* Ask the query manager whether `id` is worth pinging and, if so, queue it.
 * When a source is known, also offer it to the connection manager as a
 * potential proxy for `id`.
 */
void bdNode::checkPotentialPeer(bdId *id, bdId *src)
{
    if (mQueryMgr->checkPotentialPeer(id, src))
    {
        addPotentialPeer(id, src);
    }

    /* src can be NULL! */
    if (src != NULL)
    {
        /* CAUTION: argument order is (src, id) - switched relative to this function */
        mConnMgr->addPotentialConnectionProxy(src, id);
    }
}
/* Queue a copy of *id to be pinged by a later iteration(). */
void bdNode::addPotentialPeer(bdId *id, bdId *src)
{
    (void) src; /* not used here */

    const bdId &candidate = *id;
    mPotentialPeers.push_back(candidate);
}
// virtual so manager can do callback.
// peer flags defined in bdiface.h
void bdNode::addPeer(const bdId *id, uint32_t peerflags)
{
#ifdef DEBUG_NODE_ACTIONS
fprintf(stderr, "bdNode::addPeer(");
mFns->bdPrintId(std::cerr, id);
fprintf(stderr, ")\n");
#endif
/* first check the filters */
if (mFilterPeers->checkPeer(id, peerflags))
{
std::cerr << "bdNode::addPeer(";
mFns->bdPrintId(std::cerr, id);
std::cerr << ", " << std::hex << peerflags << std::dec;
std::cerr << ") FAILED the BAD PEER FILTER!!!! DISCARDING MSG";
std::cerr << std::endl;
std::list<struct sockaddr_in> filteredIPs;
mFilterPeers->filteredIPs(filteredIPs);
mStore.filterIpList(filteredIPs);
return;
}
mQueryMgr->addPeer(id, peerflags);
mNodeSpace.add_peer(id, peerflags);
bdPeer peer;
peer.mPeerId = *id;
peer.mPeerFlags = peerflags;
peer.mLastRecvTime = time(NULL);
mStore.addStore(&peer);
// Finally we pass to connections for them to use.
mConnMgr->updatePotentialConnectionProxy(id, peerflags);
#define DISPLAY_BITDHTNODES 1
#ifdef DISPLAY_BITDHTNODES
/* TEMP to extract IDS for BloomFilter */
if (peerflags & BITDHT_PEER_STATUS_DHT_ENGINE)
{
std::cerr << "bdNode::addPeer() FOUND BITDHT PEER";
std::cerr << std::endl;
mFns->bdPrintNodeId(std::cerr, &(id->id));
std::cerr << std::endl;
}
#endif
}
/************************************ Process Remote Query *************************/
/* increased the allowed processing rate from 1/sec => 5/sec */
void bdNode::processRemoteQuery()
{
int nProcessed = 0;
time_t oldTS = time(NULL) - BITDHT_MAX_REMOTE_QUERY_AGE;
while(nProcessed < MAX_REMOTE_PROCESS_PER_CYCLE)
{
/* extra exit clause */
if (mRemoteQueries.size() < 1) return;
bdRemoteQuery &query = mRemoteQueries.front();
/* discard older ones (stops queue getting overloaded) */
if (query.mQueryTS > oldTS)
{
/* recent enough to process! */
nProcessed++;
switch(query.mQueryType)
{
case BD_QUERY_NEIGHBOURS:
{
/* search bdSpace for neighbours */
//std::list<bdId> excludeList;
std::list<bdId> nearList;
std::multimap<bdMetric, bdId> nearest;
std::multimap<bdMetric, bdId>::iterator it;
//mNodeSpace.find_nearest_nodes(&(query.mQuery), BITDHT_QUERY_NEIGHBOUR_PEERS, excludeList, nearest, 0);
mNodeSpace.find_nearest_nodes(&(query.mQuery), BITDHT_QUERY_NEIGHBOUR_PEERS, nearest);
for(it = nearest.begin(); it != nearest.end(); it++)
{
nearList.push_back(it->second);
}
msgout_reply_find_node(&(query.mId), &(query.mTransId), nearList);
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::processRemoteQuery() Reply to Find Node: ";
mFns->bdPrintId(std::cerr, &(query.mId));
std::cerr << " searching for : ";
mFns->bdPrintNodeId(std::cerr, &(query.mQuery));
std::cerr << ", found " << nearest.size() << " nodes ";
std::cerr << std::endl;
#endif
break;
}
case BD_QUERY_HASH:
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::processRemoteQuery() Reply to Query Node: ";
mFns->bdPrintId(std::cerr, &(query.mId));
std::cerr << " TODO";
std::cerr << std::endl;
#endif
/* TODO */
break;
}
default:
{
/* drop */
/* unprocess! */
nProcessed--;
break;
}
}
}
else
{
std::cerr << "bdNode::processRemoteQuery() Query Too Old: Discarding: ";
mFns->bdPrintId(std::cerr, &(query.mId));
std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGS
#endif
}
mRemoteQueries.pop_front();
}
}
/************************************ Message Buffering ****************************/
/* interaction with outside world */
int bdNode::outgoingMsg(struct sockaddr_in *addr, char *msg, int *len)
{
if (mOutgoingMsgs.size() > 0)
{
bdNodeNetMsg *bdmsg = mOutgoingMsgs.front();
//bdmsg->print(std::cerr);
mOutgoingMsgs.pop_front();
//bdmsg->print(std::cerr);
/* truncate if necessary */
if (bdmsg->mSize < *len)
{
//std::cerr << "bdNode::outgoingMsg space(" << *len << ") msgsize(" << bdmsg->mSize << ")";
//std::cerr << std::endl;
*len = bdmsg->mSize;
}
else
{
//std::cerr << "bdNode::outgoingMsg space(" << *len << ") small - trunc from: "
//<< bdmsg->mSize;
//std::cerr << std::endl;
}
memcpy(msg, bdmsg->data, *len);
*addr = bdmsg->addr;
//bdmsg->print(std::cerr);
delete bdmsg;
return 1;
}
return 0;
}
/* Queue a received datagram for the next iteration(), unless its source
 * address is rejected by the peer filter.
 */
void bdNode::incomingMsg(struct sockaddr_in *addr, char *msg, int len)
{
    /* check against the filter */
    if (!mFilterPeers->addrOkay(addr))
    {
        std::cerr << "bdNode::incomingMsg() Incoming Packet Filtered" << std::endl;
        return;
    }

    mIncomingMsgs.push_back(new bdNodeNetMsg(msg, len, addr));
}
/************************************ Message Handling *****************************/
/* Outgoing Messages */
/* Register, build and queue a ping for the peer `id`. */
void bdNode::msgout_ping(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_ping() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_PING);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_create_ping_msg(transId, &(mOwnId), buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_PING, true);
}
/* Register, build and queue a pong (ping response) for the peer `id`.
 * The response carries our version string, truncated to
 * BITDHT_TOKEN_MAX_LEN bytes if it is longer.
 */
void bdNode::msgout_pong(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_pong() TransId: ";
    bdPrintTransId(std::cerr, transId);
    /* print the version we are about to send */
    std::cerr << " Version: " << mDhtVersion;
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_PONG);

    /* generate message, send to udp */
    bdToken vid;
    uint32_t vlen = BITDHT_TOKEN_MAX_LEN;
    if (mDhtVersion.size() < vlen)
    {
        vlen = mDhtVersion.size();
    }
    memcpy(vid.data, mDhtVersion.c_str(), vlen);
    vid.len = vlen;

    /* one byte of the buffer is held back from the encoder */
    char msg[10240];
    int avail = 10240;
    int blen = bitdht_response_ping_msg(transId, &(mOwnId), &vid, msg, avail-1);

    sendPkt(msg, blen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_PONG, true);
}
/* Register, build and queue a find_node request for `query`, addressed to `id`. */
void bdNode::msgout_find_node(bdId *id, bdToken *transId, bdNodeId *query)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_find_node() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Query: ";
    mFns->bdPrintNodeId(std::cerr, query);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_FIND_NODE);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_find_node_msg(transId, &(mOwnId), query, buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_QUERYNODE, true);
}
/* Register, build and queue a find_node reply carrying `peers`, addressed to `id`. */
void bdNode::msgout_reply_find_node(bdId *id, bdToken *transId, std::list<bdId> &peers)
{
    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_NODE);
    mAccount.incCounter(BDACCOUNT_MSG_REPLYFINDNODE, true);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_resp_node_msg(transId, &(mOwnId), peers, buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);

#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_find_node() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Peers:";
    std::list<bdId>::iterator pit;
    for(pit = peers.begin(); pit != peers.end(); pit++)
    {
        std::cerr << " ";
        mFns->bdPrintId(std::cerr, &(*pit));
    }
    std::cerr << std::endl;
#endif
}
/*****************
* SECOND HALF
*
*****/
/* Register, build and queue a get_peers request for info_hash, addressed to `id`. */
void bdNode::msgout_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_get_hash() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " InfoHash: ";
    mFns->bdPrintNodeId(std::cerr, info_hash);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_GET_HASH);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_get_peers_msg(transId, &(mOwnId), info_hash, buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_QUERYHASH, true);
}
/* Register, build and queue a get_peers reply carrying `token` and the
 * peer `values`, addressed to `id`.
 */
void bdNode::msgout_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::list<std::string> &values)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_hash() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Token: ";
    bdPrintToken(std::cerr, token);
    std::cerr << " Peers: ";
    std::list<std::string>::iterator vit;
    for(vit = values.begin(); vit != values.end(); vit++)
    {
        std::cerr << " ";
        bdPrintCompactPeerId(std::cerr, *vit);
    }
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_HASH);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_peers_reply_hash_msg(transId, &(mOwnId), token, values, buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, true);
}
/* Register, build and queue a get_peers "closest nodes" reply carrying
 * `token` and `nodes`, addressed to `id`.
 */
void bdNode::msgout_reply_nearest(bdId *id, bdToken *transId, bdToken *token, std::list<bdId> &nodes)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_nearest() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Token: ";
    bdPrintToken(std::cerr, token);
    std::cerr << " Nodes:";
    std::list<bdId>::iterator nit;
    for(nit = nodes.begin(); nit != nodes.end(); nit++)
    {
        std::cerr << " ";
        mFns->bdPrintId(std::cerr, &(*nit));
    }
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_NEAR);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_peers_reply_closest_msg(transId, &(mOwnId), token, nodes, buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);

    /* accounted under the same counter as msgout_reply_hash() */
    mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, true);
}
/* Register, build and queue an announce_peers request for info_hash / port,
 * authorised by `token`, addressed to `id`.
 */
void bdNode::msgout_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash, uint32_t port, bdToken *token)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_post_hash() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Info_Hash: ";
    mFns->bdPrintNodeId(std::cerr, info_hash);
    std::cerr << " Port: " << port;
    std::cerr << " Token: ";
    bdPrintToken(std::cerr, token);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_POST_HASH);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_announce_peers_msg(transId, &(mOwnId), info_hash, port, token, buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_POSTHASH, true);
}
/* Register, build and queue the reply to an announce, addressed to `id`. */
void bdNode::msgout_reply_post(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_post() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_POST);

    /* encode into a stack buffer (one byte of the buffer is held back) */
    char buffer[10240];
    const int capacity = sizeof(buffer);
    const int encodedLen = bitdht_reply_announce_msg(transId, &(mOwnId), buffer, capacity - 1);

    sendPkt(buffer, encodedLen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_REPLYPOSTHASH, true);
}
/* Queue an encoded message for sending to addr, unless the destination
 * address is rejected by the peer filter.
 */
void bdNode::sendPkt(char *msg, int len, struct sockaddr_in addr)
{
    /* filter outgoing packets */
    if (!mFilterPeers->addrOkay(&addr))
    {
        std::cerr << "bdNode::sendPkt() Outgoing Packet Filtered" << std::endl;
        return;
    }

    mOutgoingMsgs.push_back(new bdNodeNetMsg(msg, len, &addr));
}
/********************* Incoming Messages *************************/
/*
* These functions are holding up udp queue -> so quickly
* parse message, and get on with it!
*/
/* Decode one received datagram and dispatch it.
 *
 * msg / len : raw bencoded packet.
 * addr      : address the packet came from (becomes part of the source bdId).
 *
 * The packet is decoded, then every field required by its message type is
 * looked up; if a required field is missing the packet is dropped silently
 * (a note is printed under DEBUG_NODE_PARSE). Once all fields are extracted,
 * checkIncomingMsg() is called and the matching msgin_*() handler runs.
 * The decoded tree is freed on every exit path.
 */
void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr)
{
#ifdef DEBUG_NODE_PARSE
    std::cerr << "bdNode::recvPkt() msg[" << len << "] = ";
    for(int i = 0; i < len; i++)
    {
        if ((msg[i] > 31) && (msg[i] < 127))
        {
            std::cerr << msg[i];
        }
        else
        {
            std::cerr << "[" << (int) msg[i] << "]";
        }
    }
    std::cerr << std::endl;
#endif

    /* convert to a be_node */
    be_node *node = be_decoden(msg, len);
    if (!node)
    {
        /* invalid decode */
#ifdef DEBUG_NODE_PARSE
        std::cerr << "bdNode::recvPkt() Failure to decode. Dropping Msg";
        std::cerr << std::endl;
        std::cerr << "message length: " << len;
        std::cerr << std::endl;
        std::cerr << "msg[] = ";
        for(int i = 0; i < len; i++)
        {
            if ((msg[i] > 31) && (msg[i] < 127))
            {
                std::cerr << msg[i];
            }
            else
            {
                std::cerr << "[" << (int) msg[i] << "]";
            }
        }
        std::cerr << std::endl;
#endif
        return;
    }

    /* find message type */
    uint32_t beType = beMsgType(node);
    bool beQuery = (BE_Y_Q == beMsgGetY(node));

    if (!beType)
    {
#ifdef DEBUG_NODE_PARSE
        std::cerr << "bdNode::recvPkt() Invalid Message Type. Dropping Msg";
        std::cerr << std::endl;
#endif
        /* invalid message */
        be_free(node);
        return;
    }

    /************************* handle token (all) **************************/
    be_node *be_transId = beMsgGetDictNode(node, "t");
    bdToken transId;
    if (be_transId)
    {
        beMsgGetToken(be_transId, transId);
    }
    else
    {
#ifdef DEBUG_NODE_PARSE
        std::cerr << "bdNode::recvPkt() TransId Failure. Dropping Msg";
        std::cerr << std::endl;
#endif
        be_free(node);
        return;
    }

    /************************* handle data (all) **************************/

    /* extract common features: queries keep their body under "a",
     * everything else under "r" */
    char dictkey[2] = "r";
    if (beQuery)
    {
        dictkey[0] = 'a';
    }

    be_node *be_data = beMsgGetDictNode(node, dictkey);
    if (!be_data)
    {
#ifdef DEBUG_NODE_PARSE
        std::cerr << "bdNode::recvPkt() Missing Data Body. Dropping Msg";
        std::cerr << std::endl;
#endif
        be_free(node);
        return;
    }

    /************************** handle id (all) ***************************/
    be_node *be_id = beMsgGetDictNode(be_data, "id");
    bdNodeId id;
    if (be_id)
    {
        beMsgGetNodeId(be_id, id);
    }
    else
    {
#ifdef DEBUG_NODE_PARSE
        std::cerr << "bdNode::recvPkt() Missing Peer Id. Dropping Msg";
        std::cerr << std::endl;
#endif
        be_free(node);
        return;
    }

    /************************ handle version (optional:pong) **************/
    be_node *be_version = NULL;
    bdToken versionId;
    if (beType == BITDHT_MSG_TYPE_PONG)
    {
        /* note: the version lives at the top level, not in the data body */
        be_version = beMsgGetDictNode(node, "v");
        if (!be_version)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() NOTE: PONG missing Optional Version.";
            std::cerr << std::endl;
#endif
        }
    }

    if (be_version)
    {
        beMsgGetToken(be_version, versionId);
    }

    /*********** handle target (query) or info_hash (get_hash) ************/
    bdNodeId target_info_hash;
    be_node *be_target = NULL;
    if (beType == BITDHT_MSG_TYPE_FIND_NODE)
    {
        be_target = beMsgGetDictNode(be_data, "target");
        if (!be_target)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() Missing Target / Info_Hash. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }
    else if ((beType == BITDHT_MSG_TYPE_GET_HASH) ||
             (beType == BITDHT_MSG_TYPE_POST_HASH))
    {
        be_target = beMsgGetDictNode(be_data, "info_hash");
        if (!be_target)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() Missing Target / Info_Hash. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }

    if (be_target)
    {
        beMsgGetNodeId(be_target, target_info_hash);
    }

    /*********** handle nodes (reply_query or reply_near) *****************/
    std::list<bdId> nodes;
    be_node *be_nodes = NULL;
    if ((beType == BITDHT_MSG_TYPE_REPLY_NODE) ||
        (beType == BITDHT_MSG_TYPE_REPLY_NEAR))
    {
        be_nodes = beMsgGetDictNode(be_data, "nodes");
        if (!be_nodes)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() Missing Nodes. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }

    if (be_nodes)
    {
        beMsgGetListBdIds(be_nodes, nodes);
    }

    /******************* handle values (reply_hash) ***********************/
    std::list<std::string> values;
    be_node *be_values = NULL;
    if (beType == BITDHT_MSG_TYPE_REPLY_HASH)
    {
        be_values = beMsgGetDictNode(be_data, "values");
        if (!be_values)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() Missing Values. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }

    if (be_values)
    {
        beMsgGetListStrings(be_values, values);
    }

    /************ handle token (reply_hash, reply_near, post hash) ********/
    bdToken token;
    be_node *be_token = NULL;
    if ((beType == BITDHT_MSG_TYPE_REPLY_HASH) ||
        (beType == BITDHT_MSG_TYPE_REPLY_NEAR) ||
        (beType == BITDHT_MSG_TYPE_POST_HASH))
    {
        be_token = beMsgGetDictNode(be_data, "token");
        if (!be_token)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() Missing Token. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }

    if (be_token)
    {
        /* extract the token itself: the handlers below receive &token,
         * so it must be filled from the "token" node (not the transId) */
        beMsgGetToken(be_token, token);
    }

    /****************** handle port (post hash) ***************************/
    uint32_t port = 0; /* defined value even if extraction fails */
    be_node *be_port = NULL;
    if (beType == BITDHT_MSG_TYPE_POST_HASH)
    {
        be_port = beMsgGetDictNode(be_data, "port");
        if (!be_port)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() POST_HASH Missing Port. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }

    if (be_port)
    {
        beMsgGetUInt32(be_port, &port);
    }

    /****************** handle Connect (lots) ***************************/
    bdId connSrcAddr;
    bdId connDestAddr;
    /* defined values even if extraction fails */
    uint32_t connMode = 0;
    uint32_t connStatus = 0;
    uint32_t connType = 0;
    be_node *be_ConnSrcAddr = NULL;
    be_node *be_ConnDestAddr = NULL;
    be_node *be_ConnMode = NULL;
    be_node *be_ConnStatus = NULL;
    be_node *be_ConnType = NULL;
    if (beType == BITDHT_MSG_TYPE_CONNECT)
    {
        /* SrcAddr */
        be_ConnSrcAddr = beMsgGetDictNode(be_data, "src");
        if (!be_ConnSrcAddr)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() CONNECT Missing SrcAddr. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }

        /* DestAddr */
        be_ConnDestAddr = beMsgGetDictNode(be_data, "dest");
        if (!be_ConnDestAddr)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() CONNECT Missing DestAddr. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }

        /* Mode */
        be_ConnMode = beMsgGetDictNode(be_data, "mode");
        if (!be_ConnMode)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() CONNECT Missing Mode. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }

        /* Status */
        be_ConnStatus = beMsgGetDictNode(be_data, "status");
        if (!be_ConnStatus)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() CONNECT Missing Status. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }

        /* Type */
        be_ConnType = beMsgGetDictNode(be_data, "type");
        if (!be_ConnType)
        {
#ifdef DEBUG_NODE_PARSE
            std::cerr << "bdNode::recvPkt() CONNECT Missing Type. Dropping Msg";
            std::cerr << std::endl;
#endif
            be_free(node);
            return;
        }
    }

    if (be_ConnSrcAddr)
    {
        beMsgGetBdId(be_ConnSrcAddr, connSrcAddr);
    }

    if (be_ConnDestAddr)
    {
        beMsgGetBdId(be_ConnDestAddr, connDestAddr);
    }

    if (be_ConnMode)
    {
        beMsgGetUInt32(be_ConnMode, &connMode);
    }

    if (be_ConnStatus)
    {
        beMsgGetUInt32(be_ConnStatus, &connStatus);
    }

    if (be_ConnType)
    {
        beMsgGetUInt32(be_ConnType, &connType);
    }

    /****************** Bits Parsed Ok. Process Msg ***********************/
    /* Construct Source Id */
    bdId srcId(id, addr);

    checkIncomingMsg(&srcId, &transId, beType);
    switch(beType)
    {
        case BITDHT_MSG_TYPE_PING: /* a: id, transId */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Responding to Ping : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
#endif
            msgin_ping(&srcId, &transId);
            break;
        }
        case BITDHT_MSG_TYPE_PONG: /* r: id, transId */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Received Pong from : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
#endif
            if (be_version)
            {
                msgin_pong(&srcId, &transId, &versionId);
            }
            else
            {
                msgin_pong(&srcId, &transId, NULL);
            }
            break;
        }
        case BITDHT_MSG_TYPE_FIND_NODE: /* a: id, transId, target */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Req Find Node from : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << " Looking for: ";
            mFns->bdPrintNodeId(std::cerr, &target_info_hash);
            std::cerr << std::endl;
#endif
            msgin_find_node(&srcId, &transId, &target_info_hash);
            break;
        }
        case BITDHT_MSG_TYPE_REPLY_NODE: /* r: id, transId, nodes */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Received Reply Node from : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
#endif
            msgin_reply_find_node(&srcId, &transId, nodes);
            break;
        }
        case BITDHT_MSG_TYPE_GET_HASH: /* a: id, transId, info_hash */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Received SearchHash : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << " for Hash: ";
            mFns->bdPrintNodeId(std::cerr, &target_info_hash);
            std::cerr << std::endl;
#endif
            msgin_get_hash(&srcId, &transId, &target_info_hash);
            break;
        }
        case BITDHT_MSG_TYPE_REPLY_HASH: /* r: id, transId, token, values */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Received Reply Hash : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
#endif
            msgin_reply_hash(&srcId, &transId, &token, values);
            break;
        }
        case BITDHT_MSG_TYPE_REPLY_NEAR: /* r: id, transId, token, nodes */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Received Reply Near : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
#endif
            msgin_reply_nearest(&srcId, &transId, &token, nodes);
            break;
        }
        case BITDHT_MSG_TYPE_POST_HASH: /* a: id, transId, info_hash, port, token */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Post Hash from : ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << " to post: ";
            mFns->bdPrintNodeId(std::cerr, &target_info_hash);
            std::cerr << " with port: " << port;
            std::cerr << std::endl;
#endif
            msgin_post_hash(&srcId, &transId, &target_info_hash, port, &token);
            break;
        }
        case BITDHT_MSG_TYPE_REPLY_POST: /* r: id, transId */
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() Reply Post from: ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
#endif
            msgin_reply_post(&srcId, &transId);
            break;
        }
        case BITDHT_MSG_TYPE_CONNECT: /* a: id, src, dest, mode, status, type */
        {
            /* printed unconditionally (the DEBUG_NODE_MSGS guard is disabled) */
            //#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() ConnectMsg from: ";
            mFns->bdPrintId(std::cerr, &srcId);
            std::cerr << std::endl;
            //#endif
            msgin_connect_genmsg(&srcId, &transId, connType,
                    &connSrcAddr, &connDestAddr,
                    connMode, connStatus);
            break;
        }
        default:
        {
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::recvPkt() ERROR";
            std::cerr << std::endl;
#endif
            /* ERROR */
            break;
        }
    }

    be_free(node);
    return;
}
/* Input: id, token.
* Response: pong(id, token)
*/
/* Handle a received ping: count it, record the sender as a live peer,
 * and answer with a pong carrying the same transaction id.
 */
void bdNode::msgin_ping(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGIN
    std::cerr << "bdNode::msgin_ping() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif

    mAccount.incCounter(BDACCOUNT_MSG_PING, false);

    /* peer is alive; only the RECV_PING flag is set here
     * (no id typically, so cant get version) */
    const uint32_t pingFlags = BITDHT_PEER_STATUS_RECV_PING;
    addPeer(id, pingFlags);

    /* reply */
    msgout_pong(id, transId);
}
/* Input: id, token, (+optional version)
 * Response: store peer.
 *
 * Handles an incoming pong. The sender is always passed to addPeer() with
 * BITDHT_PEER_STATUS_RECV_PONG; when a version token is supplied it is
 * compared, two bytes at a time, against mDhtVersion and each match adds
 * a further status flag:
 *   bytes 0-1 -> BITDHT_PEER_STATUS_DHT_ENGINE
 *   bytes 2-3 -> BITDHT_PEER_STATUS_DHT_ENGINE_VERSION
 *   bytes 4-5 -> BITDHT_PEER_STATUS_DHT_APPL
 *   bytes 6-7 -> BITDHT_PEER_STATUS_DHT_APPL_VERSION
 *
 * versionId may be NULL (no version supplied by the peer).
 * transId is only used for debug output.
 */
void bdNode::msgin_pong(bdId *id, bdToken *transId, bdToken *versionId)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_pong() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " Version: TODO!"; // << version;
	std::cerr << " To: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << std::endl;
#else
	(void) transId;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_PONG, false);

	/* recv pong, so the peer is alive (should have id too) */
	uint32_t peerflags = BITDHT_PEER_STATUS_RECV_PONG;

	if (!versionId)
	{
#ifdef DEBUG_NODE_MSGIN
		std::cerr << "bdNode::msgin_pong() No Version";
		std::cerr << std::endl;
#endif
		/* nothing to compare: store the peer with the basic flag only */
		addPeer(id, peerflags);
		return;
	}

#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_pong() Peer Version: ";
	for(int i = 0; i < versionId->len; i++)
	{
		std::cerr << versionId->data[i];
	}
	std::cerr << std::endl;
#endif

	/* bytes 0-1: must be identical to ours */
	const bool engineMatch =
		(versionId->len >= 2) && (mDhtVersion.size() >= 2) &&
		(versionId->data[0] == mDhtVersion[0]) &&
		(versionId->data[1] == mDhtVersion[1]);

	/* bytes 2-3: Major / Minor version.
	 * Old versions do not have this field, so both bytes must be digits.
	 * Matches if Major is the same and Minor is greater or equal to ours.
	 */
	bool engineVersionMatch =
		(versionId->len >= 4) && (mDhtVersion.size() >= 4) &&
		(isdigit(versionId->data[2]) && isdigit(versionId->data[3])) &&
		(versionId->data[2] == mDhtVersion[2]) &&
		(versionId->data[3] >= mDhtVersion[3]);

	/* a version match without an engine match is discarded and reported */
	if (engineVersionMatch && !engineMatch)
	{
		engineVersionMatch = false;

		std::cerr << "bdNode::msgin_pong() STRANGE Peer Version: ";
		for(int i = 0; i < versionId->len; i++)
		{
			std::cerr << versionId->data[i];
		}
		std::cerr << std::endl;
	}

	/* bytes 4-5: must be identical to ours */
	const bool applMatch =
		(versionId->len >= 6) && (mDhtVersion.size() >= 6) &&
		(versionId->data[4] == mDhtVersion[4]) &&
		(versionId->data[5] == mDhtVersion[5]);

	/* bytes 6-7: same digit / Major / Minor rule as bytes 2-3 */
	const bool applVersionMatch =
		(versionId->len >= 8) && (mDhtVersion.size() >= 8) &&
		(isdigit(versionId->data[6]) && isdigit(versionId->data[7])) &&
		(versionId->data[6] == mDhtVersion[6]) &&
		(versionId->data[7] >= mDhtVersion[7]);

	if (engineMatch)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_ENGINE;
	}
	if (engineVersionMatch)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_ENGINE_VERSION;
	}
	if (applMatch)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_APPL;
	}
	if (applVersionMatch)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_APPL_VERSION;
	}

	addPeer(id, peerflags);
}
/* Input: id, token, queryId
 *
 * Handles an incoming find_node request: counts it, queues it as a
 * BD_QUERY_NEIGHBOURS remote query (no reply is sent from this function)
 * and passes the sender to addPeer() with no status flags.
 */
void bdNode::msgin_find_node(bdId *id, bdToken *transId, bdNodeId *query)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_find_node() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Query: ";
	mFns->bdPrintNodeId(std::cerr, query);
	std::cerr << std::endl;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_QUERYNODE, false);

	/* remember the request so it can be processed later */
	queueQuery(id, query, transId, BD_QUERY_NEIGHBOURS);

	/* no id, and no help! - so nothing to flag on the peer */
	const uint32_t noFlags = 0;
	addPeer(id, noFlags);
}
/* Input: id, token, list of nodes returned by the peer.
 *
 * Handles a reply to a find_node request: counts it, offers every
 * returned node to checkPotentialPeer() (with the replying peer as the
 * second argument) and passes the replying peer to addPeer() with
 * BITDHT_PEER_STATUS_RECV_NODES.
 *
 * transId is only used for debug output.
 */
void bdNode::msgin_reply_find_node(bdId *id, bdToken *transId, std::list<bdId> &nodes)
{
#ifdef DEBUG_NODE_MSGS
	std::cerr << "bdNode::msgin_reply_find_node() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Peers:";
	for(std::list<bdId>::iterator dit = nodes.begin(); dit != nodes.end(); ++dit)
	{
		std::cerr << " ";
		mFns->bdPrintId(std::cerr, &(*dit));
	}
	std::cerr << std::endl;
#else
	(void) transId;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_REPLYFINDNODE, false);

	/* add neighbours to the potential list */
	for(std::list<bdId>::iterator nit = nodes.begin(); nit != nodes.end(); ++nit)
	{
		bdId *candidate = &(*nit);
		checkPotentialPeer(candidate, id);
	}

	/* received reply - so peer must be good (no id though) */
	const uint32_t replyFlags = BITDHT_PEER_STATUS_RECV_NODES;
	addPeer(id, replyFlags);
}
/********* THIS IS THE SECOND STAGE
*
*/
/* Input: id, token, info_hash.
 *
 * Handles an incoming get_hash request: counts it and queues it as a
 * BD_QUERY_HASH remote query. No reply is sent from this function and
 * the sender is not passed to addPeer().
 */
void bdNode::msgin_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_get_hash() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " InfoHash: ";
	mFns->bdPrintNodeId(std::cerr, info_hash);
	std::cerr << std::endl;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_QUERYHASH, false);

	/* remember the request so it can be processed later */
	queueQuery(id, info_hash, transId, BD_QUERY_HASH);
}
/* Input: id, token, reply token, list of compact peer strings.
 *
 * Handles a reply carrying hash values. Only the message counter is
 * updated; the arguments are used for debug output and nothing else.
 */
void bdNode::msgin_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::list<std::string> &values)
{
	mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, false);

#ifndef DEBUG_NODE_MSGIN
	/* arguments are only consumed by the debug trace */
	(void) id;
	(void) transId;
	(void) token;
	(void) values;
#else
	std::cerr << "bdNode::msgin_reply_hash() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Token: ";
	bdPrintToken(std::cerr, token);
	std::cerr << " Peers: ";
	for(std::list<std::string>::iterator vit = values.begin(); vit != values.end(); ++vit)
	{
		std::cerr << " ";
		bdPrintCompactPeerId(std::cerr, *vit);
	}
	std::cerr << std::endl;
#endif
}
/* Input: id, token, reply token, list of nodes.
 *
 * Handles a reply carrying nearest nodes. Only the message counter is
 * updated; the arguments are used for debug output and nothing else.
 *
 * NOTE(review): this increments BDACCOUNT_MSG_REPLYQUERYHASH, the same
 * counter as msgin_reply_hash() - confirm that sharing it is intended.
 */
void bdNode::msgin_reply_nearest(bdId *id, bdToken *transId, bdToken *token, std::list<bdId> &nodes)
{
	mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, false);

#ifndef DEBUG_NODE_MSGIN
	/* arguments are only consumed by the debug trace */
	(void) id;
	(void) transId;
	(void) token;
	(void) nodes;
#else
	std::cerr << "bdNode::msgin_reply_nearest() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Token: ";
	bdPrintToken(std::cerr, token);
	std::cerr << " Nodes:";
	for(std::list<bdId>::iterator nit = nodes.begin(); nit != nodes.end(); ++nit)
	{
		std::cerr << " ";
		mFns->bdPrintId(std::cerr, &(*nit));
	}
	std::cerr << std::endl;
#endif
}
/* Input: id, token, info_hash, port, post token.
 *
 * Handles an incoming post_hash message. Only the message counter is
 * updated; the arguments are used for debug output and nothing else
 * (nothing is stored and no reply is sent from this function).
 */
void bdNode::msgin_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash, uint32_t port, bdToken *token)
{
	mAccount.incCounter(BDACCOUNT_MSG_POSTHASH, false);

#ifndef DEBUG_NODE_MSGIN
	/* arguments are only consumed by the debug trace */
	(void) id;
	(void) transId;
	(void) info_hash;
	(void) port;
	(void) token;
#else
	std::cerr << "bdNode::msgin_post_hash() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Info_Hash: ";
	mFns->bdPrintNodeId(std::cerr, info_hash);
	std::cerr << " Port: " << port;
	std::cerr << " Token: ";
	bdPrintToken(std::cerr, token);
	std::cerr << std::endl;
#endif
}
/* Input: id, token.
 *
 * Handles a reply to a post_hash message. Only the message counter is
 * updated; the arguments are used for debug output and nothing else.
 */
void bdNode::msgin_reply_post(bdId *id, bdToken *transId)
{
	mAccount.incCounter(BDACCOUNT_MSG_REPLYPOSTHASH, false);

#ifndef DEBUG_NODE_MSGIN
	/* arguments are only consumed by the debug trace */
	(void) id;
	(void) transId;
#else
	std::cerr << "bdNode::msgin_reply_post() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << std::endl;
#endif
}
/************************************************************************************************************
******************************************** Message Interface **********************************************
************************************************************************************************************/
/* Outgoing Messages */
/* Returns a printable name for a BITDHT_MSG_TYPE_CONNECT_* value.
 * Any other value yields "ConnectUnknown".
 */
std::string getConnectMsgType(int msgtype)
{
	/* start from the fallback and overwrite it for the known types */
	const char *label = "ConnectUnknown";

	switch(msgtype)
	{
		case BITDHT_MSG_TYPE_CONNECT_REQUEST:
			label = "ConnectRequest";
			break;
		case BITDHT_MSG_TYPE_CONNECT_REPLY:
			label = "ConnectReply";
			break;
		case BITDHT_MSG_TYPE_CONNECT_START:
			label = "ConnectStart";
			break;
		case BITDHT_MSG_TYPE_CONNECT_ACK:
			label = "ConnectAck";
			break;
		default:
			break;
	}

	return std::string(label);
}
/* Builds and sends a connect message of the given type to 'id'.
 *
 *   id       - peer the packet is sent to (id->addr is the destination)
 *   transId  - transaction id placed in the message
 *   msgtype  - one of BITDHT_MSG_TYPE_CONNECT_REQUEST / REPLY / START / ACK
 *   srcAddr, destAddr, mode, status - passed through to bitdht_connect_genmsg()
 *
 * The matching outgoing counter is incremented, the message is registered
 * via registerOutgoingMsg() and the encoded packet is handed to sendPkt().
 */
void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int status)
{
	/* NOTE(review): this trace is emitted unconditionally; the empty
	 * DEBUG_NODE_MSGOUT guard below suggests it was meant to be
	 * conditional - confirm before re-guarding it. */
	std::cerr << "bdNode::msgout_connect_genmsg() Type: " << getConnectMsgType(msgtype);
	std::cerr << " TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " To: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " SrcAddr: ";
	mFns->bdPrintId(std::cerr, srcAddr);
	std::cerr << " DestAddr: ";
	mFns->bdPrintId(std::cerr, destAddr);
	std::cerr << " Mode: " << mode;
	std::cerr << " Status: " << status;
	std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGOUT
#endif

	switch(msgtype)
	{
		/* unrecognised types fall through and are counted as requests */
		default:
		case BITDHT_MSG_TYPE_CONNECT_REQUEST:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREQUEST, true);
			break;
		case BITDHT_MSG_TYPE_CONNECT_REPLY:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREPLY, true);
			break;
		case BITDHT_MSG_TYPE_CONNECT_START:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTSTART, true);
			break;
		case BITDHT_MSG_TYPE_CONNECT_ACK:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTACK, true);
			break;
	}

	registerOutgoingMsg(id, transId, msgtype);

	/* create string: the capacity is derived from the buffer itself so the
	 * two can never disagree; one byte is held back, as before. */
	char msg[10240];
	const int avail = static_cast<int>(sizeof(msg));

	/* NOTE(review): blen is forwarded unchecked; what
	 * bitdht_connect_genmsg() returns on failure is not visible here. */
	int blen = bitdht_connect_genmsg(transId, &(mOwnId), msgtype, srcAddr, destAddr, mode, status, msg, avail-1);
	sendPkt(msg, blen, id->addr);
}
/* Handles an incoming connect message.
 *
 *   id       - peer the message came from
 *   transId  - transaction id (only printed)
 *   msgtype  - one of BITDHT_MSG_TYPE_CONNECT_REQUEST / REPLY / START / ACK
 *   srcAddr, destAddr, mode, status - forwarded to the connection manager
 *
 * For a recognised type the incoming counter is incremented and the
 * matching mConnMgr->recvedConnection*() handler is called. The sender is
 * always passed to addPeer(): with BITDHT_PEER_STATUS_RECV_CONNECT_MSG for
 * recognised types, with no flags otherwise.
 */
void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
					bdId *srcAddr, bdId *destAddr, int mode, int status)
{
	/* NOTE(review): this trace is emitted unconditionally; the
	 * DEBUG_NODE_MSGS remnant below suggests it was meant to be
	 * conditional - confirm before re-guarding it. */
	std::cerr << "bdNode::msgin_connect_genmsg() Type: " << getConnectMsgType(msgtype);
	std::cerr << " TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " SrcAddr: ";
	mFns->bdPrintId(std::cerr, srcAddr);
	std::cerr << " DestAddr: ";
	mFns->bdPrintId(std::cerr, destAddr);
	std::cerr << " Mode: " << mode;
	std::cerr << " Status: " << status;
	std::cerr << std::endl;
#ifdef DEBUG_NODE_MSGS
#else
	(void) transId;
#endif

	/* switch to actual work functions */
	uint32_t peerflags = 0;
	switch(msgtype)
	{
		case BITDHT_MSG_TYPE_CONNECT_REQUEST:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREQUEST, false);
			mConnMgr->recvedConnectionRequest(id, srcAddr, destAddr, mode);
			break;
		case BITDHT_MSG_TYPE_CONNECT_REPLY:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREPLY, false);
			mConnMgr->recvedConnectionReply(id, srcAddr, destAddr, mode, status);
			break;
		case BITDHT_MSG_TYPE_CONNECT_START:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTSTART, false);
			mConnMgr->recvedConnectionStart(id, srcAddr, destAddr, mode, status);
			break;
		case BITDHT_MSG_TYPE_CONNECT_ACK:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTACK, false);
			mConnMgr->recvedConnectionAck(id, srcAddr, destAddr, mode);
			break;
		default:
			/* unrecognised type: nothing is counted or dispatched */
			break;
	}

	/* received message - so peer must be good */
	addPeer(id, peerflags);
}
/****************** Other Functions ******************/
/* Fills 'token' with the decimal text of bdRandom::random_u32(),
 * zero-padded to at least 4 characters and truncated to
 * BITDHT_TOKEN_MAX_LEN bytes. token->len is set to the number of bytes
 * written; no terminator is stored.
 *
 * XXX is this a good way to do it?
 * Variable length, from 4 chars up to lots... 10?
 * leave for the moment, but fix.
 */
void bdNode::genNewToken(bdToken *token)
{
#ifdef DEBUG_NODE_ACTIONS
	fprintf(stderr, "bdNode::genNewToken()");
	fprintf(stderr, ")\n");
#endif

	std::ostringstream digits;
	digits << std::setw(4) << std::setfill('0') << bdRandom::random_u32();
	const std::string text = digits.str();

	/* never write more than the token can hold */
	int copyLen = text.size();
	if (copyLen > BITDHT_TOKEN_MAX_LEN)
	{
		copyLen = BITDHT_TOKEN_MAX_LEN;
	}

	memcpy(token->data, text.data(), copyLen);
	token->len = copyLen;
}
/* Source of transaction ids: incremented once per genNewTransId() call.
 * It is a plain file-level global, so it is shared by every bdNode. */
uint32_t transIdCounter = 0;

/* Fills 'token' with the decimal text of the current transIdCounter
 * value (then increments the counter), zero-padded to at least 2
 * characters and truncated to BITDHT_TOKEN_MAX_LEN bytes. token->len is
 * set to the number of bytes written; no terminator is stored.
 */
void bdNode::genNewTransId(bdToken *token)
{
#ifdef DEBUG_NODE_ACTIONS
	fprintf(stderr, "bdNode::genNewTransId()");
	fprintf(stderr, ")\n");
#endif

	/* take the current value, leave the counter one ahead */
	const uint32_t current = transIdCounter++;

	std::ostringstream digits;
	digits << std::setw(2) << std::setfill('0') << current;
	const std::string text = digits.str();

	/* never write more than the token can hold */
	int copyLen = text.size();
	if (copyLen > BITDHT_TOKEN_MAX_LEN)
	{
		copyLen = BITDHT_TOKEN_MAX_LEN;
	}

	memcpy(token->data, text.data(), copyLen);
	token->len = copyLen;
}
/* Store Remote Query for processing.
 *
 * Appends a bdRemoteQuery built from the arguments to mRemoteQueries.
 * Always returns 1.
 */
int bdNode::queueQuery(bdId *id, bdNodeId *query, bdToken *transId, uint32_t query_type)
{
#ifdef DEBUG_NODE_ACTIONS
	std::cerr << "bdnode::queueQuery()" << std::endl;
#endif

	bdRemoteQuery pending(id, query, transId, query_type);
	mRemoteQueries.push_back(pending);

	return 1;
}
/*************** Register Transaction Ids *************/
/* Records an outgoing message.
 *
 * With USE_HISTORY defined the message is added to mHistory (last
 * argument false); otherwise the function does nothing beyond optional
 * debug output under DEBUG_MSG_CHECKS.
 *
 * Message type values, as noted in the original comment here (the
 * authoritative definitions are not in this file, so this may be stale):
 *   BITDHT_MSG_TYPE_UNKNOWN     0
 *   BITDHT_MSG_TYPE_PING        1
 *   BITDHT_MSG_TYPE_PONG        2
 *   BITDHT_MSG_TYPE_FIND_NODE   3
 *   BITDHT_MSG_TYPE_REPLY_NODE  4
 *   BITDHT_MSG_TYPE_GET_HASH    5
 *   BITDHT_MSG_TYPE_REPLY_HASH  6
 *   BITDHT_MSG_TYPE_REPLY_NEAR  7
 *   BITDHT_MSG_TYPE_POST_HASH   8
 *   BITDHT_MSG_TYPE_REPLY_POST  9
 */
void bdNode::registerOutgoingMsg(bdId *id, bdToken *transId, uint32_t msgType)
{
#ifndef DEBUG_MSG_CHECKS
	(void) id;
	(void) msgType;
#else
	std::cerr << "bdNode::registerOutgoingMsg(";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << ", " << msgType << ")";
	std::cerr << std::endl;
#endif

#ifndef USE_HISTORY
	(void) transId;
#else
	mHistory.addMsg(id, transId, msgType, false);
#endif
}
/* Records an incoming message.
 *
 * With USE_HISTORY defined the message is added to mHistory (last
 * argument true); otherwise only optional debug output under
 * DEBUG_MSG_CHECKS is produced. No actual check is performed here:
 * the function always returns 0.
 */
uint32_t bdNode::checkIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType)
{
#ifndef DEBUG_MSG_CHECKS
	(void) id;
	(void) msgType;
#else
	std::cerr << "bdNode::checkIncomingMsg(";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << ", " << msgType << ")";
	std::cerr << std::endl;
#endif

#ifndef USE_HISTORY
	(void) transId;
#else
	mHistory.addMsg(id, transId, msgType, true);
#endif

	return 0;
}
/* Placeholder: currently does nothing. */
void bdNode::cleanupTransIdRegister()
{
	/* intentionally empty */
}
/*************** Internal Msg Storage *****************/
/* Stores a private copy of a network message together with its address.
 *
 *   msg     - bytes to copy (not retained)
 *   len     - number of bytes in msg
 *   in_addr - address stored with the message; must not be NULL
 *
 * The copy is allocated with malloc() and released by the destructor.
 * If msg is NULL, len is not positive, or the allocation fails, the
 * object is left empty (data == NULL, mSize == 0) instead of copying
 * into an invalid buffer.
 */
bdNodeNetMsg::bdNodeNetMsg(char *msg, int len, struct sockaddr_in *in_addr)
	:data(NULL), mSize(0), addr(*in_addr)
{
	/* nothing sensible to copy */
	if ((msg == NULL) || (len <= 0))
	{
		return;
	}

	data = (char *) malloc(len);
	if (data == NULL)
	{
		std::cerr << "bdNodeNetMsg::bdNodeNetMsg() ERROR malloc failed for ";
		std::cerr << len << " bytes";
		std::cerr << std::endl;
		return;
	}

	memcpy(data, msg, len);

	/* only advertise a size once the buffer really holds that many bytes */
	mSize = len;
	//print(std::cerr);
}
/* Writes a one-line summary of the message (size, IPv4 address and port)
 * to 'out'.
 */
void bdNodeNetMsg::print(std::ostream &out)
{
	/* sin_port is stored in network byte order, so ntohs() is the
	 * conversion that yields the host-order port for display. */
	out << "bdNodeNetMsg::print(" << mSize << ") to "
		<< inet_ntoa(addr.sin_addr) << ":" << ntohs(addr.sin_port);
	out << std::endl;
}
/* Releases the message copy allocated by the constructor. */
bdNodeNetMsg::~bdNodeNetMsg()
{
	/* data comes from malloc(), so it is returned with free() */
	free(data);
}