Merging branches/v0.5-peernet/libbitdht (Merging r4237 through r4353 into '.')

There are many significant improvements to the DHT here. 
See commit logs on v0.5-peernet branch for details.

This is not the final merge, but brings over the majority of expected v0.5-peernet/libbitdht changes 




git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@4354 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-06-29 10:46:11 +00:00
parent f517442989
commit fff40eceac
40 changed files with 6843 additions and 746 deletions

View file

@ -48,6 +48,9 @@
//#define DEBUG_UDP_BITDHT 1
#define BITDHT_VERSION_IDENTIFER 1
//#define BITDHT_VERSION "01" // Original RS 0.5.0/0.5.1 version.
#define BITDHT_VERSION "02" // Connections + Full DHT implementation.
/*************************************/
UdpBitDht::UdpBitDht(UdpPublisher *pub, bdNodeId *id, std::string appVersion, std::string bootstrapfile, bdDhtFunctions *fns)
@ -57,6 +60,7 @@ UdpBitDht::UdpBitDht(UdpPublisher *pub, bdNodeId *id, std::string appVersion, st
#ifdef BITDHT_VERSION_IDENTIFER
usedVersion = "BD";
usedVersion += BITDHT_VERSION;
#endif
usedVersion += appVersion;
@ -118,6 +122,30 @@ void UdpBitDht::removeCallback(BitDhtCallback *cb)
mBitDhtManager->removeCallback(cb);
}
void UdpBitDht::ConnectionRequest(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode, uint32_t start)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
mBitDhtManager->ConnectionRequest(laddr, target, mode, start);
}
void UdpBitDht::ConnectionAuth(bdId *srcId, bdId *proxyId, bdId *destId, uint32_t mode, uint32_t loc, uint32_t answer)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
mBitDhtManager->ConnectionAuth(srcId, proxyId, destId, mode, loc, answer);
}
void UdpBitDht::ConnectionOptions(uint32_t allowedModes, uint32_t flags)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
mBitDhtManager->ConnectionOptions(allowedModes, flags);
}
int UdpBitDht::getDhtPeerAddress(const bdNodeId *id, struct sockaddr_in &from)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
@ -132,6 +160,30 @@ int UdpBitDht::getDhtValue(const bdNodeId *id, std::string key, std::string &va
return mBitDhtManager->getDhtValue(id, key, value);
}
int UdpBitDht::getDhtBucket(const int idx, bdBucket &bucket)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
return mBitDhtManager->getDhtBucket(idx, bucket);
}
int UdpBitDht::getDhtQueries(std::map<bdNodeId, bdQueryStatus> &queries)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
return mBitDhtManager->getDhtQueries(queries);
}
int UdpBitDht::getDhtQueryStatus(const bdNodeId *id, bdQuerySummary &query)
{
bdStackMutex stack(dhtMtx); /********** MUTEX LOCKED *************/
return mBitDhtManager->getDhtQueryStatus(id, query);
}
/* stats and Dht state */
int UdpBitDht:: startDht()

View file

@ -67,9 +67,18 @@ virtual void findDhtValue(bdNodeId *id, std::string key, uint32_t mode);
virtual void addCallback(BitDhtCallback *cb);
virtual void removeCallback(BitDhtCallback *cb);
/***** Connections Requests *****/
virtual void ConnectionRequest(struct sockaddr_in *laddr, bdNodeId *target, uint32_t mode, uint32_t start);
virtual void ConnectionAuth(bdId *srcId, bdId *proxyId, bdId *destId, uint32_t mode, uint32_t loc, uint32_t answer);
virtual void ConnectionOptions(uint32_t allowedModes, uint32_t flags);
/***** Get Results Details *****/
virtual int getDhtPeerAddress(const bdNodeId *id, struct sockaddr_in &from);
virtual int getDhtValue(const bdNodeId *id, std::string key, std::string &value);
virtual int getDhtBucket(const int idx, bdBucket &bucket);
virtual int getDhtQueries(std::map<bdNodeId, bdQueryStatus> &queries);
virtual int getDhtQueryStatus(const bdNodeId *id, bdQuerySummary &query);
/* stats and Dht state */
virtual int startDht();

View file

