From 02b6dab0817e0725de42c20c901ef68a66cafb8d Mon Sep 17 00:00:00 2001 From: drbob Date: Tue, 14 Jun 2011 22:31:03 +0000 Subject: [PATCH] Finished Basic Implementation of UdpRelay code. This will be inserted into TOU for Relayed connections. * added udprelay to libretroshare.pro * FIXED joss' BUG. Revision 1906. (caused 1.0.0.0:1 in IP serialisation) WTF! * added missing return in p3tbidht. * Lots of improvements to udprelay git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-peernet@4264 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/dht/p3bitdht.cc | 1 + libretroshare/src/libretroshare.pro | 3 +- libretroshare/src/serialiser/rstlvbase.cc | 8 +- libretroshare/src/tcponudp/udppeer.h | 27 - libretroshare/src/tcponudp/udprelay.cc | 588 +++++++++++++++------- libretroshare/src/tcponudp/udprelay.h | 126 ++++- 6 files changed, 539 insertions(+), 214 deletions(-) diff --git a/libretroshare/src/dht/p3bitdht.cc b/libretroshare/src/dht/p3bitdht.cc index 22b674a6d..3d6f5ed30 100644 --- a/libretroshare/src/dht/p3bitdht.cc +++ b/libretroshare/src/dht/p3bitdht.cc @@ -64,6 +64,7 @@ virtual int dhtConnectCallback(const bdId *srcId, const bdId *proxyId, const bdI { // nothing here fore now! //return mParent->ValueCallback(id, key, status); + return 0; } private: diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 4dfd0603c..15c9eedd9 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -75,13 +75,14 @@ HEADERS += tcponudp/udppeer.h \ tcponudp/tcpstream.h \ tcponudp/tou.h \ tcponudp/udpstunner.h \ + tcponudp/udprelay.h \ SOURCES += tcponudp/udppeer.cc \ tcponudp/tcppacket.cc \ tcponudp/tcpstream.cc \ tcponudp/tou.cc \ tcponudp/bss_tou.c \ - tcponudp/udpstunner.cc \ + tcponudp/udprelay.cc \ # These two aren't actually used (and don't compile) .... # but could be useful later diff --git a/libretroshare/src/serialiser/rstlvbase.cc b/libretroshare/src/serialiser/rstlvbase.cc index 331581938..b672c089b 100644 --- a/libretroshare/src/serialiser/rstlvbase.cc +++ b/libretroshare/src/serialiser/rstlvbase.cc @@ -600,13 +600,7 @@ bool SetTlvIpAddrPortV4(void *data, uint32_t size, uint32_t *offset, ok &= SetTlvBase(data, tlvend, offset, type, tlvsize); sockaddr_in addr = *out; - //it looks like if ip or port is null that there is a problem - if (addr.sin_addr.s_addr == 0) { - addr.sin_addr.s_addr = 1; - } - if (addr.sin_port == 0) { - addr.sin_port = 1; - } + /* now add the data .... (its already in network order) - so flip */ uint32_t ipaddr = addr.sin_addr.s_addr; ok &= setRawUInt32(data, tlvend, offset, ntohl(ipaddr)); diff --git a/libretroshare/src/tcponudp/udppeer.h b/libretroshare/src/tcponudp/udppeer.h index e4357fa34..f25d1cffc 100644 --- a/libretroshare/src/tcponudp/udppeer.h +++ b/libretroshare/src/tcponudp/udppeer.h @@ -71,31 +71,4 @@ int status(std::ostream &out); }; -class UdpRelayReceiver: public UdpSubReceiver, public UdpPublisher -{ - public: - - UdpRelayReceiver(UdpPublisher *pub); -virtual ~UdpRelayReceiver() { return; } - - /* add a TCPonUDP stream */ -int addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr); -int removeUdpPeer(UdpPeer *peer); - - /* callback for recved data (overloaded from UdpReceiver) */ -virtual int recvPkt(void *data, int size, struct sockaddr_in &from); - - /* wrapper function for relay (overloaded from UdpPublisher) */ -virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl); - -int status(std::ostream &out); - - private: - - RsMutex peerMtx; /* for all class data (below) */ - - std::map streams; - -}; - #endif diff --git a/libretroshare/src/tcponudp/udprelay.cc b/libretroshare/src/tcponudp/udprelay.cc index 622cf25b7..f7bea71f5 100644 --- a/libretroshare/src/tcponudp/udprelay.cc +++ b/libretroshare/src/tcponudp/udprelay.cc @@ -23,140 +23,108 @@ * */ -#include "udppeer.h" +#include "udprelay.h" #include /* - * #define DEBUG_UDP_PEER 1 + * #define DEBUG_UDP_RELAY 1 */ +#define DEBUG_UDP_RELAY 1 -UdpPeerReceiver::UdpPeerReceiver(UdpPublisher *pub) +/****************** UDP RELAY STUFF **********/ + +#define MAX_RELAY_UDP_PACKET_SIZE 1024 + +UdpRelayReceiver::UdpRelayReceiver(UdpPublisher *pub) :UdpSubReceiver(pub) { + mClassLimit.resize(UDP_RELAY_NUM_CLASS); + mClassCount.resize(UDP_RELAY_NUM_CLASS); + + setRelayTotal(UDP_RELAY_DEFAULT_COUNT_ALL); + + for(int i = 0; i < UDP_RELAY_NUM_CLASS; i++) + { + mClassCount[i] = 0; + } + + /* only allocate this space once */ + mTmpSendPkt = malloc(MAX_RELAY_UDP_PACKET_SIZE); + mTmpSendSize = MAX_RELAY_UDP_PACKET_SIZE; + return; } -/* higher level interface */ -int UdpPeerReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) +UdpRelayReceiver::~UdpRelayReceiver() { - /* print packet information */ -#ifdef DEBUG_UDP_PEER - std::cerr << "UdpPeerReceiver::recvPkt(" << size << ") from: " << from; - std::cerr << std::endl; -#endif + free(mTmpSendPkt); +} - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - /* look for a peer */ - std::map::iterator it; - it = streams.find(from); - - if (it == streams.end()) +int UdpRelayReceiver::addUdpPeer(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in &proxyaddr) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + struct sockaddr_in realPeerAddr = endPoints->mDestAddr; + + /* check for duplicate */ + std::map::iterator it; + it = mStreams.find(realPeerAddr); + bool ok = (it == mStreams.end()); + if (!ok) { - /* peer unknown */ -#ifdef DEBUG_UDP_PEER - std::cerr << "UdpPeerReceiver::recvPkt() Peer Unknown!"; - std::cerr << std::endl; +#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpPeerReceiver::addUdpPeer() ERROR Peer already exists!" << std::endl; #endif return 0; } - else - { - /* forward to them */ -#ifdef DEBUG_UDP_PEER - std::cerr << "UdpPeerReceiver::recvPkt() Sending to UdpPeer: "; - std::cerr << it->first; - std::cerr << std::endl; -#endif - (it->second)->recvPkt(data, size); - return 1; - } - /* done */ -} - -int UdpPeerReceiver::status(std::ostream &out) -{ - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - out << "UdpPeerReceiver::status()" << std::endl; - out << "UdpPeerReceiver::peers:" << std::endl; - std::map::iterator it; - for(it = streams.begin(); it != streams.end(); it++) - { - out << "\t" << it->first << std::endl; - } - out << std::endl; - + /* setup a peer */ + UdpRelayEnd ure(peer, endPoints, &proxyaddr); + + mStreams[realPeerAddr] = ure; + return 1; } - /* add a TCPonUDP stream */ -int UdpPeerReceiver::addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr) + +int UdpRelayReceiver::removeUdpPeer(UdpPeer *peer) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - - /* check for duplicate */ - std::map::iterator it; - it = streams.find(raddr); - bool ok = (it == streams.end()); - if (!ok) + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + std::map::iterator it; + for(it = mStreams.begin(); it != mStreams.end(); it++) { -#ifdef DEBUG_UDP_PEER - std::cerr << "UdpPeerReceiver::addUdpPeer() Peer already exists!" << std::endl; - std::cerr << "UdpPeerReceiver::addUdpPeer() ERROR" << std::endl; -#endif - } - else - { - streams[raddr] = peer; - } - - return ok; -} - -int UdpPeerReceiver::removeUdpPeer(UdpPeer *peer) -{ - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - /* check for duplicate */ - std::map::iterator it; - for(it = streams.begin(); it != streams.end(); it++) - { - if (it->second == peer) + if (it->second.mPeer == peer) { -#ifdef DEBUG_UDP_PEER - std::cerr << "UdpPeerReceiver::removeUdpPeer() SUCCESS" << std::endl; -#endif - streams.erase(it); + mStreams.erase(it); return 1; } } - -#ifdef DEBUG_UDP_PEER - std::cerr << "UdpPeerReceiver::removeUdpPeer() ERROR" << std::endl; -#endif return 0; } +#define RELAY_MAX_BANDWIDTH 1000 +#define RELAY_TIMEOUT 30 -/****************** UDP RELAY STUFF **********/ -int UdpRelayReciever::checkRelays() +int UdpRelayReceiver::checkRelays() { RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ /* iterate through the Relays */ - out << "UdpRelayReceiver::checkRelays()" << std::endl; + std::cerr << "UdpRelayReceiver::checkRelays()"; + std::cerr << std::endl; - std::list eraseList; - std::map::iterator rit; + std::list eraseList; + std::map::iterator rit; + time_t now = time(NULL); + for(rit = mRelays.begin(); rit != mRelays.end(); rit++) { /* calc bandwidth */ @@ -164,95 +132,287 @@ int UdpRelayReciever::checkRelays() rit->second.mDataSize = 0; rit->second.mLastBandwidthTS = now; + std::cerr << "UdpRelayReceiver::checkRelays()"; + std::cerr << "Relay: " << rit->first; + std::cerr << " using bandwidth: " << rit->second.mBandwidth; + std::cerr << std::endl; + if (rit->second.mBandwidth > RELAY_MAX_BANDWIDTH) { + std::cerr << "UdpRelayReceiver::checkRelays()"; + std::cerr << "Dropping Relay due to excessive Bandwidth: " << rit->first; + std::cerr << std::endl; + /* if exceeding bandwidth -> drop */ eraseList.push_back(rit->first); } else if (now - rit->second.mLastTS > RELAY_TIMEOUT) { /* if haven't transmitted for ages -> drop */ - out << "\t" << rit->first << " : " << rit->second; - out << std::endl; + std::cerr << "UdpRelayReceiver::checkRelays()"; + std::cerr << "Dropping Relay due to Timeout: " << rit->first; + std::cerr << std::endl; eraseList.push_back(rit->first); } } - std::list::iterator it; + std::list::iterator it; for(it = eraseList.begin(); it != eraseList.end(); it++) { - /* find in Relay list */ - - /* rotate around and delete matching set */ + removeUdpRelay(&(*it)); } + return 1; } -int UdpRelayReciever::addUdpRelay(UdpRelayAddrSet *addrSet) +int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - -} - -int UdpRelayReciever::removeUdpRelay(UdpRelayAddrSet *addrSet) -{ - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - -} - -int UdpRelayReciever::addRelayUdpPeer(UdpPeer *peer, UdpRelayAddrSet *addrSet) -{ - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - - -} - -int UdpRelayReceiver::removeRelayUdpPeer(UdpPeer *peer) -{ - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ /* check for duplicate */ - std::map::iterator it; - for(it = mStreams.begin(); it != mStreams.end(); it++) + std::map::iterator rit = mRelays.find(*addrSet); + int ok = (rit == mRelays.end()); + if (!ok) { - if (it->second == peer) - { -#ifdef DEBUG_UDP_RELAY - std::cerr << "UdpRelayReceiver::removeUdpPeer() SUCCESS" << std::endl; -#endif - mStreams.erase(it); - return 1; - } +//#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::addUdpRelay() ERROR Peer already exists!" << std::endl; +//#endif + return 0; } -#ifdef DEBUG_UDP_RELAY - std::cerr << "UdpRelayReceiver::removeUdpPeer() ERROR" << std::endl; -#endif + /* will install if there is space! */ + if (installRelayClass_locked(relayClass)) + { +//#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::addUdpRelay() adding Relay" << std::endl; +//#endif + /* create UdpRelay */ + UdpRelayProxy udpRelay(addrSet, relayClass); + UdpRelayAddrSet alt = addrSet->flippedSet(); + + /* must install two (A, B) & (B, A) */ + mRelays[*addrSet] = udpRelay; + mRelays[alt] = udpRelay; + + return 1; + } + +//#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::addUdpRelay() ERROR Too many Relays!" << std::endl; +//#endif return 0; } +int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + /* find in Relay list */ + std::map::iterator rit = mRelays.find(*addrSet); + if (rit == mRelays.end()) + { + /* ERROR */ + std::cerr << "UdpRelayReceiver::removeUdpRelay()"; + std::cerr << "ERROR Finding Relay: " << *addrSet; + std::cerr << std::endl; + } + else + { + /* lets drop the count here too */ + removeRelayClass_locked(rit->second.mRelayClass); + mRelays.erase(rit); + } + + /* rotate around and delete matching set */ + UdpRelayAddrSet alt = addrSet->flippedSet(); + + rit = mRelays.find(alt); + if (rit == mRelays.end()) + { + std::cerr << "UdpRelayReceiver::removeUdpRelay()"; + std::cerr << "Error Finding Alt Relay: " << alt; + std::cerr << std::endl; + /* ERROR */ + } + else + { + mRelays.erase(rit); + } + return 1; +} + + /* Need some stats, to work out how many relays we are supporting */ +int UdpRelayReceiver::installRelayClass_locked(int classIdx) +{ + /* check for total number of Relays */ + if (mClassCount[UDP_RELAY_CLASS_ALL] >= mClassLimit[UDP_RELAY_CLASS_ALL]) + { + std::cerr << "UdpRelayReceiver::installRelayClass() ERROR Too many Relays already"; + std::cerr << std::endl; + return 0; + } + + /* check the idx too */ + if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) + { + std::cerr << "UdpRelayReceiver::installRelayClass() ERROR class Idx invalid"; + std::cerr << std::endl; + return 0; + } + + /* now check the specifics of the class */ + if (mClassCount[classIdx] >= mClassLimit[classIdx]) + { + std::cerr << "UdpRelayReceiver::installRelayClass() ERROR Relay Class Limit Exceeded"; + std::cerr << std::endl; + + return 0; + } + + std::cerr << "UdpRelayReceiver::installRelayClass() Relay Class Ok, Count incremented"; + std::cerr << std::endl; + + /* if we get here we can add one */ + mClassCount[UDP_RELAY_CLASS_ALL]++; + mClassCount[classIdx]++; + + return 1; +} + +int UdpRelayReceiver::removeRelayClass_locked(int classIdx) +{ + /* check for total number of Relays */ + if (mClassCount[UDP_RELAY_CLASS_ALL] < 1) + { + std::cerr << "UdpRelayReceiver::removeRelayClass() ERROR no relays installed"; + std::cerr << std::endl; + return 0; + } + + /* check the idx too */ + if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) + { + std::cerr << "UdpRelayReceiver::removeRelayClass() ERROR class Idx invalid"; + std::cerr << std::endl; + return 0; + } + + /* now check the specifics of the class */ + if (mClassCount[classIdx] < 1) + { + std::cerr << "UdpRelayReceiver::removeRelayClass() ERROR no relay of class installed"; + std::cerr << std::endl; + + return 0; + } + + std::cerr << "UdpRelayReceiver::removeRelayClass() Ok, Count decremented"; + std::cerr << std::endl; + + /* if we get here we can add one */ + mClassCount[UDP_RELAY_CLASS_ALL]--; + mClassCount[classIdx]--; + + return 1; +} + + +int UdpRelayReceiver::setRelayTotal(int count) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + mClassLimit[UDP_RELAY_CLASS_ALL] = count; + mClassLimit[UDP_RELAY_CLASS_GENERAL] = (int) (UDP_RELAY_FRAC_GENERAL * count); + mClassLimit[UDP_RELAY_CLASS_FOF] = (int) (UDP_RELAY_FRAC_FOF * count); + mClassLimit[UDP_RELAY_CLASS_FRIENDS] = (int) (UDP_RELAY_FRAC_FRIENDS * count); + + return count; +} + + +int UdpRelayReceiver::setRelayClassMax(int classIdx, int count) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + /* check the idx */ + if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) + { + std::cerr << "UdpRelayReceiver::setRelayMaximum() ERROR class Idx invalid"; + std::cerr << std::endl; + return 0; + } + + mClassLimit[classIdx] = count; + return 1; +} + + +int UdpRelayReceiver::getRelayClassMax(int classIdx) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + /* check the idx */ + if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) + { + std::cerr << "UdpRelayReceiver::getRelayMaximum() ERROR class Idx invalid"; + std::cerr << std::endl; + return 0; + } + + return mClassLimit[classIdx]; +} + +int UdpRelayReceiver::getRelayCount(int classIdx) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + /* check the idx */ + if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) + { + std::cerr << "UdpRelayReceiver::getRelayCount() ERROR class Idx invalid"; + std::cerr << std::endl; + return 0; + } + + return mClassCount[classIdx]; +} + + +int UdpRelayReceiver::RelayStatus(std::ostream &out) +{ + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + + /* iterate through the Relays */ + out << "UdpRelayReceiver::RelayStatus()"; + out << std::endl; + + std::map::iterator rit; + for(rit = mRelays.begin(); rit != mRelays.end(); rit++) + { + out << "Relay for: " << rit->first; + out << std::endl; + + out << "\tClass: " << rit->second.mRelayClass; + out << "\tBandwidth: " << rit->second.mBandwidth; + out << "\tDataSize: " << rit->second.mDataSize; + out << "\tLastBandwidthTS: " << rit->second.mLastBandwidthTS; + } + return 1; +} int UdpRelayReceiver::status(std::ostream &out) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ out << "UdpRelayReceiver::status()" << std::endl; out << "UdpRelayReceiver::Relayed Connections:" << std::endl; - std::map::iterator rit; + RelayStatus(out); - for(rit = mRelays.begin(); rit != mRelays.end(); rit++) - { - out << "\t" << rit->first << " : " << rit->second; - out << std::endl; - } + RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ out << "UdpRelayReceiver::Connections:" << std::endl; - std::map::iterator pit; + std::map::iterator pit; for(pit = mStreams.begin(); pit != mStreams.end(); pit++) { out << "\t" << pit->first << " : " << pit->second; @@ -262,6 +422,8 @@ int UdpRelayReceiver::status(std::ostream &out) return 1; } +#define UDP_RELAY_HEADER_SIZE 16 + /* higher level interface */ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) { @@ -270,6 +432,14 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) std::cerr << "UdpRelayReceiver::recvPkt(" << size << ") from: " << from; std::cerr << std::endl; #endif + if (!isUdpRelayPacket(data, size)) + { +#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::recvPkt() is Not RELAY Pkt"; + std::cerr << std::endl; +#endif + return 0; + } RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ @@ -282,39 +452,41 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) } /* lookup relay first (double entries) */ - std::map::iterator rit = mRelays.find(addrSet); + std::map::iterator rit = mRelays.find(addrSet); if (rit != mRelays.end()) { /* we are the relay */ #ifdef DEBUG_UDP_RELAY std::cerr << "UdpRelayReceiver::recvPkt() We are the Relay. Passing onto: "; - std::cerr << it->second.endpoint; + std::cerr << rit->first.mDestAddr; std::cerr << std::endl; #endif /* do accounting */ - it->second.mLastTS = now; - it->second.mDataSize += size; + rit->second.mLastTS = time(NULL); + rit->second.mDataSize += size; - mPublisher->sendPkt(data, size, it->second.endpoint, STD_TTL); + mPublisher->sendPkt(data, size, rit->first.mDestAddr, STD_RELAY_TTL); return 1; } - /* otherwise we are likely to be the endpoint */ - std::map::iterator pit = mStreams.find(addrSet); + /* otherwise we are likely to be the endpoint, + * use the peers Address from the header + */ + std::map::iterator pit = mStreams.find(addrSet.mSrcAddr); if (pit != mStreams.end()) { /* we are the end-point */ #ifdef DEBUG_UDP_RELAY std::cerr << "UdpRelayReceiver::recvPkt() Sending to UdpPeer: "; - std::cerr << it->first; + std::cerr << rit->first; std::cerr << std::endl; #endif /* remove the header */ - void *pktdata = (void *) (((uint8_t *) data) + RELAY_HEADER_SIZE); - int pktsize = size - RELAY_HEADER_SIZE; + void *pktdata = (void *) (((uint8_t *) data) + UDP_RELAY_HEADER_SIZE); + int pktsize = size - UDP_RELAY_HEADER_SIZE; if (pktsize > 0) { - (it->second)->recvPkt(pktdata, pktsize); + (pit->second).mPeer->recvPkt(pktdata, pktsize); } else { @@ -337,50 +509,122 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) } -int UdpRelayReceiver::sendPkt(const void *data, int size, sockaddr_in &to, int ttl) +/* the address here must be the end point!, + * it cannot be proxy, as we could be using the same proxy for multiple connections. + */ +int UdpRelayReceiver::sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl) { RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ /* work out who the proxy is */ - std::map::iterator it; - it = mProxies.find(to); - if (it == mProxies.end()) + std::map::iterator it; + it = mStreams.find(to); + if (it == mStreams.end()) { +//#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::sendPkt() Peer Unknown!"; + std::cerr << std::endl; +//#endif return 0; } /* add a header to packet */ - char tmpPkt[MAXSIZE]; - int tmpSize = MAXSIZE; + mTmpSendPkt = malloc(MAX_RELAY_UDP_PACKET_SIZE); + mTmpSendSize = MAX_RELAY_UDP_PACKET_SIZE; + + int finalPktSize = createRelayUdpPacket(data, size, mTmpSendPkt, MAX_RELAY_UDP_PACKET_SIZE, &(it->second)); - createRelayUdpPacket(data, size, tmpPkt, tmpSize, it->second); /* send the packet on */ - return mPublisher->send(tmpPkt, tmpSize, it->second.proxyAddr, STD_TTL); + return mPublisher->sendPkt(mTmpSendPkt, finalPktSize, it->second.mProxyAddr, STD_RELAY_TTL); } -class UdpRelayAddrSet -{ - public: +/***** RELAY PACKET FORMAT **************************** + * + * + * [ 0 | 1 | 2 | 3 ] + * + * [ 'R' 'L' 'Y' Version ] + * [ IP Address 1 ] + * [ Port 1 ][ IP Address 2 .... + * ... IP Address 2][ Port 2 ] + * [.... TUNNELLED DATA .... + * + * + * ... ] + * + * 16 Bytes: 4 ID, 6 IP:Port 1, 6 IP:Port 2 + */ - struct sockaddr_in srcAddr; - struct sockaddr_in proxyAddr; - struct sockaddr_in destAddr; -}; +int isUdpRelayPacket(const void *data, const int size) +{ + if (size < UDP_RELAY_HEADER_SIZE) + return 0; + + return (0 == strncmp((char *) data, "RLY1", 4)); +} int extractUdpRelayAddrSet(const void *data, const int size, UdpRelayAddrSet &addrSet) { - + if (size < UDP_RELAY_HEADER_SIZE) + { + std::cerr << "createRelayUdpPacket() ERROR invalid size"; + std::cerr << std::endl; + return 0; + } + + uint8_t *header = (uint8_t *) data; + + sockaddr_clear(&(addrSet.mSrcAddr)); + sockaddr_clear(&(addrSet.mDestAddr)); + + /* as IP:Port are already in network byte order, we can just write them to the dataspace */ + uint32_t ipaddr; + uint16_t port; + + memcpy(&ipaddr, &(header[4]), 4); + memcpy(&port, &(header[8]), 2); + + addrSet.mSrcAddr.sin_addr.s_addr = ipaddr; + addrSet.mSrcAddr.sin_port = port; + + memcpy(&ipaddr, &(header[10]), 4); + memcpy(&port, &(header[14]), 2); + + addrSet.mDestAddr.sin_addr.s_addr = ipaddr; + addrSet.mDestAddr.sin_port = port; return 1; } -int createRelayUdpPacket(const void *data, const int size, void *newpkt, int *newsize, UdpRelayProxy &urp) + +int createRelayUdpPacket(const void *data, const int size, void *newpkt, int newsize, UdpRelayEnd *ure) { + int pktsize = size + UDP_RELAY_HEADER_SIZE; + if (newsize < pktsize) + { + std::cerr << "createRelayUdpPacket() ERROR invalid size"; + std::cerr << std::endl; + return 0; + } + uint8_t *header = (uint8_t *) newpkt; + /* as IP:Port are already in network byte order, we can just write them to the dataspace */ + uint32_t ipaddr = ure->mLocalAddr.sin_addr.s_addr; + uint16_t port = ure->mLocalAddr.sin_port; + memcpy(&(header[4]), &ipaddr, 4); + memcpy(&(header[8]), &port, 2); - return 1; + ipaddr = ure->mRemoteAddr.sin_addr.s_addr; + port = ure->mRemoteAddr.sin_port; + + memcpy(&(header[10]), &ipaddr, 4); + memcpy(&(header[14]), &port, 2); + + memcpy(&(header[16]), data, size); + + return pktsize; } diff --git a/libretroshare/src/tcponudp/udprelay.h b/libretroshare/src/tcponudp/udprelay.h index 49b11960e..c0df784f6 100644 --- a/libretroshare/src/tcponudp/udprelay.h +++ b/libretroshare/src/tcponudp/udprelay.h @@ -27,32 +27,144 @@ */ #include "tcponudp/udppeer.h" +#include -class UdpRelayReceiver: public UdpSubReceiver, public UdpPublisher +class UdpRelayAddrSet; + +class UdpRelayAddrSet +{ + public: + UdpRelayAddrSet(); + + UdpRelayAddrSet flippedSet(); + + struct sockaddr_in mSrcAddr; /* msg source */ + struct sockaddr_in mDestAddr; /* final destination */ +}; + +int operator<(const UdpRelayAddrSet &a, const UdpRelayAddrSet &b); + +class UdpRelayProxy +{ + public: + UdpRelayProxy(); + UdpRelayProxy(UdpRelayAddrSet *addrSet, int relayClass); + + UdpRelayAddrSet mAddrs; + double mBandwidth; + uint32_t mDataSize; + time_t mLastBandwidthTS; + time_t mLastTS; + + int mRelayClass; +}; + + +class UdpRelayEnd +{ + public: + + UdpRelayEnd() { return; } + UdpRelayEnd(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr); + + struct sockaddr_in mLocalAddr; + struct sockaddr_in mProxyAddr; + struct sockaddr_in mRemoteAddr; + + UdpPeer *mPeer; +}; + +std::ostream &operator<<(std::ostream &out, const UdpRelayAddrSet &uras); +std::ostream &operator<<(std::ostream &out, const UdpRelayProxy &urp); +std::ostream &operator<<(std::ostream &out, const UdpRelayEnd &ure); + +/* we define a couple of classes (determining which class is done elsewhere) + * There will be various maximums for each type. + * Ideally you want to allow your friends to use your proxy in preference + * to randoms. + * + * At N x 2 x maxBandwidth. + * + * 10 x 2 x 1Kb/s => 20Kb/s In and Out. (quite a bit!) + * 20 x 2 x 1Kb/s => 40Kb/s Huge. + */ + +#define UDP_RELAY_DEFAULT_COUNT_ALL 10 +#define UDP_RELAY_FRAC_GENERAL (0.2) +#define UDP_RELAY_FRAC_FOF (0.5) +#define UDP_RELAY_FRAC_FRIENDS (0.8) + +#define UDP_RELAY_NUM_CLASS 4 + +#define UDP_RELAY_CLASS_ALL 0 +#define UDP_RELAY_CLASS_GENERAL 1 +#define UDP_RELAY_CLASS_FOF 2 +#define UDP_RELAY_CLASS_FRIENDS 3 + +#define STD_RELAY_TTL 64 + +class UdpRelayReceiver: public UdpSubReceiver { public: UdpRelayReceiver(UdpPublisher *pub); -virtual ~UdpRelayReceiver() { return; } +virtual ~UdpRelayReceiver(); - /* add a TCPonUDP stream */ -int addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr); + /* add a TCPonUDP stream (ENDs) */ +int addUdpPeer(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in &proxyaddr); int removeUdpPeer(UdpPeer *peer); + /* add a Relay Point (for the Relay). + * These don't have to be explicitly removed. + * They will be timed out when + * the end-points drop the connections + */ + + int addUdpRelay(UdpRelayAddrSet *addrs, int classIdx); + int removeUdpRelay(UdpRelayAddrSet *addrs); + + /* Need some stats, to work out how many relays we are supporting */ + int checkRelays(); + + int setRelayTotal(int count); /* sets all the Relay Counts (frac based on total) */ + int setRelayClassMax(int classIdx, int count); /* set a specific class maximum */ + int getRelayClassMax(int classIdx); + int getRelayCount(int classIdx); /* how many relays (of this type) do we have */ + int RelayStatus(std::ostream &out); + + /* callback for recved data (overloaded from UdpReceiver) */ virtual int recvPkt(void *data, int size, struct sockaddr_in &from); - /* wrapper function for relay (overloaded from UdpPublisher) */ -virtual int sendPkt(const void *data, int size, struct sockaddr_in &to, int ttl); + /* wrapper function for relay (overloaded from UdpSubReceiver) */ +virtual int sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl); int status(std::ostream &out); private: + int installRelayClass_locked(int classIdx); + int removeRelayClass_locked(int classIdx); + RsMutex peerMtx; /* for all class data (below) */ - std::map streams; + std::vector mClassLimit, mClassCount; + std::map mStreams; /* indexed by */ + std::map mRelays; /* indexed by */ + + void *mTmpSendPkt; + uint32_t mTmpSendSize; }; +/* utility functions for creating / extracting UdpRelayPackets */ +int isUdpRelayPacket(const void *data, const int size); +int getPacketFromUdpRelayPacket(const void *data, const int size, void **realdata, int *realsize); + +int createRelayUdpPacket(const void *data, const int size, void *newpkt, int newsize, UdpRelayEnd *ure); +int extractUdpRelayAddrSet(const void *data, const int size, UdpRelayAddrSet &addrSet); + + + + #endif