RetroShare/libbitdht/src/bitdht/bdnode.cc
sehraf 3bb03ff89d Added new (optional) callback to libbitdht to ask upper layer if an IP is banned.
In case this callback is implemented it will be used in favour of the built-in ban list.
2016-06-20 22:30:51 +02:00

2486 lines
58 KiB
C++

/*
* bitdht/bdnode.cc
*
* BitDHT: A Flexible DHT library.
*
* Copyright 2010-2011 by Robert Fernie
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 3 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "bitdht@lunamutt.com".
*
*/
#include "bitdht/bdnode.h"
#include "bitdht/bencode.h"
#include "bitdht/bdmsgs.h"
#include "bitdht/bdquerymgr.h"
#include "bitdht/bdfilter.h"
#include "util/bdnet.h"
#include "util/bdrandom.h"
#include "util/bdstring.h"
#include <string.h>
#include <stdlib.h>
#include <iostream>
#include <iomanip>
#define BITDHT_QUERY_START_PEERS 10
#define BITDHT_QUERY_NEIGHBOUR_PEERS 8
#define BITDHT_MAX_REMOTE_QUERY_AGE 3 // 3 seconds, keep it fresh.
#define MAX_REMOTE_PROCESS_PER_CYCLE 5
/****
* #define USE_HISTORY 1
*
* #define DEBUG_NODE_MULTIPEER 1
* #define DEBUG_NODE_PARSE 1
* #define DEBUG_NODE_MSGS 1
* #define DEBUG_NODE_ACTIONS 1
* #define DEBUG_NODE_MSGIN 1
* #define DEBUG_NODE_MSGOUT 1
*
* #define DISABLE_BAD_PEER_FILTER 1
*
***/
//#define DISABLE_BAD_PEER_FILTER 1
//#define USE_HISTORY 1
#define HISTORY_PERIOD 60
/*
 * Build a node around its own id.
 *
 * The members named in the initialiser list are constructed from the
 * arguments; the query and connection manager pointers start out as NULL
 * and are filled in by init(), which is invoked from the body.
 */
bdNode::bdNode(bdNodeId *ownId, std::string dhtVersion, const std::string& bootfile, const std::string& filterfile, bdDhtFunctions *fns, bdNodeManager *manager)
    : mNodeSpace(ownId, fns)
    , mFilterPeers(filterfile, ownId, BITDHT_FILTER_REASON_OWNID, fns, manager)
    , mQueryMgr(NULL)
    , mConnMgr(NULL)
    , mOwnId(*ownId)
    , mDhtVersion(dhtVersion)
    , mStore(bootfile, fns)
    , mFns(fns)
    , mFriendList(ownId)
    , mHistory(HISTORY_PERIOD)
{
    /* init() hands out the this pointer, so it runs here in the body */
    init();
}
void bdNode::init()
{
mQueryMgr = new bdQueryManager(&mNodeSpace, mFns, this);
mConnMgr = new bdConnectManager(&mOwnId, &mNodeSpace, mQueryMgr, mFns, this);
//setNodeOptions(BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT);
setNodeOptions(0);
mNodeDhtMode = 0;
setNodeDhtMode(BITDHT_MODE_TRAFFIC_DEFAULT);
}
/*
 * Ask the peer filter to write its filtered peers into 'peers'.
 * Always reports success.
 */
bool bdNode::getFilteredPeers(std::list<bdFilteredPeer>& peers)
{
    const bool done = true;
    mFilterPeers.getFilteredPeers(peers);
    return done;
}
//
//void bdNode::loadFilteredPeers(const std::list<bdFilteredPeer>& peers)
//{
// mFilterPeers.loadFilteredPeers(peers) ;
//}
/* Unfortunately I've ended up with 2 calls down through the hierarchy...
* not ideal - must clean this up one day.
*/
#define ATTACH_NUMBER 5
/*
 * Store the option flags and tell the node space how many attached peers
 * to keep: ATTACH_NUMBER when BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT is set,
 * otherwise none.
 */
void bdNode::setNodeOptions(uint32_t optFlags)
{
    mNodeOptionFlags = optFlags;

    const bool keepPortAlive = (optFlags & BITDHT_OPTIONS_MAINTAIN_UNSTABLE_PORT) != 0;
    mNodeSpace.setAttachedFlag(
        BITDHT_PEER_STATUS_DHT_ENGINE | BITDHT_PEER_STATUS_DHT_ENGINE_VERSION,
        keepPortAlive ? ATTACH_NUMBER : 0);
}
#define BDNODE_HIGH_MSG_RATE 50
#define BDNODE_MED_MSG_RATE 10
#define BDNODE_LOW_MSG_RATE 5
#define BDNODE_TRICKLE_MSG_RATE 3
/* So we are setting this up so you can independently update each parameter....
* if the mask is empty - it'll use the previous parameter.
*
*/
/*
 * Update the DHT mode. The traffic and relay-server parts of 'dhtFlags'
 * are handled independently: a part whose masked bits are zero is taken
 * over from the current mode instead.
 *
 * Traffic bits select mMaxAllowedMsgs (unrecognised values are treated as
 * LOW). Relay bits are only acted upon when they differ from the current
 * ones; then mRelayMode is updated, relay servers are pinged or dropped,
 * and the connection manager is informed.
 *
 * Returns the resulting combined flags, which are also stored in
 * mNodeDhtMode.
 */
uint32_t bdNode::setNodeDhtMode(uint32_t dhtFlags)
{
    std::cerr << "bdNode::setNodeDhtMode(" << dhtFlags << "), origFlags: " << mNodeDhtMode << std::endl;

    const uint32_t previousFlags = mNodeDhtMode;

    /* ---- traffic level ---- */
    const uint32_t wantedTraffic = dhtFlags & BITDHT_MODE_TRAFFIC_MASK;
    if (!wantedTraffic)
    {
        /* nothing requested: inherit the old traffic bits */
        dhtFlags |= (previousFlags & BITDHT_MODE_TRAFFIC_MASK);
    }
    else if (wantedTraffic == BITDHT_MODE_TRAFFIC_MED)
    {
        mMaxAllowedMsgs = BDNODE_MED_MSG_RATE;
    }
    else if (wantedTraffic == BITDHT_MODE_TRAFFIC_HIGH)
    {
        mMaxAllowedMsgs = BDNODE_HIGH_MSG_RATE;
    }
    else if (wantedTraffic == BITDHT_MODE_TRAFFIC_TRICKLE)
    {
        mMaxAllowedMsgs = BDNODE_TRICKLE_MSG_RATE;
    }
    else
    {
        /* BITDHT_MODE_TRAFFIC_LOW, and anything we do not recognise */
        mMaxAllowedMsgs = BDNODE_LOW_MSG_RATE;
    }

    /* ---- relay server handling ---- */
    const uint32_t wantedRelay = dhtFlags & BITDHT_MODE_RELAYSERVER_MASK;
    const bool relayChanged = (wantedRelay) && (wantedRelay != (previousFlags & BITDHT_MODE_RELAYSERVER_MASK));
    if (relayChanged)
    {
        if (wantedRelay == BITDHT_MODE_RELAYSERVERS_FLAGGED)
        {
            mRelayMode = BITDHT_RELAYS_ON;
            pingRelayServers();
        }
        else if (wantedRelay == BITDHT_MODE_RELAYSERVERS_ONLY)
        {
            mRelayMode = BITDHT_RELAYS_ONLY;
            pingRelayServers();
        }
        else if (wantedRelay == BITDHT_MODE_RELAYSERVERS_SERVER)
        {
            mRelayMode = BITDHT_RELAYS_SERVER;
            pingRelayServers();
        }
        else
        {
            /* BITDHT_MODE_RELAYSERVERS_IGNORED, and anything unrecognised */
            mRelayMode = BITDHT_RELAYS_OFF;
            dropRelayServers();
        }
        mConnMgr->setRelayMode(mRelayMode);
    }
    else
    {
        /* unchanged or not requested: inherit the old relay bits */
        dhtFlags |= (previousFlags & BITDHT_MODE_RELAYSERVER_MASK);
    }

    mNodeDhtMode = dhtFlags;
    std::cerr << "bdNode::setNodeDhtMode() newFlags: " << mNodeDhtMode << std::endl;
    return dhtFlags;
}
/* Copy this node's own id into the caller supplied location. */
void bdNode::getOwnId(bdNodeId *id)
{
    bdNodeId &destination = *id;
    destination = mOwnId;
}
/***** Startup / Shutdown ******/
void bdNode::restartNode()
{
mAccount.resetStats();
mStore.reloadFromStore();
/* setup */
bdPeer peer;
while(mStore.getPeer(&peer))
{
addPotentialPeer(&(peer.mPeerId), NULL);
}
}
void bdNode::shutdownNode()
{
/* clear the queries */
mQueryMgr->shutdownQueries();
mConnMgr->shutdownConnections();
mRemoteQueries.clear();
/* clear the space */
mNodeSpace.clear();
mHashSpace.clear();
/* clear other stuff */
mPotentialPeers.clear();
mStore.clear();
/* clean up any outgoing messages */
while(mOutgoingMsgs.size() > 0)
{
bdNodeNetMsg *msg = mOutgoingMsgs.front();
mOutgoingMsgs.pop_front();
/* cleanup message */
delete msg;
}
}
/* Crappy initial store... use bdspace as answer */
void bdNode::updateStore()
{
mStore.writeStore();
}
bool bdNode::addressBanned(const sockaddr_in& raddr)
{
return !mFilterPeers.addrOkay(const_cast<sockaddr_in*>(&raddr)) ;
}
/*
 * Dump the node state to std::cerr: own id, node space, queries,
 * connections, the size of the potential peer queue and the accounting
 * stats. With USE_HISTORY defined, message and query history are
 * included as well.
 */