@ -24,6 +24,7 @@
*/
#include "udp/udplayer.h"
#include "util/bdrandom.h"
#include <iostream>
#include <sstream>
@ -310,7 +311,7 @@ void UdpLayer::recv_loop()
}
int UdpLayer::sendPkt(const void *data, int size, sockaddr_in &to, int ttl)
int UdpLayer::sendPkt(const void *data, int size, const sockaddr_in &to, int ttl)
{
/* if ttl is different -> set it */
if (ttl != getTTL())
@ -492,7 +493,7 @@ int UdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from)
return -1;
}
int UdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to)
int UdpLayer::sendUdpPacket(const void *data, int size, const struct sockaddr_in &to)
{
/* send out */
#ifdef DEBUG_UDP_LAYER
@ -525,28 +526,26 @@ LossyUdpLayer::~LossyUdpLayer() { return; }
int LossyUdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from)
{
double prob = (1.0 * (rand() / (RAND_MAX + 1.0)));
if (prob < lossFraction)
if (0 < UdpLayer::receiveUdpPacket(data, size, from))
{
/* but discard */
if (0 < UdpLayer::receiveUdpPacket(data, size, from))
float prob = bdRandom::random_f32();
if (prob < lossFraction)
{
/* discard */
std::cerr << "LossyUdpLayer::receiveUdpPacket() Dropping packet!";
std::cerr << std::endl;
std::cerr << printPkt(data, *size);
std::cerr << std::endl;
std::cerr << "LossyUdpLayer::receiveUdpPacket() Packet Dropped!";
std::cerr << std::endl;
size = 0;
return -1;
}
size = 0;
return -1;
return *size;
}
// otherwise read normally;
return UdpLayer::receiveUdpPacket(data, size, from);
return -1;
}
int LossyUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to)
@ -571,3 +570,99 @@ int LossyUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in
return UdpLayer::sendUdpPacket(data, size, to);
}
/**************************** LossyUdpLayer - for Testing **************/
PortRange::PortRange() :lport(0), uport(0) { return; }
PortRange::PortRange(uint16_t lp, uint16_t up) :lport(lp), uport(up) { return; }
bool PortRange::inRange(uint16_t port)
{
if (port < lport)
{
return false;
}
if (port > uport)
{
return false;
}
return true;
}
RestrictedUdpLayer::RestrictedUdpLayer(UdpReceiver *udpr,
struct sockaddr_in &local)
:UdpLayer(udpr, local)
{
return;
}
RestrictedUdpLayer::~RestrictedUdpLayer() { return; }
void RestrictedUdpLayer::addRestrictedPortRange(int lp, int up)
{
PortRange pr(lp, up);
mLostPorts.push_back(pr);
}
int RestrictedUdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from)
{
if (0 < UdpLayer::receiveUdpPacket(data, size, from))
{
/* check the port against list */
uint16_t inPort = ntohs(from.sin_port);
std::list<PortRange>::iterator it;
for(it = mLostPorts.begin(); it != mLostPorts.end(); it++)
{
if (it->inRange(inPort))
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "RestrictedUdpLayer::receiveUdpPacket() Dropping packet";
std::cerr << ", Port(" << inPort << ") in restricted range!";
std::cerr << std::endl;
//std::cerr << printPkt(data, *size);
//std::cerr << std::endl;
#endif
size = 0;
return -1;
}
}
/* acceptable port */
return *size;
}
return -1;
}
int RestrictedUdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to)
{
/* check the port against list */
uint16_t outPort = ntohs(to.sin_port);
std::list<PortRange>::iterator it;
for(it = mLostPorts.begin(); it != mLostPorts.end(); it++)
{
if (it->inRange(outPort))
{
/* drop */
#ifdef DEBUG_UDP_LAYER
std::cerr << "RestrictedUdpLayer::sendUdpPacket() Dropping packet";
std::cerr << ", Port(" << outPort << ") in restricted range!";
std::cerr << std::endl;
//std::cerr << printPkt(data, *size);
//std::cerr << std::endl;
#endif
return size;
}
}
// otherwise read normally;
return UdpLayer::sendUdpPacket(data, size, to);
}

View file

