Found and Fixed nasty Mutex issue in udprelay. This is caused by reentrant behaviour that udprelay has to support.

* Split the Relay Peer class variables into Data, and UdpPeers.
 * Added second Mutex to protect UdpPeers seperately.
 * Found silly memory leak.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-peernet@4291 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2011-06-18 18:11:46 +00:00
parent dec9586c04
commit 713f512406
2 changed files with 163 additions and 99 deletions

View File

@ -72,45 +72,88 @@ UdpRelayReceiver::~UdpRelayReceiver()
int UdpRelayReceiver::addUdpPeer(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
struct sockaddr_in realPeerAddr = endPoints->mDestAddr;
/* check for duplicate */
std::map<struct sockaddr_in, UdpRelayEnd>::iterator it;
it = mStreams.find(realPeerAddr);
bool ok = (it == mStreams.end());
if (!ok)
{
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpPeerReceiver::addUdpPeer() ERROR Peer already exists!" << std::endl;
#endif
return 0;
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* check for duplicate */
std::map<struct sockaddr_in, UdpRelayEnd>::iterator it;
it = mStreams.find(realPeerAddr);
bool ok = (it == mStreams.end());
if (!ok)
{
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpPeerReceiver::addUdpPeer() ERROR Peer already exists!" << std::endl;
#endif
return 0;
}
/* setup a peer */
UdpRelayEnd ure(endPoints, proxyaddr);
mStreams[realPeerAddr] = ure;
}
/* setup a peer */
UdpRelayEnd ure(peer, endPoints, proxyaddr);
mStreams[realPeerAddr] = ure;
{
RsStackMutex stack(udppeerMtx); /********** LOCK MUTEX *********/
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpPeerReceiver::addUdpPeer() Just installing UdpPeer!" << std::endl;
#endif
/* just overwrite */
mPeers[realPeerAddr] = peer;
}
return 1;
}
int UdpRelayReceiver::removeUdpPeer(UdpPeer *peer)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
std::map<struct sockaddr_in, UdpRelayEnd>::iterator it;
for(it = mStreams.begin(); it != mStreams.end(); it++)
bool found = false;
struct sockaddr_in realPeerAddr;
/* cleanup UdpPeer, and get reference for data */
{
if (it->second.mPeer == peer)
RsStackMutex stack(udppeerMtx); /********** LOCK MUTEX *********/
std::map<struct sockaddr_in, UdpPeer *>::iterator it;
for(it = mPeers.begin(); it != mPeers.end(); it++)
{
mStreams.erase(it);
return 1;
if (it->second == peer)
{
mPeers.erase(it);
found = true;
realPeerAddr = it->first;
break;
}
}
}
return 0;
if (!found)
{
return 0;
}
/* now we cleanup the associated data */
{
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
std::map<struct sockaddr_in, UdpRelayEnd>::iterator it;
it = mStreams.find(realPeerAddr);
if (it != mStreams.end())
{
mStreams.erase(it);
}
else
{
/* ERROR */
}
}
return 1;
}
@ -122,7 +165,7 @@ int UdpRelayReceiver::removeUdpPeer(UdpPeer *peer)
int UdpRelayReceiver::checkRelays()
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* iterate through the Relays */
@ -167,15 +210,23 @@ int UdpRelayReceiver::checkRelays()
std::list<UdpRelayAddrSet>::iterator it;
for(it = eraseList.begin(); it != eraseList.end(); it++)
{
removeUdpRelay(&(*it));
removeUdpRelay_relayLocked(&(*it));
}
return 1;
}
int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet)
{
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
return removeUdpRelay_relayLocked(addrSet);
}
int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* check for duplicate */
std::map<UdpRelayAddrSet, UdpRelayProxy>::iterator rit = mRelays.find(*addrSet);
@ -189,7 +240,7 @@ int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass)
}
/* will install if there is space! */
if (installRelayClass_locked(relayClass))
if (installRelayClass_relayLocked(relayClass))
{
//#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::addUdpRelay() adding Relay" << std::endl;
@ -211,10 +262,10 @@ int UdpRelayReceiver::addUdpRelay(UdpRelayAddrSet *addrSet, int relayClass)
return 0;
}
int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
int UdpRelayReceiver::removeUdpRelay_relayLocked(UdpRelayAddrSet *addrSet)
{
/* find in Relay list */
std::map<UdpRelayAddrSet, UdpRelayProxy>::iterator rit = mRelays.find(*addrSet);
if (rit == mRelays.end())
@ -227,7 +278,7 @@ int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet)
else
{
/* lets drop the count here too */
removeRelayClass_locked(rit->second.mRelayClass);
removeRelayClass_relayLocked(rit->second.mRelayClass);
mRelays.erase(rit);
}
@ -250,7 +301,7 @@ int UdpRelayReceiver::removeUdpRelay(UdpRelayAddrSet *addrSet)
}
/* Need some stats, to work out how many relays we are supporting */
int UdpRelayReceiver::installRelayClass_locked(int classIdx)
int UdpRelayReceiver::installRelayClass_relayLocked(int classIdx)
{
/* check for total number of Relays */
if (mClassCount[UDP_RELAY_CLASS_ALL] >= mClassLimit[UDP_RELAY_CLASS_ALL])
@ -287,7 +338,7 @@ int UdpRelayReceiver::installRelayClass_locked(int classIdx)
return 1;
}
int UdpRelayReceiver::removeRelayClass_locked(int classIdx)
int UdpRelayReceiver::removeRelayClass_relayLocked(int classIdx)
{
/* check for total number of Relays */
if (mClassCount[UDP_RELAY_CLASS_ALL] < 1)
@ -327,7 +378,7 @@ int UdpRelayReceiver::removeRelayClass_locked(int classIdx)
int UdpRelayReceiver::setRelayTotal(int count)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
mClassLimit[UDP_RELAY_CLASS_ALL] = count;
mClassLimit[UDP_RELAY_CLASS_GENERAL] = (int) (UDP_RELAY_FRAC_GENERAL * count);
@ -340,7 +391,7 @@ int UdpRelayReceiver::setRelayTotal(int count)
int UdpRelayReceiver::setRelayClassMax(int classIdx, int count)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* check the idx */
if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS))
@ -357,7 +408,7 @@ int UdpRelayReceiver::setRelayClassMax(int classIdx, int count)
int UdpRelayReceiver::getRelayClassMax(int classIdx)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* check the idx */
if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS))
@ -372,7 +423,7 @@ int UdpRelayReceiver::getRelayClassMax(int classIdx)
int UdpRelayReceiver::getRelayCount(int classIdx)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* check the idx */
if ((classIdx < 0) || (classIdx >= UDP_RELAY_NUM_CLASS))
@ -388,7 +439,7 @@ int UdpRelayReceiver::getRelayCount(int classIdx)
int UdpRelayReceiver::RelayStatus(std::ostream &out)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* iterate through the Relays */
out << "UdpRelayReceiver::RelayStatus()";
@ -416,7 +467,7 @@ int UdpRelayReceiver::status(std::ostream &out)
RelayStatus(out);
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
out << "UdpRelayReceiver::Connections:" << std::endl;
@ -452,8 +503,6 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from)
return 0;
}
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
/* decide if we are the relay, or the endpoint */
UdpRelayAddrSet addrSet;
if (!extractUdpRelayAddrSet(data, size, addrSet))
@ -461,55 +510,64 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from)
/* fails most basic test, drop */
return 0;
}
/* lookup relay first (double entries) */
std::map<UdpRelayAddrSet, UdpRelayProxy>::iterator rit = mRelays.find(addrSet);
if (rit != mRelays.end())
{
/* we are the relay */
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::recvPkt() We are the Relay. Passing onto: ";
std::cerr << rit->first.mDestAddr;
std::cerr << std::endl;
#endif
/* do accounting */
rit->second.mLastTS = time(NULL);
rit->second.mDataSize += size;
mPublisher->sendPkt(data, size, rit->first.mDestAddr, STD_RELAY_TTL);
return 1;
{
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* lookup relay first (double entries) */
std::map<UdpRelayAddrSet, UdpRelayProxy>::iterator rit = mRelays.find(addrSet);
if (rit != mRelays.end())
{
/* we are the relay */
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::recvPkt() We are the Relay. Passing onto: ";
std::cerr << rit->first.mDestAddr;
std::cerr << std::endl;
#endif
/* do accounting */
rit->second.mLastTS = time(NULL);
rit->second.mDataSize += size;
mPublisher->sendPkt(data, size, rit->first.mDestAddr, STD_RELAY_TTL);
return 1;
}
}
/* otherwise we are likely to be the endpoint,
* use the peers Address from the header
*/
std::map<struct sockaddr_in, UdpRelayEnd>::iterator pit = mStreams.find(addrSet.mSrcAddr);
if (pit != mStreams.end())
{
/* we are the end-point */
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::recvPkt() Sending to UdpPeer: ";
std::cerr << rit->first;
std::cerr << std::endl;
#endif
/* remove the header */
void *pktdata = (void *) (((uint8_t *) data) + UDP_RELAY_HEADER_SIZE);
int pktsize = size - UDP_RELAY_HEADER_SIZE;
if (pktsize > 0)
RsStackMutex stack(udppeerMtx); /********** LOCK MUTEX *********/
std::map<struct sockaddr_in, UdpPeer *>::iterator pit = mPeers.find(addrSet.mSrcAddr);
if (pit != mPeers.end())
{
(pit->second).mPeer->recvPkt(pktdata, pktsize);
}
else
{
/* packet undersized */
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::recvPkt() ERROR Packet Undersized";
/* we are the end-point */
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::recvPkt() Sending to UdpPeer: ";
std::cerr << pit->first;
std::cerr << std::endl;
#endif
#endif
/* remove the header */
void *pktdata = (void *) (((uint8_t *) data) + UDP_RELAY_HEADER_SIZE);
int pktsize = size - UDP_RELAY_HEADER_SIZE;
if (pktsize > 0)
{
(pit->second)->recvPkt(pktdata, pktsize);
}
else
{
/* packet undersized */
#ifdef DEBUG_UDP_RELAY
std::cerr << "UdpRelayReceiver::recvPkt() ERROR Packet Undersized";
std::cerr << std::endl;
#endif
}
return 1;
}
return 1;
/* done */
}
/* done */
/* unknown */
#ifdef DEBUG_UDP_RELAY
@ -525,10 +583,10 @@ int UdpRelayReceiver::recvPkt(void *data, int size, struct sockaddr_in &from)
*/
int UdpRelayReceiver::sendPkt(const void *data, int size, const struct sockaddr_in &to, int ttl)
{
RsStackMutex stack(peerMtx); /********** LOCK MUTEX *********/
RsStackMutex stack(relayMtx); /********** LOCK MUTEX *********/
/* work out who the proxy is */
std::map<struct sockaddr_in, UdpRelayEnd>::iterator it;
std::map<struct sockaddr_in, UdpRelayEnd>::iterator it;
it = mStreams.find(to);
if (it == mStreams.end())
{
@ -538,14 +596,10 @@ int UdpRelayReceiver::sendPkt(const void *data, int size, const struct sockaddr_
//#endif
return 0;
}
/* add a header to packet */
mTmpSendPkt = malloc(MAX_RELAY_UDP_PACKET_SIZE);
mTmpSendSize = MAX_RELAY_UDP_PACKET_SIZE;
int finalPktSize = createRelayUdpPacket(data, size, mTmpSendPkt, MAX_RELAY_UDP_PACKET_SIZE, &(it->second));
/* send the packet on */
return mPublisher->sendPkt(mTmpSendPkt, finalPktSize, it->second.mProxyAddr, STD_RELAY_TTL);
}
@ -738,12 +792,10 @@ UdpRelayEnd::UdpRelayEnd()
sockaddr_clear(&mProxyAddr);
sockaddr_clear(&mRemoteAddr);
mPeer = NULL;
}
UdpRelayEnd::UdpRelayEnd(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr)
UdpRelayEnd::UdpRelayEnd(UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr)
{
mPeer = peer;
mLocalAddr = endPoints->mSrcAddr;
mRemoteAddr = endPoints->mDestAddr;
mProxyAddr = *proxyaddr;

View File

@ -66,13 +66,11 @@ class UdpRelayEnd
public:
UdpRelayEnd();
UdpRelayEnd(UdpPeer *peer, UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr);
UdpRelayEnd(UdpRelayAddrSet *endPoints, const struct sockaddr_in *proxyaddr);
struct sockaddr_in mLocalAddr;
struct sockaddr_in mProxyAddr;
struct sockaddr_in mRemoteAddr;
UdpPeer *mPeer;
};
std::ostream &operator<<(std::ostream &out, const UdpRelayAddrSet &uras);
@ -144,10 +142,24 @@ int status(std::ostream &out);
private:
int installRelayClass_locked(int classIdx);
int removeRelayClass_locked(int classIdx);
int removeUdpRelay_relayLocked(UdpRelayAddrSet *addrs);
int installRelayClass_relayLocked(int classIdx);
int removeRelayClass_relayLocked(int classIdx);
RsMutex peerMtx; /* for all class data (below) */
/* Unfortunately, Due the reentrant nature of this classes activities...
* the SendPkt() must be callable from inside RecvPkt().
* This means we need two seperate mutexes.
* - one for UdpPeer's, and one for Relay Data.
*
* care must be taken to lock these mutex's in a consistent manner to avoid deadlock.
* - You are not allowed to hold both at the same time!
*/
RsMutex udppeerMtx; /* for all class data (below) */
std::map<struct sockaddr_in, UdpPeer *> mPeers; /* indexed by <dest> */
RsMutex relayMtx; /* for all class data (below) */
std::vector<int> mClassLimit, mClassCount;
std::map<struct sockaddr_in, UdpRelayEnd> mStreams; /* indexed by <dest> */