void bdNode::printState()
{
    std::ostream &out = std::cerr;

    out << "bdNode::printState() for Peer: ";
    mFns->bdPrintNodeId(out, &mOwnId);
    out << std::endl;

    mNodeSpace.printDHT();
    mQueryMgr->printQueries();
    mConnMgr->printConnections();

    out << "Outstanding Potential Peers: " << mPotentialPeers.size() << std::endl;

#ifdef USE_HISTORY
    mHistory.cleanupOldMsgs();
    mHistory.printMsgs();
    mHistory.analysePeers();
    mHistory.peerTypeAnalysis();

    // Incoming Query Analysis.
    out << "Outstanding Query Requests: " << mRemoteQueries.size() << std::endl;
    mQueryHistory.printMsgs();
#endif

    mAccount.printStats(out);
}
void bdNode::iterationOff()
{
/* clean up any incoming messages */
while(mIncomingMsgs.size() > 0)
{
bdNodeNetMsg *msg = mIncomingMsgs.front();
mIncomingMsgs.pop_front();
/* cleanup message */
delete msg;
}
}
/*
 * One tick of the node:
 *  1. parse and free every queued incoming message,
 *  2. ping queued potential peers (rate limited),
 *  3. let the query manager use what is left of the message budget,
 *  4. answer remote queries,
 *  5. ping the peers the node space reports as out-of-date,
 *  6. tick the connection manager and update the stats.
 */
void bdNode::iteration()
{
#ifdef DEBUG_NODE_MULTIPEER
    std::cerr << "bdNode::iteration() of Peer: ";
    mFns->bdPrintNodeId(std::cerr, &mOwnId);
    std::cerr << std::endl;
#endif
    /* process incoming msgs */
    while(mIncomingMsgs.size() > 0)
    {
        bdNodeNetMsg *msg = mIncomingMsgs.front();
        mIncomingMsgs.pop_front();
        recvPkt(msg->data, msg->mSize, msg->addr);
        /* cleanup message */
        delete msg;
    }
    /* assume that this is called once per second... limit the messages
     * in theory, a query can generate up to 10 peers (which will all require a ping!).
     * we want to handle all the pings we can... so we don't hold up the process.
     * but we also want enough queries to keep things moving.
     * so allow up to 90% of messages to be pings.
     *
     * ignore responses to other peers... as the number is very small generally
     *
     * The Rate IS NOW DEFINED IN NodeDhtMode.
     */
    int allowedPings = 0.9 * mMaxAllowedMsgs;
    int sentMsgs = 0;
    int sentPings = 0;
#define BDNODE_MAX_POTENTIAL_PEERS_MULTIPLIER 5
    /* Disable Queries if our Ping Queue is too long */
    if (mPotentialPeers.size() > mMaxAllowedMsgs * BDNODE_MAX_POTENTIAL_PEERS_MULTIPLIER)
    {
#ifdef DEBUG_NODE_MULTIPEER
        std::cerr << "bdNode::iteration() Disabling Queries until PotentialPeer Queue reduced";
        std::cerr << std::endl;
#endif
        /* the whole budget goes to pings, leaving nothing for queries below */
        allowedPings = mMaxAllowedMsgs;
    }
    while((mPotentialPeers.size() > 0) && (sentMsgs < allowedPings))
    {
        /* check history ... if we have pinged them already...
         * then simulate / pretend we have received a pong,
         * and don't bother sending another ping.
         */
        bdId pid = mPotentialPeers.front();
        mPotentialPeers.pop_front();
        /* don't send too many queries ... check history first */
#if 0
#ifdef USE_HISTORY
        if (mHistory.validPeer(&pid))
        {
            /* just add as peer */
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::iteration() Pinging Known Potential Peer : ";
            mFns->bdPrintId(std::cerr, &pid);
            std::cerr << std::endl;
#endif
        }
#endif
#endif
        /**** TEMP ****/
        {
            send_ping(&pid);
            sentMsgs++;
            sentPings++;
#ifdef DEBUG_NODE_MSGS
            std::cerr << "bdNode::iteration() Pinging Potential Peer : ";
            mFns->bdPrintId(std::cerr, &pid);
            std::cerr << std::endl;
#endif
        }
    }
    /* allow each query to send up to one query... until maxMsgs has been reached */
    int sentQueries = mQueryMgr->iterateQueries(mMaxAllowedMsgs-sentMsgs);
    sentMsgs += sentQueries;
#ifdef DEBUG_NODE_ACTIONS
    /* the budget lives in mMaxAllowedMsgs; there is no local called maxMsgs */
    std::cerr << "bdNode::iteration() maxMsgs: " << mMaxAllowedMsgs << " sentPings: " << sentPings;
    std::cerr << " / " << allowedPings;
    std::cerr << " sentQueries: " << sentQueries;
    std::cerr << std::endl;
#endif
    /* process remote query too */
    processRemoteQuery();
    /* these pings are not counted against the budget above */
    std::list<bdId> peerIds;
    std::list<bdId>::iterator oit;
    mNodeSpace.scanOutOfDatePeers(peerIds);
    for(oit = peerIds.begin(); oit != peerIds.end(); oit++)
    {
        send_ping(&(*oit));
        mAccount.incCounter(BDACCOUNT_MSG_OUTOFDATEPING, true);
#ifdef DEBUG_NODE_MSGS
        std::cerr << "bdNode::iteration() Pinging Out-Of-Date Peer: ";
        mFns->bdPrintId(std::cerr, &(*oit));
        std::cerr << std::endl;
#endif
    }
    // Handle Connection loops.
    mConnMgr->tickConnections();
    mAccount.doStats();
}
/***************************************************************************************
***************************************************************************************
***************************************************************************************/
/* Ping 'id' under a newly generated transaction id. */
void bdNode::send_ping(bdId *id)
{
    bdToken pingTransId;
    genNewTransId(&pingTransId);

    msgout_ping(id, &pingTransId);
}
/*
 * Send a find_node request for 'targetNodeId' to the peer 'id', using a
 * newly generated transaction id. 'localnet' is passed through to the
 * message builder.
 */
void bdNode::send_query(bdId *id, bdNodeId *targetNodeId, bool localnet)
{
    /* push out query */
    bdToken transId;
    genNewTransId(&transId);
    msgout_find_node(id, &transId, targetNodeId, localnet);
#ifdef DEBUG_NODE_MSGS
    /* id and targetNodeId are already pointers: pass them as they are */
    std::cerr << "bdNode::send_query() Find Node Req for : ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " searching for : ";
    mFns->bdPrintNodeId(std::cerr, targetNodeId);
    std::cerr << std::endl;
#endif
}
/*
 * Send a generic connect message to the peer 'id' under a newly generated
 * transaction id. All remaining arguments are handed unchanged to
 * msgout_connect_genmsg().
 */
void bdNode::send_connect_msg(bdId *id, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int param, int status)
{
    /* push out query */
    bdToken transId;
    genNewTransId(&transId);
    msgout_connect_genmsg(id, &transId, msgtype, srcAddr, destAddr, mode, param, status);
#ifdef DEBUG_NODE_MSGS
    /* id is already a pointer: pass it as it is */
    std::cerr << "bdNode::send_connect_msg() to: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif
}
/*
 * Vet a peer we have been told about before it is queued for pinging.
 *
 *  - Peers whose address the filter rejects are dropped.
 *  - A peer that carries the id of a known friend but a different IP is
 *    queued as a bad peer, added to the filter, and the store is purged
 *    of the filtered IPs (unless DISABLE_BAD_PEER_FILTER is defined).
 *  - Otherwise the query manager decides whether it is worth pinging,
 *    and, when a source is known, the connection manager is told about
 *    it as a potential proxy.
 */