@ -60,7 +60,7 @@ class UdpPublisher
{
public:
virtual ~UdpPublisher() {}
virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl) = 0;
virtual int sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl) = 0;
};
@ -86,7 +86,7 @@ void recv_loop(); /* uses callback to UdpReceiver */
/* Higher Level Interface */
//int readPkt(void *data, int *size, struct sockaddr_in &from);
int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl);
int sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl);
/* monitoring / updates */
int okay();
@ -98,7 +98,7 @@ void recv_loop(); /* uses callback to UdpReceiver */
protected:
virtual int receiveUdpPacket(void *data, int *size, struct sockaddr_in &from);
virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to);
virtual int sendUdpPacket(const void *data, int size, const struct sockaddr_in &to);
int setTTL(int t);
int getTTL();
@ -134,5 +134,35 @@ virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to);
double lossFraction;
};
class PortRange
{
public:
PortRange();
PortRange(uint16_t lp, uint16_t up);
bool inRange(uint16_t port);
uint16_t lport;
uint16_t uport;
};
/* For Testing - drops packets */
class RestrictedUdpLayer: public UdpLayer
{
public:
RestrictedUdpLayer(UdpReceiver *udpr, struct sockaddr_in &local);
virtual ~RestrictedUdpLayer();
void addRestrictedPortRange(int lp, int up);
protected:
virtual int receiveUdpPacket(void *data, int *size, struct sockaddr_in &from);
virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to);
std::list<PortRange> mLostPorts;
};
#endif

View file

@ -47,6 +47,34 @@ UdpStack::UdpStack(struct sockaddr_in &local)
return;
}
UdpStack::UdpStack(int testmode, struct sockaddr_in &local)
:udpLayer(NULL), laddr(local)
{
std::cerr << "UdpStack::UdpStack() Evoked in TestMode" << std::endl;
if (testmode == UDP_TEST_LOSSY_LAYER)
{
std::cerr << "UdpStack::UdpStack() Installing LossyUdpLayer" << std::endl;
udpLayer = new LossyUdpLayer(this, laddr, UDP_TEST_LOSSY_FRAC);
}
else if (testmode == UDP_TEST_RESTRICTED_LAYER)
{
std::cerr << "UdpStack::UdpStack() Installing RestrictedUdpLayer" << std::endl;
udpLayer = new RestrictedUdpLayer(this, laddr);
}
else
{
std::cerr << "UdpStack::UdpStack() Installing Standard UdpLayer" << std::endl;
// standard layer
openSocket();
}
return;
}
UdpLayer *UdpStack::getUdpLayer() /* for testing only */
{
return udpLayer;
}
bool UdpStack::resetAddress(struct sockaddr_in &local)
{
std::cerr << "UdpStack::resetAddress(" << local << ")";
@ -84,7 +112,7 @@ int UdpStack::recvPkt(void *data, int size, struct sockaddr_in &from)
return 1;
}
int UdpStack::sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl)
int UdpStack::sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl)
{
/* print packet information */
#ifdef DEBUG_UDP_RECV
@ -198,7 +226,7 @@ UdpSubReceiver::UdpSubReceiver(UdpPublisher *pub)
return;
}
int UdpSubReceiver::sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl)
int UdpSubReceiver::sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl)
{
/* print packet information */
#ifdef DEBUG_UDP_RECV

View file

@ -55,7 +55,7 @@ class UdpSubReceiver: public UdpReceiver
UdpSubReceiver(UdpPublisher *pub);
/* calls mPublisher->sendPkt */
virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl);
virtual int sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl);
/* callback for recved data (overloaded from UdpReceiver) */
//virtual int recvPkt(void *data, int size, struct sockaddr_in &from) = 0;
@ -63,13 +63,21 @@ virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl)
};
#define UDP_TEST_LOSSY_LAYER 1
#define UDP_TEST_RESTRICTED_LAYER 2
#define UDP_TEST_LOSSY_FRAC (0.10)
class UdpStack: public UdpReceiver, public UdpPublisher
{
public:
UdpStack(struct sockaddr_in &local);
UdpStack(int testmode, struct sockaddr_in &local);
virtual ~UdpStack() { return; }
UdpLayer *getUdpLayer(); /* for testing only */
bool resetAddress(struct sockaddr_in &local);
@ -79,7 +87,7 @@ int removeReceiver(UdpReceiver *recv);
/* Packet IO */
/* pass-through send packets */
virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl);
virtual int sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl);
/* callback for recved data (overloaded from UdpReceiver) */
virtual int recvPkt(void *data, int size, struct sockaddr_in &from);