diff --git a/libretroshare/src/tcponudp/udprelay.cc b/libretroshare/src/tcponudp/udprelay.cc index dbf728659..2223cadf7 100644 --- a/libretroshare/src/tcponudp/udprelay.cc +++ b/libretroshare/src/tcponudp/udprelay.cc @@ -72,45 +72,88 @@ UdpRelayReceiver::~UdpRelayReceiver() int UdpRelayReceiver::addUdpPeer(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - struct sockaddr_in realPeerAddr = endPoints->mDestAddr; - - /* check for duplicate */ - std::map::iterator it; - it = mStreams.find(realPeerAddr); - bool ok = (it == mStreams.end()); - if (!ok) { -#ifdef DEBUG_UDP_RELAY - std::cerr << "UdpPeerReceiver::addUdpPeer() ERROR Peer already exists!" << std::endl; -#endif - return 0; + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ + + /* check for duplicate */ + std::map::iterator it; + it = mStreams.find(realPeerAddr); + bool ok = (it == mStreams.end()); + if (!ok) + { + #ifdef DEBUG_UDP_RELAY + std::cerr << "UdpPeerReceiver::addUdpPeer() ERROR Peer already exists!" << std::endl; + #endif + return 0; + } + + /* setup a peer */ + UdpRelayEnd ure(endPoints, proxyaddr); + + mStreams[realPeerAddr] = ure; } - - /* setup a peer */ - UdpRelayEnd ure(peer, endPoints, proxyaddr); - - mStreams[realPeerAddr] = ure; - + + { + RsStackMutex stack(udppeerMtx); /********** LOCK MUTEX *********/ + + +#ifdef DEBUG_UDP_RELAY + std::cerr << "UdpPeerReceiver::addUdpPeer() Just installing UdpPeer!" << std::endl; +#endif + + /* just overwrite */ + mPeers[realPeerAddr] = peer; + } + return 1; } int UdpRelayReceiver::removeUdpPeer(UdpPeer *peer) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - - std::map::iterator it; - for(it = mStreams.begin(); it != mStreams.end(); it++) + bool found = false; + struct sockaddr_in realPeerAddr; + + /* cleanup UdpPeer, and get reference for data */ { - if (it->second.mPeer == peer) + RsStackMutex stack(udppeerMtx); /********** LOCK MUTEX *********/ + + std::map::iterator it; + for(it = mPeers.begin(); it != mPeers.end(); it++) { - mStreams.erase(it); - return 1; + if (it->second == peer) + { + mPeers.erase(it); + found = true; + realPeerAddr = it->first; + + break; + } } } - return 0; + + if (!found) + { + return 0; + } + + /* now we cleanup the associated data */ + { + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ + + std::map::iterator it; + it = mStreams.find(realPeerAddr); + if (it != mStreams.end()) + { + mStreams.erase(it); + } + else + { + /* ERROR */ + } + } + return 1; } @@ -122,7 +165,7 @@ int UdpRelayReceiver::removeUdpPeer(UdpPeer *peer) int UdpRelayReceiver::checkRelays() { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ /* iterate through the Relays */ @@ -167,15 +210,23 @@ int UdpRelayReceiver::checkRelays() std::list::iterator it; for(it = eraseList.begin(); it != eraseList.end(); it++) { - removeUdpRelay(&(*it)); + removeUdpRelay_relayLocked(&(*it)); } return 1; } +int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet) +{ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ + + return removeUdpRelay_relayLocked(addrSet); +} + + int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ /* check for duplicate */ std::map::iterator rit = mRelays.find(*addrSet); @@ -189,7 +240,7 @@ int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass) } /* will install if there is space! */ - if (installRelayClass_locked(relayClass)) + if (installRelayClass_relayLocked(relayClass)) { //#ifdef DEBUG_UDP_RELAY std::cerr << "UdpRelayReceiver::addUdpRelay() adding Relay" << std::endl; @@ -211,10 +262,10 @@ int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass) return 0; } -int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet) -{ - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + +int UdpRelayReceiver::removeUdpRelay_relayLocked(UdpRelayAddrSet *addrSet) +{ /* find in Relay list */ std::map::iterator rit = mRelays.find(*addrSet); if (rit == mRelays.end()) @@ -227,7 +278,7 @@ int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet) else { /* lets drop the count here too */ - removeRelayClass_locked(rit->second.mRelayClass); + removeRelayClass_relayLocked(rit->second.mRelayClass); mRelays.erase(rit); } @@ -250,7 +301,7 @@ int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet) } /* Need some stats, to work out how many relays we are supporting */ -int UdpRelayReceiver::installRelayClass_locked(int classIdx) +int UdpRelayReceiver::installRelayClass_relayLocked(int classIdx) { /* check for total number of Relays */ if (mClassCount[UDP_RELAY_CLASS_ALL] >= mClassLimit[UDP_RELAY_CLASS_ALL]) @@ -287,7 +338,7 @@ int UdpRelayReceiver::installRelayClass_locked(int classIdx) return 1; } -int UdpRelayReceiver::removeRelayClass_locked(int classIdx) +int UdpRelayReceiver::removeRelayClass_relayLocked(int classIdx) { /* check for total number of Relays */ if (mClassCount[UDP_RELAY_CLASS_ALL] < 1) @@ -327,7 +378,7 @@ int UdpRelayReceiver::removeRelayClass_locked(int classIdx) int UdpRelayReceiver::setRelayTotal(int count) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ mClassLimit[UDP_RELAY_CLASS_ALL] = count; mClassLimit[UDP_RELAY_CLASS_GENERAL] = (int) (UDP_RELAY_FRAC_GENERAL * count); @@ -340,7 +391,7 @@ int UdpRelayReceiver::setRelayTotal(int count) int UdpRelayReceiver::setRelayClassMax(int classIdx, int count) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ /* check the idx */ if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) @@ -357,7 +408,7 @@ int UdpRelayReceiver::setRelayClassMax(int classIdx, int count) int UdpRelayReceiver::getRelayClassMax(int classIdx) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ /* check the idx */ if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) @@ -372,7 +423,7 @@ int UdpRelayReceiver::getRelayClassMax(int classIdx) int UdpRelayReceiver::getRelayCount(int classIdx) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ /* check the idx */ if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS)) @@ -388,7 +439,7 @@ int UdpRelayReceiver::getRelayCount(int classIdx) int UdpRelayReceiver::RelayStatus(std::ostream &out) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ /* iterate through the Relays */ out << "UdpRelayReceiver::RelayStatus()"; @@ -416,7 +467,7 @@ int UdpRelayReceiver::status(std::ostream &out) RelayStatus(out); - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ out << "UdpRelayReceiver::Connections:" << std::endl; @@ -452,8 +503,6 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) return 0; } - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - /* decide if we are the relay, or the endpoint */ UdpRelayAddrSet addrSet; if (!extractUdpRelayAddrSet(data, size, addrSet)) @@ -461,55 +510,64 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) /* fails most basic test, drop */ return 0; } - - /* lookup relay first (double entries) */ - std::map::iterator rit = mRelays.find(addrSet); - if (rit != mRelays.end()) - { - /* we are the relay */ -#ifdef DEBUG_UDP_RELAY - std::cerr << "UdpRelayReceiver::recvPkt() We are the Relay. Passing onto: "; - std::cerr << rit->first.mDestAddr; - std::cerr << std::endl; -#endif - /* do accounting */ - rit->second.mLastTS = time(NULL); - rit->second.mDataSize += size; - mPublisher->sendPkt(data, size, rit->first.mDestAddr, STD_RELAY_TTL); - return 1; + { + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ + + /* lookup relay first (double entries) */ + std::map::iterator rit = mRelays.find(addrSet); + if (rit != mRelays.end()) + { + /* we are the relay */ + #ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::recvPkt() We are the Relay. Passing onto: "; + std::cerr << rit->first.mDestAddr; + std::cerr << std::endl; + #endif + /* do accounting */ + rit->second.mLastTS = time(NULL); + rit->second.mDataSize += size; + + mPublisher->sendPkt(data, size, rit->first.mDestAddr, STD_RELAY_TTL); + return 1; + } } /* otherwise we are likely to be the endpoint, * use the peers Address from the header */ - std::map::iterator pit = mStreams.find(addrSet.mSrcAddr); - if (pit != mStreams.end()) + { - /* we are the end-point */ -#ifdef DEBUG_UDP_RELAY - std::cerr << "UdpRelayReceiver::recvPkt() Sending to UdpPeer: "; - std::cerr << rit->first; - std::cerr << std::endl; -#endif - /* remove the header */ - void *pktdata = (void *) (((uint8_t *) data) + UDP_RELAY_HEADER_SIZE); - int pktsize = size - UDP_RELAY_HEADER_SIZE; - if (pktsize > 0) + RsStackMutex stack(udppeerMtx); /********** LOCK MUTEX *********/ + + std::map::iterator pit = mPeers.find(addrSet.mSrcAddr); + if (pit != mPeers.end()) { - (pit->second).mPeer->recvPkt(pktdata, pktsize); - } - else - { - /* packet undersized */ -#ifdef DEBUG_UDP_RELAY - std::cerr << "UdpRelayReceiver::recvPkt() ERROR Packet Undersized"; + /* we are the end-point */ + #ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::recvPkt() Sending to UdpPeer: "; + std::cerr << pit->first; std::cerr << std::endl; -#endif + #endif + /* remove the header */ + void *pktdata = (void *) (((uint8_t *) data) + UDP_RELAY_HEADER_SIZE); + int pktsize = size - UDP_RELAY_HEADER_SIZE; + if (pktsize > 0) + { + (pit->second)->recvPkt(pktdata, pktsize); + } + else + { + /* packet undersized */ + #ifdef DEBUG_UDP_RELAY + std::cerr << "UdpRelayReceiver::recvPkt() ERROR Packet Undersized"; + std::cerr << std::endl; + #endif + } + return 1; } - return 1; + /* done */ } - /* done */ /* unknown */ #ifdef DEBUG_UDP_RELAY @@ -525,10 +583,10 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from) */ int UdpRelayReceiver::sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl) { - RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/ - + RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/ + /* work out who the proxy is */ - std::map::iterator it; + std::map::iterator it; it = mStreams.find(to); if (it == mStreams.end()) { @@ -538,14 +596,10 @@ int UdpRelayReceiver::sendPkt(const void *data, int size, const struct sockaddr_ //#endif return 0; } - + /* add a header to packet */ - mTmpSendPkt = malloc(MAX_RELAY_UDP_PACKET_SIZE); - mTmpSendSize = MAX_RELAY_UDP_PACKET_SIZE; - int finalPktSize = createRelayUdpPacket(data, size, mTmpSendPkt, MAX_RELAY_UDP_PACKET_SIZE, &(it->second)); - /* send the packet on */ return mPublisher->sendPkt(mTmpSendPkt, finalPktSize, it->second.mProxyAddr, STD_RELAY_TTL); } @@ -738,12 +792,10 @@ UdpRelayEnd::UdpRelayEnd() sockaddr_clear(&mProxyAddr); sockaddr_clear(&mRemoteAddr); - mPeer = NULL; } -UdpRelayEnd::UdpRelayEnd(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr) +UdpRelayEnd::UdpRelayEnd(UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr) { - mPeer = peer; mLocalAddr = endPoints->mSrcAddr; mRemoteAddr = endPoints->mDestAddr; mProxyAddr = *proxyaddr; diff --git a/libretroshare/src/tcponudp/udprelay.h b/libretroshare/src/tcponudp/udprelay.h index 80e810d59..12df5a6a9 100644 --- a/libretroshare/src/tcponudp/udprelay.h +++ b/libretroshare/src/tcponudp/udprelay.h @@ -66,13 +66,11 @@ class UdpRelayEnd public: UdpRelayEnd(); - UdpRelayEnd(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr); + UdpRelayEnd(UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr); struct sockaddr_in mLocalAddr; struct sockaddr_in mProxyAddr; struct sockaddr_in mRemoteAddr; - - UdpPeer *mPeer; }; std::ostream &operator<<(std::ostream &out, const UdpRelayAddrSet &uras); @@ -144,10 +142,24 @@ int status(std::ostream &out); private: - int installRelayClass_locked(int classIdx); - int removeRelayClass_locked(int classIdx); + int removeUdpRelay_relayLocked(UdpRelayAddrSet *addrs); + int installRelayClass_relayLocked(int classIdx); + int removeRelayClass_relayLocked(int classIdx); - RsMutex peerMtx; /* for all class data (below) */ + /* Unfortunately, Due the reentrant nature of this classes activities... + * the SendPkt() must be callable from inside RecvPkt(). + * This means we need two seperate mutexes. + * - one for UdpPeer's, and one for Relay Data. + * + * care must be taken to lock these mutex's in a consistent manner to avoid deadlock. + * - You are not allowed to hold both at the same time! + */ + + RsMutex udppeerMtx; /* for all class data (below) */ + + std::map mPeers; /* indexed by */ + + RsMutex relayMtx; /* for all class data (below) */ std::vector mClassLimit, mClassCount; std::map mStreams; /* indexed by */