void bdNode::checkPotentialPeer(bdId *id, bdId *src)
{
    /* Check BadPeer Filters for Potential Peers too */
    const bool addrAccepted = mFilterPeers.addrOkay(&(id->addr));
    if (!addrAccepted)
    {
#ifdef DEBUG_NODE_MSGS
        std::cerr << "bdNode::checkPotentialPeer(";
        mFns->bdPrintId(std::cerr, id);
        std::cerr << ") BAD ADDRESS!!!! SHOULD DISCARD POTENTIAL PEER";
        std::cerr << std::endl;
#endif
        return;
    }

    /* is it masquerading as one of our friends? */
    bdFriendEntry friendEntry;
    struct sockaddr_in friendAddr;
    const bool masquerading =
        mFriendList.findPeerEntry(&(id->id), friendEntry) &&
        friendEntry.addrKnown(&friendAddr) &&
        (friendAddr.sin_addr.s_addr != id->addr.sin_addr.s_addr);
    if (masquerading)
    {
#ifndef DISABLE_BAD_PEER_FILTER
        std::cerr << "bdNode::checkPotentialPeer(";
        mFns->bdPrintId(std::cerr, id);
        std::cerr << ") MASQARADING AS KNOWN PEER - FLAGGING AS BAD";
        std::cerr << std::endl;

        // Stores in queue for later callback and dissemination around the network.
        mBadPeerQueue.queuePeer(id, 0);
        mFilterPeers.addPeerToFilter(id->addr, 0);

        std::list<struct sockaddr_in> bannedIPs;
        mFilterPeers.filteredIPs(bannedIPs);
        mStore.filterIpList(bannedIPs);
        return;
#endif
    }

    if (mQueryMgr->checkPotentialPeer(id, src))
    {
        addPotentialPeer(id, src);
    }

    /* src can be NULL! */
    if (src)
    {
        mConnMgr->addPotentialConnectionProxy(src, id); // CAUTION: Order switched!
    }
}
/* Append a copy of 'id' to the potential peer queue; the source is ignored. */
void bdNode::addPotentialPeer(bdId *id, bdId * /*src*/)
{
    const bdId &candidate = *id;
    mPotentialPeers.push_back(candidate);
}
// virtual so manager can do callback.
// peer flags defined in bdiface.h
void bdNode::addPeer(const bdId *id, uint32_t peerflags)
{
#ifdef DEBUG_NODE_ACTIONS
fprintf(stderr, "bdNode::addPeer(");
mFns->bdPrintId(std::cerr, id);
fprintf(stderr, ")\n");
#endif
/* first check the filters */
if (mFilterPeers.checkPeer(id, peerflags))
{
std::cerr << "bdNode::addPeer(";
mFns->bdPrintId(std::cerr, id);
std::cerr << ", " << std::hex << peerflags << std::dec;
std::cerr << ") FAILED the BAD PEER FILTER!!!! DISCARDING MSG";
std::cerr << std::endl;
std::list<struct sockaddr_in> filteredIPs;
mFilterPeers.filteredIPs(filteredIPs);
mStore.filterIpList(filteredIPs);
mBadPeerQueue.queuePeer(id, peerflags);
return;
}
/* next we check if it is a friend, whitelist etc, and adjust flags */
bdFriendEntry entry;
if (mFriendList.findPeerEntry(&(id->id), entry))
{
/* found! */
peerflags |= entry.getPeerFlags(); // translates internal into general ones.
struct sockaddr_in knownAddr;
if (entry.addrKnown(&knownAddr))
{
if (knownAddr.sin_addr.s_addr != id->addr.sin_addr.s_addr)
{
#ifndef DISABLE_BAD_PEER_FILTER
std::cerr << "bdNode::addPeer(";
mFns->bdPrintId(std::cerr, id);
std::cerr << ", " << std::hex << peerflags << std::dec;
std::cerr << ") MASQARADING AS KNOWN PEER - FLAGGING AS BAD";
std::cerr << std::endl;
// Stores in queue for later callback and desemination around the network.
mBadPeerQueue.queuePeer(id, peerflags);
mFilterPeers.addPeerToFilter(id->addr, peerflags);
std::list<struct sockaddr_in> filteredIPs;
mFilterPeers.filteredIPs(filteredIPs);
mStore.filterIpList(filteredIPs);
// DO WE EXPLICITLY NEED TO DO THIS, OR WILL THEY JUST BE DROPPED?
//mNodeSpace.remove_badpeer(id);
//mQueryMgr->remove_badpeer(id);
// FLAG in NodeSpace (Should be dropped very quickly anyway)
mNodeSpace.flagpeer(id, 0, BITDHT_PEER_EXFLAG_BADPEER);
return;
#endif
}
}
}
mQueryMgr->addPeer(id, peerflags);
mNodeSpace.add_peer(id, peerflags);
bdPeer peer;
peer.mPeerId = *id;
peer.mPeerFlags = peerflags;
peer.mLastRecvTime = time(NULL);
mStore.addStore(&peer);
// Finally we pass to connections for them to use.
mConnMgr->updatePotentialConnectionProxy(id, peerflags);
//#define DISPLAY_BITDHTNODES 1
#ifdef DISPLAY_BITDHTNODES
/* TEMP to extract IDS for BloomFilter */
if (peerflags & BITDHT_PEER_STATUS_DHT_ENGINE)
{
std::cerr << "bdNode::addPeer() FOUND BITDHT PEER";
std::cerr << std::endl;
mFns->bdPrintNodeId(std::cerr, &(id->id));
std::cerr << std::endl;
}
#endif
}
/************************************ Process Remote Query *************************/
/* increased the allowed processing rate from 1/sec => 5/sec */
void bdNode::processRemoteQuery()
{
int nProcessed = 0;
time_t oldTS = time(NULL) - BITDHT_MAX_REMOTE_QUERY_AGE;
while(nProcessed < MAX_REMOTE_PROCESS_PER_CYCLE)
{
/* extra exit clause */
if (mRemoteQueries.size() < 1)
{
#ifdef USE_HISTORY
if (nProcessed)
{
mQueryHistory.cleanupOldMsgs();
}
#endif
return;
}
bdRemoteQuery &query = mRemoteQueries.front();
// filtering.
bool badPeer = false;
#ifdef USE_HISTORY
// store result in badPeer to activate the filtering.
mQueryHistory.addIncomingQuery(query.mQueryTS, &(query.mId), &(query.mQuery));
#endif
/* discard older ones (stops queue getting overloaded) */
if ((query.mQueryTS > oldTS) && (!badPeer))
{
/* recent enough to process! */
nProcessed++;
switch(query.mQueryType)
{
case BD_QUERY_NEIGHBOURS:
case BD_QUERY_LOCALNET:
{
/* search bdSpace for neighbours */
std::list<bdId> nearList;
std::multimap<bdMetric, bdId> nearest;
std::multimap<bdMetric, bdId>::iterator it;
if (query.mQueryType == BD_QUERY_LOCALNET)
{
std::list<bdId> excludeList;
mNodeSpace.find_nearest_nodes_with_flags(&(query.mQuery),
BITDHT_QUERY_NEIGHBOUR_PEERS,
excludeList, nearest, BITDHT_PEER_STATUS_DHT_APPL);
}
else
{
if (mRelayMode == BITDHT_RELAYS_SERVER)
{
std::list<bdId> excludeList;
mNodeSpace.find_nearest_nodes_with_flags(&(query.mQuery),
BITDHT_QUERY_NEIGHBOUR_PEERS,
excludeList, nearest, BITDHT_PEER_STATUS_DHT_RELAY_SERVER);
}
else
{
mNodeSpace.find_nearest_nodes(&(query.mQuery), BITDHT_QUERY_NEIGHBOUR_PEERS, nearest);
}
}
for(it = nearest.begin(); it != nearest.end(); it++)
{
nearList.push_back(it->second);
}
msgout_reply_find_node(&(query.mId), &(query.mTransId), nearList);
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::processRemoteQuery() Reply to Find Node: ";
mFns->bdPrintId(std::cerr, &(query.mId));
std::cerr << " searching for : ";
mFns->bdPrintNodeId(std::cerr, &(query.mQuery));
std::cerr << ", found " << nearest.size() << " nodes ";
std::cerr << std::endl;
#endif
break;
}
case BD_QUERY_HASH:
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::processRemoteQuery() Reply to Query Node: ";
mFns->bdPrintId(std::cerr, &(query.mId));
std::cerr << " TODO";
std::cerr << std::endl;
#endif
/* TODO */
/* for now drop */
/* unprocess! */
nProcessed--;
break;
}
default:
{
/* drop */
/* unprocess! */
nProcessed--;
break;
}
}
}
else
{
if (badPeer)
{
std::cerr << "bdNode::processRemoteQuery() Query from BadPeer: Discarding: ";
}
#ifdef DEBUG_NODE_MSGS
else
{
std::cerr << "bdNode::processRemoteQuery() Query Too Old: Discarding: ";
}
#endif
#ifdef DEBUG_NODE_MSGS
mFns->bdPrintId(std::cerr, &(query.mId));
std::cerr << std::endl;
#endif
}
mRemoteQueries.pop_front();
}
#ifdef USE_HISTORY
mQueryHistory.cleanupOldMsgs();
#endif
}
/************************************ Message Buffering ****************************/
/* interaction with outside world */
int bdNode::outgoingMsg(struct sockaddr_in *addr, char *msg, int *len)
{
if (mOutgoingMsgs.size() > 0)
{
bdNodeNetMsg *bdmsg = mOutgoingMsgs.front();
//bdmsg->print(std::cerr);
mOutgoingMsgs.pop_front();
//bdmsg->print(std::cerr);
/* truncate if necessary */
if (bdmsg->mSize < *len)
{
//std::cerr << "bdNode::outgoingMsg space(" << *len << ") msgsize(" << bdmsg->mSize << ")";
//std::cerr << std::endl;
*len = bdmsg->mSize;
}
else
{
//std::cerr << "bdNode::outgoingMsg space(" << *len << ") small - trunc from: "
//<< bdmsg->mSize;
//std::cerr << std::endl;
}
memcpy(msg, bdmsg->data, *len);
*addr = bdmsg->addr;
//bdmsg->print(std::cerr);
delete bdmsg;
return 1;
}
return 0;
}
void bdNode::incomingMsg(struct sockaddr_in *addr, char *msg, int len)
{
/* check against the filter */
if (mFilterPeers.addrOkay(addr))
{
bdNodeNetMsg *bdmsg = new bdNodeNetMsg(msg, len, addr);
mIncomingMsgs.push_back(bdmsg);
}
#ifdef DEBUG_NODE_MSGOUT
else
{
std::cerr << "bdNode::incomingMsg() Incoming Packet Filtered";
std::cerr << std::endl;
}
#endif
}
/************************************ Message Handling *****************************/
/* Outgoing Messages */
/*
 * Build a ping message carrying our id and DHT version and queue it for
 * the peer 'id'. The outgoing message is registered first, against a
 * private copy of the id.
 */
