Improvement to BanPeers Sharing Code.

* Added p3BanList as a service.
 * Added interfaces to communicate addresses.
 * Added debug to p3BanList.
 * Fixed several bugs in the AddEntry/Condense
 * Fixed Mutex deadlocks.

Improvements to Dsdv code too.
 * Added p3Dsdv as a service.
 * Added Function to create a TEST service for routing.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-dhtmods@4687 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-11-25 00:58:01 +00:00
parent 8d4a7ed4f0
commit 85ea54395b
15 changed files with 304 additions and 18 deletions

View File

@ -70,6 +70,11 @@ virtual int dhtConnectCallback(const bdId *srcId, const bdId *proxyId, const bdI
return mParent->ConnectCallback(srcId, proxyId, destId, mode, point, param, cbtype, errcode);
}
virtual int dhtInfoCallback(const bdId *id, uint32_t type, uint32_t flags, std::string info)
{
return mParent->InfoCallback(id, type, flags, info);
}
private:
p3BitDht *mParent;
@ -84,6 +89,8 @@ p3BitDht::p3BitDht(std::string id, pqiConnectCb *cb, p3NetMgr *nm,
mProxyStunner = NULL;
mRelay = NULL;
mPeerSharer = NULL;
std::string dhtVersion = "RS51"; // should come from elsewhere!
mOwnRsId = id;
@ -150,6 +157,10 @@ void p3BitDht::setupConnectBits(UdpStunner *dhtStunner, UdpStunner *proxyStun
mRelay = relay;
}
void p3BitDht::setupPeerSharer(pqiNetAssistPeerShare *sharer)
{
mPeerSharer = sharer;
}
void p3BitDht::start()
{

View File

@ -155,8 +155,7 @@ virtual std::string getUdpAddressString();
void setupConnectBits(UdpStunner *dhtStunner, UdpStunner *proxyStunner, UdpRelayReceiver *relay);
void setupPeerSharer(pqiNetAssistPeerShare *sharer);
void start(); /* starts up the bitdht thread */
@ -177,6 +176,7 @@ virtual bool getNetworkStats(uint32_t &netsize, uint32_t &localnetsize);
virtual bool findPeer(std::string id);
virtual bool dropPeer(std::string id);
virtual int addBadPeer(const struct sockaddr_in &addr, uint32_t reason, uint32_t flags, uint32_t age);
virtual int addKnownPeer(const std::string &pid, const struct sockaddr_in &addr, uint32_t flags);
//virtual int addFriend(const std::string pid);
//virtual int addFriendOfFriend(const std::string pid);
@ -219,6 +219,8 @@ int PeerCallback(const bdId *id, uint32_t status);
int ValueCallback(const bdNodeId *id, std::string key, uint32_t status);
int ConnectCallback(const bdId *srcId, const bdId *proxyId, const bdId *destId,
uint32_t mode, uint32_t point, uint32_t param, uint32_t cbtype, uint32_t errcode);
int InfoCallback(const bdId *id, uint32_t type, uint32_t flags, std::string info);
int OnlinePeerCallback_locked(const bdId *id, uint32_t status, DhtPeerDetails *dpd);
int UnreachablePeerCallback_locked(const bdId *id, uint32_t status, DhtPeerDetails *dpd);
@ -283,6 +285,8 @@ int calculateNodeId(const std::string pid, bdNodeId *id);
p3NetMgr *mNetMgr;
pqiNetAssistPeerShare *mPeerSharer;
RsMutex dhtMtx;
std::string mOwnRsId;

View File

@ -37,6 +37,50 @@
************************************* Dht Callback ***************************************
******************************************************************************************/
/**** dht NodeCallback ****
*
*
* In the old version, we used this to callback mConnCb->peerStatus()
* We might want to drop this, and concentrate on the connection stuff.
*
* -> if an new dht peer, then pass to Stunners.
* -> do we care if we know them? not really!
*/
int p3BitDht::InfoCallback(const bdId *id, uint32_t type, uint32_t flags, std::string info)
{
/* translate info */
std::string rsid;
struct sockaddr_in addr = id->addr;
int outtype = PNASS_TYPE_BADPEER;
int outreason = PNASS_REASON_UNKNOWN;
int outage = 0;
#ifdef DEBUG_BITDHT_COMMON
std::cerr << "p3BitDht::InfoCallback() likely BAD_PEER: ";
bdStdPrintId(std::cerr, id);
std::cerr << std::endl;
#endif
{
RsStackMutex stack(dhtMtx); /********** LOCKED MUTEX ***************/
DhtPeerDetails *dpd = findInternalDhtPeer_locked(&(id->id), RSDHT_PEERTYPE_ANY);
if (dpd)
{
rsid = dpd->mRsId;
}
}
if (mPeerSharer)
{
mPeerSharer->updatePeer(rsid, addr, outtype, outreason, outage);
}
return 1;
}
/**** dht NodeCallback ****
*

View File

@ -183,6 +183,13 @@ bool p3BitDht::dropPeer(std::string pid)
********************************* Basic Peer Details *************************************
******************************************************************************************/
int p3BitDht::addBadPeer(const struct sockaddr_in &addr, uint32_t reason, uint32_t flags, uint32_t age)
{
//mUdpBitDht->updateKnownPeer(&id, 0, bdflags);
return 1;
}
int p3BitDht::addKnownPeer(const std::string &pid, const struct sockaddr_in &addr, uint32_t flags)
{

View File

@ -1423,6 +1423,22 @@ bool p3NetMgrIMPL::netAssistKnownPeer(std::string id, const struct sockaddr_in &
return true;
}
bool p3NetMgrIMPL::netAssistBadPeer(const struct sockaddr_in &addr, uint32_t reason, uint32_t flags, uint32_t age)
{
std::map<uint32_t, pqiNetAssistConnect *>::iterator it;
#ifdef NETMGR_DEBUG
std::cerr << "p3NetMgrIMPL::netAssistBadPeer(" << rs_inet_ntoa(addr.sin_addr) << ")";
std::cerr << std::endl;
#endif
for(it = mDhts.begin(); it != mDhts.end(); it++)
{
(it->second)->addBadPeer(addr, reason, flags, age);
}
return true;
}
bool p3NetMgrIMPL::netAssistAttach(bool on)
{

View File

@ -120,6 +120,7 @@ virtual bool setVisState(uint32_t visState) = 0;
// Switch DHT On/Off.
virtual bool netAssistFriend(std::string id, bool on) = 0;
virtual bool netAssistKnownPeer(std::string id, const struct sockaddr_in &addr, uint32_t flags) = 0;
virtual bool netAssistBadPeer(const struct sockaddr_in &addr, uint32_t reason, uint32_t flags, uint32_t age) = 0;
virtual bool netAssistStatusUpdate(std::string id, int mode) = 0;
/* Get Network State */
@ -175,6 +176,7 @@ virtual bool setVisState(uint32_t visState);
// Switch DHT On/Off.
virtual bool netAssistFriend(std::string id, bool on);
virtual bool netAssistKnownPeer(std::string id, const struct sockaddr_in &addr, uint32_t flags);
virtual bool netAssistBadPeer(const struct sockaddr_in &addr, uint32_t reason, uint32_t flags, uint32_t age);
virtual bool netAssistStatusUpdate(std::string id, int mode);
/* Get Network State */

View File

@ -79,6 +79,19 @@ virtual bool getExternalAddress(struct sockaddr_in &addr) = 0;
};
#define PNASS_TYPE_BADPEER 0x0001
#define PNASS_REASON_UNKNOWN 0x0001
class pqiNetAssistPeerShare
{
public:
/* share Addresses for various reasons (bad peers, etc) */
virtual void updatePeer(std::string id, struct sockaddr_in addr, int type, int reason, int age) = 0;
};
/* this is for the Stunners
*
*
@ -130,6 +143,7 @@ virtual bool findPeer(std::string id) = 0;
virtual bool dropPeer(std::string id) = 0;
/* add non-active peers (can still toggle active/non-active via above) */
virtual int addBadPeer(const struct sockaddr_in &addr, uint32_t reason, uint32_t flags, uint32_t age) = 0;
virtual int addKnownPeer(const std::string &pid, const struct sockaddr_in &addr, uint32_t flags) = 0;
//virtual int addFriend(const std::string pid) = 0;

View File

@ -37,6 +37,7 @@ extern RsDsdv *rsDsdv;
#define RSDSDV_IDTYPE_PEER 0x0001
#define RSDSDV_IDTYPE_SERVICE 0x0002
#define RSDSDV_IDTYPE_TEST 0x0100
#define RSDSDV_FLAGS_SIGNIFICANT_CHANGE 0x0001
#define RSDSDV_FLAGS_STABLE_ROUTE 0x0002

View File

@ -1781,6 +1781,9 @@ RsTurtle *rsTurtle = NULL ;
#include "services/p3vors.h"
#endif
#include "services/p3banlist.h"
#include "services/p3dsdv.h"
RsControl *createRsControl(RsIface &iface, NotifyBase &notify)
{
@ -2127,6 +2130,16 @@ int RsServer::StartupRetroShare()
rsVoip = mVoipTest;
#endif
// new services to test.
p3BanList *mBanList = new p3BanList(mLinkMgr, mNetMgr);
pqih -> addService(mBanList);
mBitDht->setupPeerSharer(mBanList);
p3Dsdv *mDsdv = new p3Dsdv(mLinkMgr);
pqih -> addService(mDsdv);
rsDsdv = mDsdv;
mDsdv->addTestService();
#endif // MINIMAL_LIBRS
/**************************************************************************/

View File

@ -166,6 +166,11 @@ std::ostream &RsTlvBanListEntry::print(std::ostream &out, uint16_t indent)
/************************************* RsTlvBanList ************************************/
RsTlvBanList::RsTlvBanList()
{
}
void RsTlvBanList::TlvClear()
{
entries.clear();

View File

@ -278,6 +278,11 @@ std::ostream &RsTlvDsdvEntry::print(std::ostream &out, uint16_t indent)
/************************************* RsTlvDsdvEntrySet ************************************/
RsTlvDsdvEntrySet::RsTlvDsdvEntrySet()
{
}
void RsTlvDsdvEntrySet::TlvClear()
{
entries.clear();

View File

@ -37,6 +37,8 @@
* #define DEBUG_BANLIST 1
****/
#define DEBUG_BANLIST 1
/* DEFINE INTERFACE POINTER! */
//RsBanList *rsBanList = NULL;
@ -89,6 +91,12 @@ bool p3BanList::processIncoming()
bool updated = false;
while(NULL != (item = recvItem()))
{
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::processingIncoming() Received Item:";
std::cerr << std::endl;
item->print(std::cerr);
std::cerr << std::endl;
#endif
switch(item->PacketSubType())
{
default:
@ -134,6 +142,24 @@ bool p3BanList::recvBanItem(RsBanListItem *item)
return updated;
}
/* overloaded from pqiNetAssistSharePeer */
void p3BanList::updatePeer(std::string id, struct sockaddr_in addr, int type, int reason, int age)
{
std::string ownId = mLinkMgr->getOwnId();
int int_reason = 0;
addBanEntry(ownId, addr, RSBANLIST_SOURCE_SELF, int_reason, age);
/* process */
{
RsStackMutex stack(mBanMtx); /****** LOCKED MUTEX *******/
mBanSet.clear();
condenseBanSources_locked();
}
}
bool p3BanList::addBanEntry(const std::string &peerId, const struct sockaddr_in &addr, uint32_t level, uint32_t reason, uint32_t age)
{
RsStackMutex stack(mBanMtx); /****** LOCKED MUTEX *******/
@ -141,13 +167,19 @@ bool p3BanList::addBanEntry(const std::string &peerId, const struct sockaddr_in
time_t now = time(NULL);
bool updated = false;
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::addBanEntry() Addr: " << rs_inet_ntoa(addr.sin_addr) << " Level: " << level;
std::cerr << " Reason: " << reason << " Age: " << age;
std::cerr << std::endl;
#endif
std::map<std::string, BanList>::iterator it;
it = mBanSources.find(peerId);
if (it == mBanSources.end())
{
BanList bl;
bl.mPeerId = peerId;
bl.mLastUpdate = 0;
bl.mLastUpdate = now;
mBanSources[peerId] = bl;
it = mBanSources.find(peerId);
@ -164,6 +196,9 @@ bool p3BanList::addBanEntry(const std::string &peerId, const struct sockaddr_in
blp.reason = reason;
blp.level = level;
blp.mTs = now - age;
it->second.mBanPeers[addr.sin_addr.s_addr] = blp;
it->second.mLastUpdate = now;
updated = true;
}
else
@ -178,6 +213,8 @@ bool p3BanList::addBanEntry(const std::string &peerId, const struct sockaddr_in
mit->second.reason = reason;
mit->second.level = level;
mit->second.mTs = now - age;
it->second.mLastUpdate = now;
updated = true;
}
}
@ -190,13 +227,29 @@ int p3BanList::condenseBanSources_locked()
time_t now = time(NULL);
std::string ownId = mLinkMgr->getOwnId();
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked()";
std::cerr << std::endl;
#endif
std::map<std::string, BanList>::const_iterator it;
for(it = mBanSources.begin(); it != mBanSources.end(); it++)
{
if (now - it->second.mLastUpdate > RSBANLIST_ENTRY_MAX_AGE)
{
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked()";
std::cerr << " Ignoring Out-Of-Date peer: " << it->first;
std::cerr << std::endl;
#endif
continue;
}
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked()";
std::cerr << " Condensing Info from peer: " << it->first;
std::cerr << std::endl;
#endif
std::map<uint32_t, BanListPeer>::const_iterator lit;
for(lit = it->second.mBanPeers.begin();
@ -205,6 +258,12 @@ int p3BanList::condenseBanSources_locked()
/* check timestamp */
if (now - lit->second.mTs > RSBANLIST_ENTRY_MAX_AGE)
{
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked()";
std::cerr << " Ignoring Out-Of-Date Entry for: ";
std::cerr << rs_inet_ntoa(lit->second.addr.sin_addr);
std::cerr << std::endl;
#endif
continue;
}
@ -224,9 +283,21 @@ int p3BanList::condenseBanSources_locked()
bp.level = lvl;
bp.addr.sin_port = 0;
mBanSet[lit->second.addr.sin_addr.s_addr] = bp;
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked()";
std::cerr << " Added New Entry for: ";
std::cerr << rs_inet_ntoa(lit->second.addr.sin_addr);
std::cerr << std::endl;
#endif
}
else
{
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked()";
std::cerr << " Merging Info for: ";
std::cerr << rs_inet_ntoa(lit->second.addr.sin_addr);
std::cerr << std::endl;
#endif
/* update if necessary */
if (lvl == sit->second.level)
{
@ -239,6 +310,15 @@ int p3BanList::condenseBanSources_locked()
}
}
}
#ifdef DEBUG_BANLIST
std::cerr << "p3BanList::condenseBanSources_locked() Printing New Set:";
std::cerr << std::endl;
#endif
printBanSet_locked(std::cerr);
}
@ -255,10 +335,15 @@ int p3BanList::sendPackets()
if (now - pt > RSBANLIST_SEND_PERIOD)
{
sendBanLists();
printBanSources(std::cerr);
printBanSet(std::cerr);
RsStackMutex stack(mBanMtx); /****** LOCKED MUTEX *******/
std::cerr << "p3BanList::sendPackets() Regular Broadcast";
std::cerr << std::endl;
printBanSources_locked(std::cerr);
printBanSet_locked(std::cerr);
mSentListTime = now;
}
return true ;
@ -325,11 +410,9 @@ int p3BanList::sendBanSet(std::string peerid)
}
int p3BanList::printBanSet(std::ostream &out)
int p3BanList::printBanSet_locked(std::ostream &out)
{
RsStackMutex stack(mBanMtx); /****** LOCKED MUTEX *******/
out << "p3BanList::printBanSet";
out << "p3BanList::printBanSet_locked()";
out << std::endl;
time_t now = time(NULL);
@ -352,10 +435,8 @@ int p3BanList::printBanSet(std::ostream &out)
int p3BanList::printBanSources(std::ostream &out)
int p3BanList::printBanSources_locked(std::ostream &out)
{
RsStackMutex stack(mBanMtx); /****** LOCKED MUTEX *******/
time_t now = time(NULL);
std::map<std::string, BanList>::const_iterator it;

View File

@ -65,13 +65,18 @@ class BanList
* Exchange list of Banned IP addresses with peers.
*/
class p3BanList: /* public RsBanList, */ public p3Service /* , public p3Config, public pqiMonitor */
class p3BanList: /* public RsBanList, */ public p3Service, public pqiNetAssistPeerShare /* , public p3Config, public pqiMonitor */
{
public:
p3BanList(p3LinkMgr *lm, p3NetMgr *nm);
/***** overloaded from RsBanList *****/
/***** overloaded from pqiNetAssistPeerShare *****/
virtual void updatePeer(std::string id, struct sockaddr_in addr, int type, int reason, int age);
/***** overloaded from p3Service *****/
/*!
* This retrieves all chat msg items and also (important!)
@ -93,8 +98,6 @@ class p3BanList: /* public RsBanList, */ public p3Service /* , public p3Config,
void sendBanLists();
int sendBanSet(std::string peerid);
int printBanSources(std::ostream &out);
int printBanSet(std::ostream &out);
/*!
* Interface stuff.
@ -115,6 +118,8 @@ class p3BanList: /* public RsBanList, */ public p3Service /* , public p3Config,
RsMutex mBanMtx;
int condenseBanSources_locked();
int printBanSources_locked(std::ostream &out);
int printBanSet_locked(std::ostream &out);
time_t mSentListTime;
std::map<std::string, BanList> mBanSources;

View File

@ -29,11 +29,14 @@
//#include "serialiser/rsdsdvitems.h"
#include "services/p3dsdv.h"
#include "pqi/p3linkmgr.h"
#include "util/rsrandom.h"
#include <openssl/sha.h>
/****
* #define DEBUG_DSDV 1
****/
#define DEBUG_DSDV 1
/* DEFINE INTERFACE POINTER! */
RsDsdv *rsDsdv = NULL;
@ -98,8 +101,16 @@ int p3Dsdv::sendTables()
if (now - tt > DSDV_BROADCAST_PERIOD)
{
#ifdef DEBUG_DSDV
std::cerr << "p3Dsdv::sendTables() Broadcast Time";
std::cerr << std::endl;
#endif
generateRoutingTables(false);
printDsdvTable(std::cerr);
RsStackMutex stack(mDsdvMtx); /****** LOCKED MUTEX *******/
mSentTablesTime = now;
mSentIncrementTime = now;
@ -359,11 +370,68 @@ void p3Dsdv::statusChange(const std::list<pqipeer> &plist)
}
}
int p3Dsdv::addTestService()
{
RsDsdvId testId;
int rndhash1[SHA_DIGEST_LENGTH / 4];
int rndhash2[SHA_DIGEST_LENGTH / 4];
std::ostringstream rh, sh;
std::string realHash;
std::string seedHash;
int i;
for(i = 0; i < SHA_DIGEST_LENGTH / 4; i++)
{
rndhash1[i] = RSRandom::random_u32();
rndhash2[i] = RSRandom::random_u32();
}
for(int i = 0; i < SHA_DIGEST_LENGTH; i++)
{
rh << std::setw(2) << std::setfill('0') << std::hex << (uint32_t) ((uint8_t *) rndhash1)[i];
sh << std::setw(2) << std::setfill('0') << std::hex << (uint32_t) ((uint8_t *) rndhash2)[i];
}
realHash = rh.str();
seedHash = sh.str();
uint8_t sha_hash[SHA_DIGEST_LENGTH];
memset(sha_hash,0,SHA_DIGEST_LENGTH*sizeof(uint8_t)) ;
SHA_CTX *sha_ctx = new SHA_CTX;
SHA1_Init(sha_ctx);
SHA1_Update(sha_ctx, realHash.c_str(), realHash.length());
SHA1_Update(sha_ctx, seedHash.c_str(), seedHash.length());
SHA1_Final(sha_hash, sha_ctx);
delete sha_ctx;
std::ostringstream keystr;
for(int i = 0; i < SHA_DIGEST_LENGTH; i++)
{
keystr << std::setw(2) << std::setfill('0') << std::hex << (uint32_t) (sha_hash)[i];
}
testId.mIdType = RSDSDV_IDTYPE_TEST;
testId.mAnonChunk = seedHash;
testId.mHash = keystr.str();
addDsdvId(&testId, realHash);
return 1;
}
int p3Dsdv::addDsdvId(RsDsdvId *id, std::string realHash)
{
RsStackMutex stack(mDsdvMtx); /****** LOCKED MUTEX *******/
#ifdef DEBUG_DSDV
std::cerr << "p3Dsdv::addDsdvId() ID: " << *id << " RealHash: " << realHash;
std::cerr << std::endl;
#endif
time_t now = time(NULL);
/* check for duplicate */
@ -407,6 +475,11 @@ int p3Dsdv::dropDsdvId(RsDsdvId *id)
{
RsStackMutex stack(mDsdvMtx); /****** LOCKED MUTEX *******/
#ifdef DEBUG_DSDV
std::cerr << "p3Dsdv::dropDsdvId() ID: " << *id;
std::cerr << std::endl;
#endif
/* This should send out an infinity packet... and flag for deletion */
std::map<std::string, RsDsdvTableEntry>::iterator it;
@ -436,9 +509,12 @@ int p3Dsdv::printDsdvTable(std::ostream &out)
{
RsDsdvTableEntry &v = it->second;
out << v.mDest;
out << " BR: " << v.mBestRoute;
out << " SR: " << v.mBestRoute;
out << " Flags: " << v.mFlags;
out << std::endl;
out << "\tBR: " << v.mBestRoute;
out << std::endl;
out << "\tSR: " << v.mBestRoute;
out << std::endl;
out << "\tFlags: " << v.mFlags;
out << " Own: " << v.mOwnSource;
if (v.mMatched)
{

View File

@ -60,6 +60,8 @@ int addDsdvId(RsDsdvId *id, std::string realHash);
int dropDsdvId(RsDsdvId *id);
int printDsdvTable(std::ostream &out);
int addTestService();
private:
int sendTables();