void bdNode::msgout_ping(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_ping() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif

    // THIS IS CRASHING HISTORY.
    // LIKELY ID is not always valid!
    // Either PotentialPeers or Out-Of-Date Peers.
    //registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_PING);
    bdId idCopy(*id);
    registerOutgoingMsg(&idCopy, transId, BITDHT_MSG_TYPE_PING, NULL);

    /* version token: the version string, cut to the token capacity */
    const uint32_t tokenCapacity = BITDHT_TOKEN_MAX_LEN;
    const uint32_t versionLen = (mDhtVersion.size() < tokenCapacity)
        ? static_cast<uint32_t>(mDhtVersion.size())
        : tokenCapacity;
    bdToken versionToken;
    memcpy(versionToken.data, mDhtVersion.c_str(), versionLen);
    versionToken.len = versionLen;

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_create_ping_msg(transId, &(mOwnId), &versionToken, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_PING, true);
}
/*
 * Build a pong (ping response) carrying our id and DHT version and queue
 * it for the peer 'id'. The outgoing message is registered first.
 */
void bdNode::msgout_pong(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_pong() TransId: ";
    bdPrintTransId(std::cerr, transId);
    /* the version we send is mDhtVersion; there is no local called version */
    std::cerr << " Version: " << mDhtVersion;
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif
    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_PONG, NULL);
    /* generate message, send to udp */
    /* version token: the version string, cut to the token capacity */
    bdToken vid;
    uint32_t vlen = BITDHT_TOKEN_MAX_LEN;
    if (mDhtVersion.size() < vlen)
    {
        vlen = mDhtVersion.size();
    }
    memcpy(vid.data, mDhtVersion.c_str(), vlen);
    vid.len = vlen;
    char msg[10240];
    int avail = 10240;
    int blen = bitdht_response_ping_msg(transId, &(mOwnId), &vid, msg, avail-1);
    sendPkt(msg, blen, id->addr);
    mAccount.incCounter(BDACCOUNT_MSG_PONG, true);
}
/*
 * Build a find_node request for 'query' and queue it for the peer 'id'.
 * The outgoing message is registered (with the query id) first.
 */
void bdNode::msgout_find_node(bdId *id, bdToken *transId, bdNodeId *query, bool localnet)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_find_node() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Query: ";
    mFns->bdPrintNodeId(std::cerr, query);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_FIND_NODE, query);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_find_node_msg(transId, &(mOwnId), query, localnet, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_QUERYNODE, true);
}
/*
 * Build a find_node reply listing 'peers' and queue it for the peer 'id'.
 * The outgoing message is registered and counted before it is encoded.
 */
void bdNode::msgout_reply_find_node(bdId *id, bdToken *transId, std::list<bdId> &peers)
{
    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_NODE, NULL);
    mAccount.incCounter(BDACCOUNT_MSG_REPLYFINDNODE, true);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_resp_node_msg(transId, &(mOwnId), peers, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_find_node() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Peers:";
    for (std::list<bdId>::iterator peer = peers.begin(); peer != peers.end(); ++peer)
    {
        std::cerr << " ";
        mFns->bdPrintId(std::cerr, &(*peer));
    }
    std::cerr << std::endl;
#endif
}
/*****************
* SECOND HALF
*
*****/
/*
 * Build a get_peers request for 'info_hash' and queue it for the peer
 * 'id'. The outgoing message is registered (with the hash) first.
 */
void bdNode::msgout_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_get_hash() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " InfoHash: ";
    mFns->bdPrintNodeId(std::cerr, info_hash);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_GET_HASH, info_hash);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_get_peers_msg(transId, &(mOwnId), info_hash, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_QUERYHASH, true);
}
/*
 * Build a get_peers reply carrying 'token' and 'values' and queue it for
 * the peer 'id'. The outgoing message is registered first.
 */
void bdNode::msgout_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::list<std::string> &values)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_hash() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Token: ";
    bdPrintToken(std::cerr, token);
    std::cerr << " Peers: ";
    for (std::list<std::string>::iterator value = values.begin(); value != values.end(); ++value)
    {
        std::cerr << " ";
        bdPrintCompactPeerId(std::cerr, *value);
    }
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_HASH, NULL);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_peers_reply_hash_msg(transId, &(mOwnId), token, values, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, true);
}
/*
 * Build a get_peers "closest nodes" reply carrying 'token' and 'nodes'
 * and queue it for the peer 'id'. The outgoing message is registered
 * first. It is counted under BDACCOUNT_MSG_REPLYQUERYHASH.
 */
void bdNode::msgout_reply_nearest(bdId *id, bdToken *transId, bdToken *token, std::list<bdId> &nodes)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_nearest() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Token: ";
    bdPrintToken(std::cerr, token);
    std::cerr << " Nodes:";
    for (std::list<bdId>::iterator near = nodes.begin(); near != nodes.end(); ++near)
    {
        std::cerr << " ";
        mFns->bdPrintId(std::cerr, &(*near));
    }
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_NEAR, NULL);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_peers_reply_closest_msg(transId, &(mOwnId), token, nodes, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, true);
}
/*
 * Build an announce_peer message for 'info_hash' with 'port' and 'token'
 * and queue it for the peer 'id'. The outgoing message is registered
 * (with the hash) first.
 */
void bdNode::msgout_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash, uint32_t port, bdToken *token)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_post_hash() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << " Info_Hash: ";
    mFns->bdPrintNodeId(std::cerr, info_hash);
    std::cerr << " Port: " << port;
    std::cerr << " Token: ";
    bdPrintToken(std::cerr, token);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_POST_HASH, info_hash);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_announce_peers_msg(transId, &(mOwnId), info_hash, port, token, packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_POSTHASH, true);
}
/*
 * Build the reply to an announce_peer message and queue it for the peer
 * 'id'. The outgoing message is registered first.
 */
void bdNode::msgout_reply_post(bdId *id, bdToken *transId)
{
#ifdef DEBUG_NODE_MSGOUT
    std::cerr << "bdNode::msgout_reply_post() TransId: ";
    bdPrintTransId(std::cerr, transId);
    std::cerr << " To: ";
    mFns->bdPrintId(std::cerr, id);
    std::cerr << std::endl;
#endif

    registerOutgoingMsg(id, transId, BITDHT_MSG_TYPE_REPLY_POST, NULL);

    /* encode into a local buffer, then queue for sending */
    char packet[10240];
    const int capacity = 10240;
    const int packetLen = bitdht_reply_announce_msg(transId, &(mOwnId), packet, capacity - 1);
    sendPkt(packet, packetLen, id->addr);

    mAccount.incCounter(BDACCOUNT_MSG_REPLYPOSTHASH, true);
}
void bdNode::sendPkt(char *msg, int len, struct sockaddr_in addr)
{
//fprintf(stderr, "bdNode::sendPkt(%d) to %s:%d\n",
// len, inet_ntoa(addr.sin_addr), htons(addr.sin_port));
/* filter outgoing packets */
if (mFilterPeers.addrOkay(&addr))
{
bdNodeNetMsg *bdmsg = new bdNodeNetMsg(msg, len, &addr);
//bdmsg->print(std::cerr);
mOutgoingMsgs.push_back(bdmsg);
//bdmsg->print(std::cerr);
}
else
{
std::cerr << "bdNode::sendPkt() Outgoing Packet Filtered";
std::cerr << std::endl;
}
return;
}
/********************* Incoming Messages *************************/
/*
* These functions are holding up udp queue -> so quickly
* parse message, and get on with it!
*/
void bdNode::recvPkt(char *msg, int len, struct sockaddr_in addr)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() msg[" << len << "] = ";
for(int i = 0; i < len; i++)
{
if ((msg[i] > 31) && (msg[i] < 127))
{
std::cerr << msg[i];
}
else
{
std::cerr << "[" << (int) msg[i] << "]";
}
}
std::cerr << std::endl;
#endif
/* convert to a be_node */
be_node *node = be_decoden(msg, len);
if (!node)
{
/* invalid decode */
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Failure to decode. Dropping Msg";
std::cerr << std::endl;
std::cerr << "message length: " << len;
std::cerr << std::endl;
std::cerr << "msg[] = ";
for(int i = 0; i < len; i++)
{
if ((msg[i] > 31) && (msg[i] < 127))
{
std::cerr << msg[i];
}
else
{
std::cerr << "[" << (int) msg[i] << "]";
}
}
std::cerr << std::endl;
#endif
return;
}
/* find message type */
uint32_t beType = beMsgType(node);
bool beQuery = (BE_Y_Q == beMsgGetY(node));
if (!beType)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Invalid Message Type. Dropping Msg";
std::cerr << std::endl;
#endif
/* invalid message */
be_free(node);
return;
}
/************************* handle token (all) **************************/
be_node *be_transId = beMsgGetDictNode(node, "t");
bdToken transId;
if (be_transId)
{
beMsgGetToken(be_transId, transId);
}
else
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() TransId Failure. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/************************* handle data (all) **************************/
/* extract common features */
char dictkey[2] = "r";
if (beQuery)
{
dictkey[0] = 'a';
}
be_node *be_data = beMsgGetDictNode(node, dictkey);
if (!be_data)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Data Body. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/************************** handle id (all) ***************************/
be_node *be_id = beMsgGetDictNode(be_data, "id");
bdNodeId id;
if (be_id)
{
beMsgGetNodeId(be_id, id);
}
else
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Peer Id. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/************************ handle version (optional:pong) **************/
be_node *be_version = NULL;
bdToken versionId;
if ((beType == BITDHT_MSG_TYPE_PONG) || (beType == BITDHT_MSG_TYPE_PING))
{
be_version = beMsgGetDictNode(node, "v");
if (!be_version)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() NOTE: PONG missing Optional Version.";
std::cerr << std::endl;
#endif
}
}
if (be_version)
{
beMsgGetToken(be_version, versionId);
}
/************************ handle options (optional:bitdht extension) **************/
be_node *be_options = beMsgGetDictNode(node, "o");
bool localnet = false;
if (be_options)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Found Options Node, localnet";
std::cerr << std::endl;
#endif
localnet = true;
}
/*********** handle target (query) or info_hash (get_hash) ************/
bdNodeId target_info_hash;
be_node *be_target = NULL;
if (beType == BITDHT_MSG_TYPE_FIND_NODE)
{
be_target = beMsgGetDictNode(be_data, "target");
if (!be_target)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Target / Info_Hash. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
else if ((beType == BITDHT_MSG_TYPE_GET_HASH) ||
(beType == BITDHT_MSG_TYPE_POST_HASH))
{
be_target = beMsgGetDictNode(be_data, "info_hash");
if (!be_target)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Target / Info_Hash. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_target)
{
beMsgGetNodeId(be_target, target_info_hash);
}
/*********** handle nodes (reply_query or reply_near) *****************/
std::list<bdId> nodes;
be_node *be_nodes = NULL;
if ((beType == BITDHT_MSG_TYPE_REPLY_NODE) ||
(beType == BITDHT_MSG_TYPE_REPLY_NEAR))
{
be_nodes = beMsgGetDictNode(be_data, "nodes");
if (!be_nodes)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Nodes. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_nodes)
{
beMsgGetListBdIds(be_nodes, nodes);
}
/******************* handle values (reply_hash) ***********************/
std::list<std::string> values;
be_node *be_values = NULL;
if (beType == BITDHT_MSG_TYPE_REPLY_HASH)
{
be_values = beMsgGetDictNode(be_data, "values");
if (!be_values)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Values. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_values)
{
beMsgGetListStrings(be_values, values);
}
/************ handle token (reply_hash, reply_near, post hash) ********/
bdToken token;
be_node *be_token = NULL;
if ((beType == BITDHT_MSG_TYPE_REPLY_HASH) ||
(beType == BITDHT_MSG_TYPE_REPLY_NEAR) ||
(beType == BITDHT_MSG_TYPE_POST_HASH))
{
be_token = beMsgGetDictNode(be_data, "token");
if (!be_token)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() Missing Token. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_token)
{
beMsgGetToken(be_transId, transId);
}
/****************** handle port (post hash) ***************************/
uint32_t port;
be_node *be_port = NULL;
if (beType == BITDHT_MSG_TYPE_POST_HASH)
{
be_port = beMsgGetDictNode(be_data, "port");
if (!be_port)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() POST_HASH Missing Port. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_port)
{
beMsgGetUInt32(be_port, &port);
}
/****************** handle Connect (lots) ***************************/
bdId connSrcAddr;
bdId connDestAddr;
uint32_t connMode;
uint32_t connParam = 0;
uint32_t connStatus;
uint32_t connType;
be_node *be_ConnSrcAddr = NULL;
be_node *be_ConnDestAddr = NULL;
be_node *be_ConnMode = NULL;
be_node *be_ConnParam = NULL;
be_node *be_ConnStatus = NULL;
be_node *be_ConnType = NULL;
if (beType == BITDHT_MSG_TYPE_CONNECT)
{
/* SrcAddr */
be_ConnSrcAddr = beMsgGetDictNode(be_data, "src");
if (!be_ConnSrcAddr)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing SrcAddr. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* DestAddr */
be_ConnDestAddr = beMsgGetDictNode(be_data, "dest");
if (!be_ConnDestAddr)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing DestAddr. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Mode */
be_ConnMode = beMsgGetDictNode(be_data, "mode");
if (!be_ConnMode)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Mode. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Param */
be_ConnParam = beMsgGetDictNode(be_data, "param");
if (!be_ConnParam)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Param. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Status */
be_ConnStatus = beMsgGetDictNode(be_data, "status");
if (!be_ConnStatus)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Status. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
/* Type */
be_ConnType = beMsgGetDictNode(be_data, "type");
if (!be_ConnType)
{
#ifdef DEBUG_NODE_PARSE
std::cerr << "bdNode::recvPkt() CONNECT Missing Type. Dropping Msg";
std::cerr << std::endl;
#endif
be_free(node);
return;
}
}
if (be_ConnSrcAddr)
{
beMsgGetBdId(be_ConnSrcAddr, connSrcAddr);
}
if (be_ConnDestAddr)
{
beMsgGetBdId(be_ConnDestAddr, connDestAddr);
}
if (be_ConnMode)
{
beMsgGetUInt32(be_ConnMode, &connMode);
}
if (be_ConnParam)
{
beMsgGetUInt32(be_ConnParam, &connParam);
}
if (be_ConnStatus)
{
beMsgGetUInt32(be_ConnStatus, &connStatus);
}
if (be_ConnType)
{
beMsgGetUInt32(be_ConnType, &connType);
}
/****************** Bits Parsed Ok. Process Msg ***********************/
/* Construct Source Id */
bdId srcId(id, addr);
if (be_target)
{
registerIncomingMsg(&srcId, &transId, beType, &target_info_hash);
}
else
{
registerIncomingMsg(&srcId, &transId, beType, NULL);
}
switch(beType)
{
case BITDHT_MSG_TYPE_PING: /* a: id, transId */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Responding to Ping : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
if (be_version)
{
msgin_ping(&srcId, &transId, &versionId);
}
else
{
msgin_ping(&srcId, &transId, NULL);
}
break;
}
case BITDHT_MSG_TYPE_PONG: /* r: id, transId */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Received Pong from : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
if (be_version)
{
msgin_pong(&srcId, &transId, &versionId);
}
else
{
msgin_pong(&srcId, &transId, NULL);
}
break;
}
case BITDHT_MSG_TYPE_FIND_NODE: /* a: id, transId, target */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Req Find Node from : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << " Looking for: ";
mFns->bdPrintNodeId(std::cerr, &target_info_hash);
std::cerr << std::endl;
#endif
msgin_find_node(&srcId, &transId, &target_info_hash, localnet);
break;
}
case BITDHT_MSG_TYPE_REPLY_NODE: /* r: id, transId, nodes */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Received Reply Node from : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
msgin_reply_find_node(&srcId, &transId, nodes);
break;
}
case BITDHT_MSG_TYPE_GET_HASH: /* a: id, transId, info_hash */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Received SearchHash : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << " for Hash: ";
mFns->bdPrintNodeId(std::cerr, &target_info_hash);
std::cerr << std::endl;
#endif
msgin_get_hash(&srcId, &transId, &target_info_hash);
break;
}
case BITDHT_MSG_TYPE_REPLY_HASH: /* r: id, transId, token, values */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Received Reply Hash : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
msgin_reply_hash(&srcId, &transId, &token, values);
break;
}
case BITDHT_MSG_TYPE_REPLY_NEAR: /* r: id, transId, token, nodes */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Received Reply Near : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
msgin_reply_nearest(&srcId, &transId, &token, nodes);
break;
}
case BITDHT_MSG_TYPE_POST_HASH: /* a: id, transId, info_hash, port, token */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Post Hash from : ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << " to post: ";
mFns->bdPrintNodeId(std::cerr, &target_info_hash);
std::cerr << " with port: " << port;
std::cerr << std::endl;
#endif
msgin_post_hash(&srcId, &transId, &target_info_hash, port, &token);
break;
}
case BITDHT_MSG_TYPE_REPLY_POST: /* r: id, transId */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() Reply Post from: ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
msgin_reply_post(&srcId, &transId);
break;
}
case BITDHT_MSG_TYPE_CONNECT: /* a: id, src, dest, mode, status, type */
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() ConnectMsg from: ";
mFns->bdPrintId(std::cerr, &srcId);
std::cerr << std::endl;
#endif
msgin_connect_genmsg(&srcId, &transId, connType,
&connSrcAddr, &connDestAddr,
connMode, connParam, connStatus);
break;
}
default:
{
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::recvPkt() ERROR";
std::cerr << std::endl;
#endif
/* ERROR */
break;
}
}
be_free(node);
return;
}
/* Handle an incoming PING.
 *
 * Input:    id (the sender), transId, versionId (optional - may be NULL).
 * Response: pong(id, transId).
 *
 * The sender is recorded via addPeer() with BITDHT_PEER_STATUS_RECV_PING
 * plus whatever flags parseVersion() derives from versionId.
 */
void bdNode::msgin_ping(bdId *id, bdToken *transId, bdToken *versionId)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_ping() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " To: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << std::endl;
#endif

	/* account for the message (second argument presumably marks "outgoing";
	 * the msgin_* handlers in this file pass false - TODO confirm) */
	mAccount.incCounter(BDACCOUNT_MSG_PING, false);

	/* peer is alive: typically no id in a ping, so the version is all we learn */
	uint32_t flags = BITDHT_PEER_STATUS_RECV_PING | parseVersion(versionId);
	addPeer(id, flags);

	/* reply, echoing the transaction id */
	msgout_pong(id, transId);
}
/* Handle an incoming PONG.
 *
 * Input:    id (the sender), transId, versionId (optional - may be NULL).
 * Response: none; the peer is stored via addPeer().
 *
 * transId is only used for debug output.
 */
void bdNode::msgin_pong(bdId *id, bdToken *transId, bdToken *versionId)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_pong() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " Version: TODO!"; // << version;
	std::cerr << " To: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << std::endl;
#else
	(void) transId;
#endif

	/* the peer answered us: combine the pong flag with the version-match flags */
	uint32_t flags = BITDHT_PEER_STATUS_RECV_PONG | parseVersion(versionId);

	mAccount.incCounter(BDACCOUNT_MSG_PONG, false);
	addPeer(id, flags);
}
/* Derive version-match peer flags from a peer's version token.
 *
 * versionId may be NULL (the peer sent no version); then no match flag is set.
 * Otherwise the token is compared against mDhtVersion in four two-byte fields:
 *   bytes 0-1 : DHT engine           -> BITDHT_PEER_STATUS_DHT_ENGINE
 *   bytes 2-3 : DHT engine version   -> BITDHT_PEER_STATUS_DHT_ENGINE_VERSION
 *   bytes 4-5 : application          -> BITDHT_PEER_STATUS_DHT_APPL
 *   bytes 6-7 : application version  -> BITDHT_PEER_STATUS_DHT_APPL_VERSION
 * A "version" field matches when both peer bytes are digits, the first (major)
 * byte equals ours and the second (minor) byte is greater than or equal to ours.
 *
 * Returns the OR of the matching flags together with BITDHT_PEER_STATUS_RECV_PONG.
 * NOTE(review): RECV_PONG is always part of the result, so msgin_ping() ends up
 * with it set as well - confirm this is intended.
 */
uint32_t bdNode::parseVersion(bdToken *versionId)
{
	/* recv pong, and peer is alive. add to DHT */
	//uint32_t vId = 0; // TODO XXX convertBdVersionToVID(versionId);

	/* calculate version match with peer */
	bool sameDhtEngine = false;
	bool sameDhtVersion = false;
	bool sameAppl = false;
	bool sameApplVersion = false;

	if (versionId)
	{
#ifdef DEBUG_NODE_MSGIN
		std::cerr << "bdNode::parseVersion() Peer Version: ";
		for(uint32_t i = 0; i < versionId->len; i++)
		{
			std::cerr << versionId->data[i];
		}
		std::cerr << std::endl;
#endif

#ifdef USE_HISTORY
		/* sanitise the token into a printable string for the history log */
		std::string version;
		for(uint32_t i = 0; i < versionId->len; i++)
		{
			/* <ctype.h> classifiers need a value representable as unsigned char */
			if (isalnum(static_cast<unsigned char>(versionId->data[i])))
			{
				version += versionId->data[i];
			}
			else
			{
				version += 'X';
			}
		}
		/* NOTE(review): `id` is neither a parameter nor a local of this function;
		 * unless it resolves to a member this will not compile with USE_HISTORY
		 * defined - confirm. */
		mHistory.setPeerType(id, version);
#endif

		/* check two bytes */
		if ((versionId->len >= 2) && (mDhtVersion.size() >= 2) &&
			(versionId->data[0] == mDhtVersion[0]) && (versionId->data[1] == mDhtVersion[1]))
		{
			sameDhtEngine = true;
		}

		/* check two bytes.
		 * Due to Old Versions not having this field, we need to check that they are numbers.
		 * We have a Major version, and minor version....
		 * This flag is set if Major is same, and minor is greater or equal to our version.
		 */
		if ((versionId->len >= 4) && (mDhtVersion.size() >= 4))
		{
			/* the bytes come from the peer's token: cast before isdigit() so a
			 * negative char cannot trigger undefined behaviour */
			if ((isdigit(static_cast<unsigned char>(versionId->data[2])) &&
					isdigit(static_cast<unsigned char>(versionId->data[3]))) &&
				(versionId->data[2] == mDhtVersion[2]) && (versionId->data[3] >= mDhtVersion[3]))
			{
				sameDhtVersion = true;
			}
		}

		/* a matching engine version is meaningless if the engine itself differs */
		if ((sameDhtVersion) && (!sameDhtEngine))
		{
			sameDhtVersion = false;
#ifdef DEBUG_NODE_MSGIN
			std::cerr << "bdNode::parseVersion() STRANGE Peer Version: ";
			for(uint32_t i = 0; i < versionId->len; i++)
			{
				std::cerr << versionId->data[i];
			}
			std::cerr << std::endl;
#endif
		}

		/* check two bytes */
		if ((versionId->len >= 6) && (mDhtVersion.size() >= 6) &&
			(versionId->data[4] == mDhtVersion[4]) && (versionId->data[5] == mDhtVersion[5]))
		{
			sameAppl = true;
		}

		/* check two bytes */
		if ((versionId->len >= 8) && (mDhtVersion.size() >= 8))
		{
			if ((isdigit(static_cast<unsigned char>(versionId->data[6])) &&
					isdigit(static_cast<unsigned char>(versionId->data[7]))) &&
				(versionId->data[6] == mDhtVersion[6]) && (versionId->data[7] >= mDhtVersion[7]))
			{
				sameApplVersion = true;
			}
		}
	}
	else
	{
#ifdef DEBUG_NODE_MSGIN
		std::cerr << "bdNode::parseVersion() No Version";
		std::cerr << std::endl;
#endif
	}

	uint32_t peerflags = BITDHT_PEER_STATUS_RECV_PONG; /* should have id too */
	if (sameDhtEngine)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_ENGINE;
	}
	if (sameDhtVersion)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_ENGINE_VERSION;
	}
	if (sameAppl)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_APPL;
	}
	if (sameApplVersion)
	{
		peerflags |= BITDHT_PEER_STATUS_DHT_APPL_VERSION;
	}
	return peerflags;
}
/* Handle an incoming FIND_NODE request.
 *
 * Input: id (the requester), transId, query (the node id being looked for),
 *        localnet (selects BD_QUERY_LOCALNET instead of BD_QUERY_NEIGHBOURS).
 *
 * The request is not answered here; it is queued via queueQuery(), and the
 * requester is passed to addPeer() with no status flags.
 */
void bdNode::msgin_find_node(bdId *id, bdToken *transId, bdNodeId *query, bool localnet)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_find_node() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Query: ";
	mFns->bdPrintNodeId(std::cerr, query);
	std::cerr << std::endl;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_QUERYNODE, false);

	/* store query... */
	const uint32_t kind = localnet ? BD_QUERY_LOCALNET : BD_QUERY_NEIGHBOURS;
	queueQuery(id, query, transId, kind);

	/* no id, and no help! */
	uint32_t noFlags = 0;
	addPeer(id, noFlags);
}
/* Handle a REPLY_NODE message (the answer to a find_node request).
 *
 * Every node listed in the reply is handed to checkPotentialPeer() together
 * with the replying peer, and the replying peer itself is passed to addPeer()
 * with BITDHT_PEER_STATUS_RECV_NODES. transId is only used for debug output.
 */
void bdNode::msgin_reply_find_node(bdId *id, bdToken *transId, std::list<bdId> &nodes)
{
	std::list<bdId>::iterator nit;

#ifdef DEBUG_NODE_MSGS
	std::cerr << "bdNode::msgin_reply_find_node() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Peers:";
	for(nit = nodes.begin(); nit != nodes.end(); ++nit)
	{
		std::cerr << " ";
		mFns->bdPrintId(std::cerr, &(*nit));
	}
	std::cerr << std::endl;
#else
	(void) transId;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_REPLYFINDNODE, false);

	/* add neighbours to the potential list */
	nit = nodes.begin();
	while (nit != nodes.end())
	{
		checkPotentialPeer(&(*nit), id);
		++nit;
	}

	/* received reply - so peer must be good (no id ;( ) */
	uint32_t flags = BITDHT_PEER_STATUS_RECV_NODES;
	addPeer(id, flags);
}
/********* THIS IS THE SECOND STAGE
*
*/
/* Handle an incoming GET_HASH request.
 *
 * Input: id (the requester), transId, info_hash (the hash being searched for).
 *
 * The request is counted and then queued as a BD_QUERY_HASH remote query;
 * nothing is sent from this function itself.
 */
void bdNode::msgin_get_hash(bdId *id, bdToken *transId, bdNodeId *info_hash)
{
#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_get_hash() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " InfoHash: ";
	mFns->bdPrintNodeId(std::cerr, info_hash);
	std::cerr << std::endl;
#endif

	mAccount.incCounter(BDACCOUNT_MSG_QUERYHASH, false);

	/* defer the answer: store the query for later processing */
	const uint32_t kind = BD_QUERY_HASH;
	queueQuery(id, info_hash, transId, kind);
}
/* Handle a REPLY_HASH message.
 *
 * Only accounting is done here (plus debug output when DEBUG_NODE_MSGIN is
 * defined); id, transId, token and values are not otherwise acted upon.
 */
void bdNode::msgin_reply_hash(bdId *id, bdToken *transId, bdToken *token, std::list<std::string> &values)
{
	mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, false);

#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_reply_hash() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Token: ";
	bdPrintToken(std::cerr, token);
	std::cerr << " Peers: ";
	for(std::list<std::string>::iterator vit = values.begin(); vit != values.end(); ++vit)
	{
		std::cerr << " ";
		bdPrintCompactPeerId(std::cerr, *vit);
	}
	std::cerr << std::endl;
#else
	/* arguments are only needed for the debug trace */
	(void) id;
	(void) transId;
	(void) token;
	(void) values;
#endif
}
/* Handle a REPLY_NEAR message.
 *
 * Only accounting is done here (plus debug output when DEBUG_NODE_MSGIN is
 * defined); id, transId, token and nodes are not otherwise acted upon.
 *
 * NOTE(review): this is counted under BDACCOUNT_MSG_REPLYQUERYHASH, the same
 * counter msgin_reply_hash() uses - confirm that sharing it is intended.
 */
void bdNode::msgin_reply_nearest(bdId *id, bdToken *transId, bdToken *token, std::list<bdId> &nodes)
{
	mAccount.incCounter(BDACCOUNT_MSG_REPLYQUERYHASH, false);

#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_reply_nearest() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Token: ";
	bdPrintToken(std::cerr, token);
	std::cerr << " Nodes:";
	for(std::list<bdId>::iterator nit = nodes.begin(); nit != nodes.end(); ++nit)
	{
		std::cerr << " ";
		mFns->bdPrintId(std::cerr, &(*nit));
	}
	std::cerr << std::endl;
#else
	/* arguments are only needed for the debug trace */
	(void) id;
	(void) transId;
	(void) token;
	(void) nodes;
#endif
}
/* Handle an incoming POST_HASH request.
 *
 * Only accounting is done here (plus debug output when DEBUG_NODE_MSGIN is
 * defined); the hash, port and token are not stored or answered by this
 * function.
 */
void bdNode::msgin_post_hash(bdId *id, bdToken *transId, bdNodeId *info_hash, uint32_t port, bdToken *token)
{
	mAccount.incCounter(BDACCOUNT_MSG_POSTHASH, false);

#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_post_hash() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " Info_Hash: ";
	mFns->bdPrintNodeId(std::cerr, info_hash);
	std::cerr << " Port: " << port << " Token: ";
	bdPrintToken(std::cerr, token);
	std::cerr << std::endl;
#else
	/* arguments are only needed for the debug trace */
	(void) id;
	(void) transId;
	(void) info_hash;
	(void) port;
	(void) token;
#endif
}
/* Handle a REPLY_POST message.
 *
 * Only accounting is done here (plus debug output when DEBUG_NODE_MSGIN is
 * defined); no message is generated or sent.
 */
void bdNode::msgin_reply_post(bdId *id, bdToken *transId)
{
	mAccount.incCounter(BDACCOUNT_MSG_REPLYPOSTHASH, false);

#ifdef DEBUG_NODE_MSGIN
	std::cerr << "bdNode::msgin_reply_post() TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << std::endl;
#else
	/* arguments are only needed for the debug trace */
	(void) id;
	(void) transId;
#endif
}
/************************************************************************************************************
******************************************** Message Interface **********************************************
************************************************************************************************************/
/* Outgoing Messages */
/* Map a BITDHT_MSG_TYPE_CONNECT_* value to a human readable label.
 * Any value that is not one of the four known connect message types yields
 * "ConnectUnknown".
 */
std::string getConnectMsgType(int msgtype)
{
	const char *label = "ConnectUnknown";

	switch(msgtype)
	{
		case BITDHT_MSG_TYPE_CONNECT_REQUEST:
			label = "ConnectRequest";
			break;
		case BITDHT_MSG_TYPE_CONNECT_REPLY:
			label = "ConnectReply";
			break;
		case BITDHT_MSG_TYPE_CONNECT_START:
			label = "ConnectStart";
			break;
		case BITDHT_MSG_TYPE_CONNECT_ACK:
			label = "ConnectAck";
			break;
		default:
			break;
	}
	return std::string(label);
}
/* Build and send a CONNECT message of the given sub-type to `id`.
 *
 * id       - destination peer; the packet is sent to id->addr.
 * transId  - transaction id placed in the message.
 * msgtype  - one of BITDHT_MSG_TYPE_CONNECT_{REQUEST,REPLY,START,ACK};
 *            any other value is accounted as a CONNECT_REQUEST.
 * srcAddr, destAddr, mode, param, status - forwarded unchanged to
 *            bitdht_connect_genmsg().
 */
void bdNode::msgout_connect_genmsg(bdId *id, bdToken *transId, int msgtype, bdId *srcAddr, bdId *destAddr, int mode, int param, int status)
{
#ifdef DEBUG_NODE_MSGOUT
	std::cerr << "bdConnectManager::msgout_connect_genmsg() Type: " << getConnectMsgType(msgtype);
	std::cerr << " TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " To: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " SrcAddr: ";
	mFns->bdPrintId(std::cerr, srcAddr);
	std::cerr << " DestAddr: ";
	mFns->bdPrintId(std::cerr, destAddr);
	std::cerr << " Mode: " << mode << " Param: " << param << " Status: " << status;
	std::cerr << std::endl;
#endif

	/* accounting: unknown sub-types share the CONNECT_REQUEST counter */
	switch(msgtype)
	{
		case BITDHT_MSG_TYPE_CONNECT_REPLY:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREPLY, true);
			break;
		case BITDHT_MSG_TYPE_CONNECT_START:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTSTART, true);
			break;
		case BITDHT_MSG_TYPE_CONNECT_ACK:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTACK, true);
			break;
		case BITDHT_MSG_TYPE_CONNECT_REQUEST:
		default:
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREQUEST, true);
			break;
	}

	registerOutgoingMsg(id, transId, msgtype, NULL);

	/* encode into a stack buffer, offering one byte less than its size */
	char msg[10240];
	const int avail = 10240;
	const int blen = bitdht_connect_genmsg(transId, &(mOwnId), msgtype, srcAddr, destAddr,
						mode, param, status, msg, avail-1);

	sendPkt(msg, blen, id->addr);
}
/* Dispatch an incoming CONNECT message to the connection manager.
 *
 * id       - peer the message came from.
 * transId  - transaction id (only used for debug output).
 * msgtype  - one of BITDHT_MSG_TYPE_CONNECT_{REQUEST,REPLY,START,ACK}.
 * srcAddr, destAddr, mode, param, status - fields parsed from the message;
 *            each handler receives the subset it takes.
 *
 * For a known msgtype the matching counter is incremented, the matching
 * mConnMgr->recved...() handler is called and the peer is added with
 * BITDHT_PEER_STATUS_RECV_CONNECT_MSG. For an unknown msgtype nothing is
 * dispatched, but the peer is still passed to addPeer() with no flags.
 */
void bdNode::msgin_connect_genmsg(bdId *id, bdToken *transId, int msgtype,
					bdId *srcAddr, bdId *destAddr, int mode, int param, int status)
{
#ifdef DEBUG_NODE_MSGS
	std::cerr << "bdConnectManager::msgin_connect_genmsg() Type: " << getConnectMsgType(msgtype);
	std::cerr << " TransId: ";
	bdPrintTransId(std::cerr, transId);
	std::cerr << " From: ";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << " SrcAddr: ";
	mFns->bdPrintId(std::cerr, srcAddr);
	std::cerr << " DestAddr: ";
	mFns->bdPrintId(std::cerr, destAddr);
	std::cerr << " Mode: " << mode;
	std::cerr << " Param: " << param;
	std::cerr << " Status: " << status;
	std::cerr << std::endl;
#else
	(void) transId;
#endif

	/* switch to actual work functions */
	uint32_t peerflags = 0;
	switch(msgtype)
	{
		case BITDHT_MSG_TYPE_CONNECT_REQUEST:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREQUEST, false);
			mConnMgr->recvedConnectionRequest(id, srcAddr, destAddr, mode, param);
			break;
		case BITDHT_MSG_TYPE_CONNECT_REPLY:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTREPLY, false);
			mConnMgr->recvedConnectionReply(id, srcAddr, destAddr, mode, param, status);
			break;
		case BITDHT_MSG_TYPE_CONNECT_START:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTSTART, false);
			mConnMgr->recvedConnectionStart(id, srcAddr, destAddr, mode, param);
			break;
		case BITDHT_MSG_TYPE_CONNECT_ACK:
			peerflags = BITDHT_PEER_STATUS_RECV_CONNECT_MSG;
			mAccount.incCounter(BDACCOUNT_MSG_CONNECTACK, false);
			mConnMgr->recvedConnectionAck(id, srcAddr, destAddr, mode);
			break;
		default:
			/* unknown sub-type: nothing to dispatch, peerflags stays 0 */
			break;
	}

	/* received message - so peer must be good */
	addPeer(id, peerflags);
}
/****************** Other Functions ******************/
/* Fill `token` with a freshly generated random token.
 *
 * The token is the lower-case hex rendering of bdRandom::random_u32(),
 * zero-padded to at least four characters and truncated to
 * BITDHT_TOKEN_MAX_LEN. token->len is set to the number of bytes written;
 * token->data is not NUL-terminated by this function.
 */
void bdNode::genNewToken(bdToken *token)
{
#ifdef DEBUG_NODE_ACTIONS
	fprintf(stderr, "bdNode::genNewToken()");
	fprintf(stderr, ")\n");
#endif

	// XXX is this a good way to do it?
	// Variable length, from 4 chars up to lots... 10?
	// leave for the moment, but fix.
	std::string num;
	/* "%lx" consumes an unsigned long: convert explicitly so the variadic
	 * argument has exactly the type the format string promises */
	bd_sprintf(num, "%04lx", static_cast<unsigned long>(bdRandom::random_u32()));

	int len = num.size();
	if (len > BITDHT_TOKEN_MAX_LEN)
		len = BITDHT_TOKEN_MAX_LEN;

	for(int i = 0; i < len; i++)
	{
		token->data[i] = num[i];
	}
	token->len = len;
}
/* Running counter used to hand out transaction ids; incremented on every
 * call to bdNode::genNewTransId(). It is a plain file-level global with no
 * synchronisation visible here. */
uint32_t transIdCounter = 0;

/* Fill `token` with the next transaction id.
 *
 * The id is the lower-case hex rendering of transIdCounter (post-incremented),
 * zero-padded to at least two characters and truncated to
 * BITDHT_TOKEN_MAX_LEN. token->len is set to the number of bytes written;
 * token->data is not NUL-terminated by this function.
 */
void bdNode::genNewTransId(bdToken *token)
{
#ifdef DEBUG_NODE_ACTIONS
	fprintf(stderr, "bdNode::genNewTransId()");
	fprintf(stderr, ")\n");
#endif

	std::string num;
	/* "%lx" consumes an unsigned long, but the counter is a uint32_t:
	 * convert explicitly so the variadic argument matches the format */
	bd_sprintf(num, "%02lx", static_cast<unsigned long>(transIdCounter++));

	int len = num.size();
	if (len > BITDHT_TOKEN_MAX_LEN)
		len = BITDHT_TOKEN_MAX_LEN;

	for(int i = 0; i < len; i++)
	{
		token->data[i] = num[i];
	}
	token->len = len;
}
/* Store a remote query for later processing.
 *
 * A bdRemoteQuery built from (id, query, transId, query_type) is appended to
 * mRemoteQueries. Always returns 1.
 */
int bdNode::queueQuery(bdId *id, bdNodeId *query, bdToken *transId, uint32_t query_type)
{
#ifdef DEBUG_NODE_ACTIONS
	std::cerr << "bdnode::queueQuery()" << std::endl;
#endif

	mRemoteQueries.push_back(
			bdRemoteQuery(id, query, transId, query_type));

	return 1;
}
/*************** Register Transaction Ids *************/
/* Record an outgoing message in the message history.
 *
 * Without USE_HISTORY this does nothing (apart from the optional
 * DEBUG_MSG_CHECKS trace). With USE_HISTORY the message is passed to
 * mHistory.addMsg() flagged as outgoing - except for a PING whose id is NULL
 * or whose node id starts with four zero bytes, which is silently skipped.
 *
 * Message types for reference:
 *   BITDHT_MSG_TYPE_UNKNOWN     0
 *   BITDHT_MSG_TYPE_PING        1
 *   BITDHT_MSG_TYPE_PONG        2
 *   BITDHT_MSG_TYPE_FIND_NODE   3
 *   BITDHT_MSG_TYPE_REPLY_NODE  4
 *   BITDHT_MSG_TYPE_GET_HASH    5
 *   BITDHT_MSG_TYPE_REPLY_HASH  6
 *   BITDHT_MSG_TYPE_REPLY_NEAR  7
 *   BITDHT_MSG_TYPE_POST_HASH   8
 *   BITDHT_MSG_TYPE_REPLY_POST  9
 */
void bdNode::registerOutgoingMsg(bdId *id, bdToken *transId, uint32_t msgType, bdNodeId *aboutId)
{
#ifdef DEBUG_MSG_CHECKS
	std::cerr << "bdNode::registerOutgoingMsg(";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << ", " << msgType << ")" << std::endl;
#else
	(void) id;
	(void) msgType;
#endif

#ifdef USE_HISTORY
	// splitting up - to see if we can isolate the crash causes.
	// PING is the one type that is filtered (it appeared to crash it!);
	// every other type goes straight through to the history.
	if (msgType == BITDHT_MSG_TYPE_PING)
	{
		if (!id)
		{
			return;
		}

		const bool zeroPrefix = (id->id.data[0] == 0)
					&& (id->id.data[1] == 0)
					&& (id->id.data[2] == 0)
					&& (id->id.data[3] == 0);
		if (zeroPrefix)
		{
			return;
		}
	}

	// This line appears to cause crashes on OSX.
	mHistory.addMsg(id, transId, msgType, false, aboutId);
#else
	(void) transId;
	(void) aboutId;
#endif
}
/* Record an incoming message in the message history.
 *
 * Without USE_HISTORY this does nothing (apart from the optional
 * DEBUG_MSG_CHECKS trace). With USE_HISTORY the message is passed to
 * mHistory.addMsg() flagged as incoming. Always returns 0.
 */
uint32_t bdNode::registerIncomingMsg(bdId *id, bdToken *transId, uint32_t msgType, bdNodeId *aboutId)
{
#ifdef DEBUG_MSG_CHECKS
	std::cerr << "bdNode::registerIncomingMsg(";
	mFns->bdPrintId(std::cerr, id);
	std::cerr << ", " << msgType << ")" << std::endl;
#else
	(void) id;
	(void) msgType;
#endif

#ifdef USE_HISTORY
	mHistory.addMsg(id, transId, msgType, true, aboutId);
#else
	(void) transId;
	(void) aboutId;
#endif

	return 0;
}
/* Placeholder: currently performs no work. */
void bdNode::cleanupTransIdRegister()
{
	/* intentionally empty */
}
/*************** Internal Msg Storage *****************/
/* Construct a message holding a private copy of `len` bytes from `msg`,
 * together with a copy of the socket address `*in_addr`.
 *
 * The buffer is obtained with malloc() and released by the destructor.
 * If the allocation fails an error is logged, `data` stays NULL and `mSize`
 * is reset to 0 so that the object never advertises bytes it does not hold.
 */
bdNodeNetMsg::bdNodeNetMsg(char *msg, int len, struct sockaddr_in *in_addr)
	:data(NULL), mSize(len), addr(*in_addr)
{
	data = (char *) malloc(len);

	if(data == NULL)
	{
		std::cerr << "(EE) " << __PRETTY_FUNCTION__ << ": ERROR. cannot allocate memory for " << len << " bytes." << std::endl;
		/* keep size and buffer consistent: no buffer means no payload */
		mSize = 0;
		return ;
	}

	memcpy(data, msg, len);
	//print(std::cerr);
}
void bdNodeNetMsg::print(std::ostream &out)
{
out << "bdNodeNetMsg::print(" << mSize << ") to "
<< bdnet_inet_ntoa(addr.sin_addr) << ":" << htons(addr.sin_port);
out << std::endl;
}
/* Release the payload buffer allocated by the constructor.
 * free() accepts NULL, so a failed allocation needs no special case. */
bdNodeNetMsg::~bdNodeNetMsg()
{
	free(data);
}
/**************** In/Out of Relay Mode ******************/
void bdNode::dropRelayServers()
{
/* We leave them there... just drop the flags */
uint32_t flags = BITDHT_PEER_STATUS_DHT_RELAY_SERVER;
std::list<bdNodeId> peerList;
std::list<bdNodeId>::iterator it;
mFriendList.findPeersWithFlags(flags, peerList);
for(it = peerList.begin(); it != peerList.end(); it++)
{
mFriendList.removePeer(&(*it));
}
mNodeSpace.clean_node_flags(flags);
}
void bdNode::pingRelayServers()
{
/* if Relay's have been switched on, do search/ping to locate servers */
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::pingRelayServers()";
std::cerr << std::endl;
#endif
bool doSearch = true;
uint32_t flags = BITDHT_PEER_STATUS_DHT_RELAY_SERVER;
std::list<bdNodeId> peerList;
std::list<bdNodeId>::iterator it;
mFriendList.findPeersWithFlags(flags, peerList);
for(it = peerList.begin(); it != peerList.end(); it++)
{
if (doSearch)
{
uint32_t qflags = BITDHT_QFLAGS_INTERNAL | BITDHT_QFLAGS_DISGUISE;
mQueryMgr->addQuery(&(*it), qflags);
#ifdef DEBUG_NODE_MSGS
std::cerr << "bdNode::pingRelayServers() Adding Internal Search for Relay Server: ";
mFns->bdPrintNodeId(std::cerr, &(*it));
std::cerr << std::endl;
#endif
}
else
{
/* try ping - if we have an address??? */
}
}
}