The first commit of the new UDP Connection methods and

the rewrite of the retroshare core networking stack.

This check-in commits the changes to the TCPonUCP code.

This library has been significantly modified to support
multiple UDP "connections" from a single port.
This requires some trickery and a listener thread.
Code to "STUN" peers was also added.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@305 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-01-25 06:11:39 +00:00
parent b0f31a1340
commit de0ce110b9
14 changed files with 1610 additions and 658 deletions

View File

@ -3,23 +3,37 @@ RS_TOP_DIR=..
include $(RS_TOP_DIR)/make.opt
EXECS = librs udp_server test_tou pair_tou reset_tou internal_tou largefile_tou
EXECS = librs udpsock_test udpsort_test udp_server
#test_tou pair_tou reset_tou internal_tou largefile_tou
OBJ = tcpstream.o tcppacket.o udplayer.o tou_net.o tou.o
OBJ = tou_net.o udplayer.o udpsorter.o udptestfn.o
OBJ += tcppacket.o tcpstream.o tou.o
#tou.o
all : $(OBJ) $(EXECS)
.cc.o:
$(CC) $(CFLAGS) -c $<
udp_server: $(OBJ) udp_server.o
$(CC) $(CFLAGS) -o udp_server $(OBJ) udp_server.o $(LIBS)
clean:
-$(RM) $(OBJ) $(BIOOBJ) test_tou.o pair_tou.o udp_server.o reset_tou.o internal_tou.o largefile_tou.o
-$(RM) $(OBJ) $(BIOOBJ) udpsock_test.o udpsort_test.o udp_server.o
#test_tou.o pair_tou.o udp_server.o reset_tou.o internal_tou.o largefile_tou.o
clobber: clean
-$(RM) udp_server test_tou pair_tou reset_tou internal_tou largefile_tou libtou.so ../lib/libtou.a
-$(RM) udpsock_test udpsort_test udp_server
#test_tou pair_tou reset_tou internal_tou largefile_tou libtou.so ../lib/libtou.a
udpsock_test : $(OBJ) udpsock_test.o
$(CC) $(CFLAGS) -o udpsock_test $(OBJ) udpsock_test.o -lpthread -L../lib -lretroshare
udpsort_test : $(OBJ) udpsort_test.o
$(CC) $(CFLAGS) -o udpsort_test $(OBJ) udpsort_test.o -lpthread -L../lib -lretroshare
udp_server: $(OBJ) udp_server.o
$(CC) $(CFLAGS) -o udp_server $(OBJ) udp_server.o -lpthread -L../lib -lretroshare
test_tou : $(OBJ) test_tou.o
$(CC) $(CFLAGS) -o test_tou $(OBJ) test_tou.o $(LIBS)

View File

@ -39,6 +39,7 @@
/*
* #define DEBUG_TCP_STREAM 1
*/
#define DEBUG_TCP_STREAM 1
/*
*#define DEBUG_TCP_STREAM_EXTRA 1
@ -64,8 +65,7 @@ static const double RTT_ALPHA = 0.875;
// platform independent fractional timestamp.
static double getCurrentTS();
TcpStream::TcpStream(UdpLayer *lyr)
TcpStream::TcpStream(UdpSorter *lyr)
:inSize(0), outSizeRead(0), outSizeNet(0),
state(TCP_CLOSED),
inStreamActive(false),
@ -84,19 +84,25 @@ TcpStream::TcpStream(UdpLayer *lyr)
congestThreshold(TCP_MAX_WIN),
congestWinSize(MAX_SEG),
congestUpdate(0),
peerKnown(false),
udp(lyr)
{
return;
}
/* Stream Control! */
int TcpStream::connect()
int TcpStream::connect(const struct sockaddr_in &raddr)
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
setRemoteAddress(raddr);
/* check state */
if (state != TCP_CLOSED)
{
if (state == TCP_ESTABLISHED)
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return 0;
}
else if (state < TCP_ESTABLISHED)
@ -108,10 +114,10 @@ int TcpStream::connect()
// major issues!
errorState = EFAULT;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
/* setup Seqnos */
outSeqno = genSequenceNo();
initOurSeqno = outSeqno;
@ -140,25 +146,68 @@ int TcpStream::connect()
* This should help the firewalls along.
*/
udp -> setTTL(1);
setTTL(1);
toSend(pkt);
/* change state */
state = TCP_SYN_SENT;
std::cerr << "TcpStream STATE -> TCP_SYN_SENT" << std::endl;
errorState = EAGAIN;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
int TcpStream::listenfor(const struct sockaddr_in &raddr)
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
setRemoteAddress(raddr);
/* check state */
if (state != TCP_CLOSED)
{
if (state == TCP_ESTABLISHED)
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return 0;
}
else if (state < TCP_ESTABLISHED)
{
errorState = EAGAIN;
}
else
{
// major issues!
errorState = EFAULT;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
errorState = EAGAIN;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
/* Stream Control! */
int TcpStream::close()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
cleanup();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return 0;
}
int TcpStream::closeWrite()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
/* check state */
/* will always close socket.... */
/* if in TCP_ESTABLISHED....
@ -206,6 +255,7 @@ int TcpStream::closeWrite()
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::close() Flag Set" << std::endl;
#endif
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return 0;
}
@ -213,67 +263,28 @@ int TcpStream::closeWrite()
std::cerr << "TcpStream::close() pending" << std::endl;
#endif
errorState = EAGAIN;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
int TcpStream::cleanup()
{
// This shuts it all down! no matter what.
outStreamActive = false;
inStreamActive = false;
state = TCP_CLOSED;
std::cerr << "TcpStream STATE -> TCP_CLOSED" << std::endl;
/* Ensure that TTL -> STD for further STUN ATTEMPTS */
if (udp)
{
udp -> setTTL(TCP_STD_TTL);
}
// clear arrays.
inSize = 0;
while(inQueue.size() > 0)
{
dataBuffer *db = inQueue.front();
inQueue.pop_front();
delete db;
}
while(outPkt.size() > 0)
{
TcpPacket *pkt = outPkt.front();
outPkt.pop_front();
delete pkt;
}
// clear arrays.
outSizeRead = 0;
outSizeNet = 0;
while(outQueue.size() > 0)
{
dataBuffer *db = outQueue.front();
outQueue.pop_front();
delete db;
}
while(inPkt.size() > 0)
{
TcpPacket *pkt = inPkt.front();
inPkt.pop_front();
delete pkt;
}
return 1;
}
bool TcpStream::isConnected()
{
return (state == TCP_ESTABLISHED);
tcpMtx.lock(); /********** LOCK MUTEX *********/
bool isConn = (state == TCP_ESTABLISHED);
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return isConn;
}
int TcpStream::status(std::ostream &out)
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
int tmpstate = state;
// can leave the timestamp here as time()... rough but okay.
out << "TcpStream::status @ (" << time(NULL) << ")" << std::endl;
out << "TcpStream::state = " << (int) state << std::endl;
@ -295,53 +306,78 @@ int TcpStream::status(std::ostream &out)
out << " winsize: " << inWinSize;
out << std::endl;
out << std::endl;
return state;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return tmpstate;
}
int TcpStream::write_allowed()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
int ret = 1;
if (state == TCP_CLOSED)
{
errorState = EBADF;
return -1;
ret = -1;
}
else if (state < TCP_ESTABLISHED)
{
errorState = EAGAIN;
return -1;
ret = -1;
}
else if (!outStreamActive)
{
errorState = EBADF;
return -1;
ret = -1;
}
if (ret < 1)
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return ret;
}
int maxwrite = (kMaxQueueSize - inQueue.size()) * MAX_SEG;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return maxwrite;
}
int TcpStream::read_pending()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
/* error should be detected next time */
int maxread = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet;
int maxread = int_read_pending();
if (state == TCP_CLOSED)
{
errorState = EBADF;
return -1;
maxread = -1;
}
else if (state < TCP_ESTABLISHED)
{
errorState = EAGAIN;
return -1;
maxread = -1;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return maxread;
}
/* INTERNAL */
int TcpStream::int_read_pending()
{
return outSizeRead + outQueue.size() * MAX_SEG + outSizeNet;
}
/* stream Interface */
int TcpStream::write(char *dta, int size) /* write -> pkt -> net */
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
int ret = 1; /* initial error checking */
#ifdef DEBUG_TCP_STREAM_EXTRA
static uint32 TMPtotalwrite = 0;
@ -353,7 +389,7 @@ static uint32 TMPtotalwrite = 0;
std::cerr << "TcpStream::write() Error TCP_CLOSED" << std::endl;
#endif
errorState = EBADF;
return -1;
ret = -1;
}
else if (state < TCP_ESTABLISHED)
{
@ -361,7 +397,7 @@ static uint32 TMPtotalwrite = 0;
std::cerr << "TcpStream::write() Error TCP Not Established" << std::endl;
#endif
errorState = EAGAIN;
return -1;
ret = -1;
}
else if (inQueue.size() > kMaxQueueSize)
{
@ -369,7 +405,7 @@ static uint32 TMPtotalwrite = 0;
std::cerr << "TcpStream::write() Error EAGAIN" << std::endl;
#endif
errorState = EAGAIN;
return -1;
ret = -1;
}
else if (!outStreamActive)
{
@ -377,9 +413,16 @@ static uint32 TMPtotalwrite = 0;
std::cerr << "TcpStream::write() Error TCP_CLOSED" << std::endl;
#endif
errorState = EBADF;
return -1;
ret = -1;
}
if (ret < 1) /* check for initial error */
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return ret;
}
#ifdef DEBUG_TCP_STREAM_EXTRA
std::cerr << "TcpStream::write() = Will Succeed " << size << std::endl;
std::cerr << "TcpStream::write() Write Start: " << TMPtotalwrite << std::endl;
@ -404,6 +447,8 @@ static uint32 TMPtotalwrite = 0;
inSize += size;
//std::cerr << "Small Packet - write to net:" << std::endl;
//std::cerr << printPkt(dta, size) << std::endl;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return size;
}
@ -459,7 +504,6 @@ static uint32 TMPtotalwrite = 0;
#endif
memcpy((void *) inData, (void *) &(dta[size-remSize]), remSize);
inSize = remSize;
return size;
}
else
{
@ -470,11 +514,14 @@ static uint32 TMPtotalwrite = 0;
inSize = 0;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return size;
}
int TcpStream::read(char *dta, int size) /* net -> pkt -> read */
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
#ifdef DEBUG_TCP_STREAM_EXTRA
static uint32 TMPtotalread = 0;
#endif
@ -483,28 +530,34 @@ static uint32 TMPtotalread = 0;
*/
int maxread = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet;
int ret = 1; /* used only for initial errors */
if (state == TCP_CLOSED)
{
errorState = EBADF;
return -1;
ret = -1;
}
else if (state < TCP_ESTABLISHED)
{
errorState = EAGAIN;
return -1;
ret = -1;
}
else if ((!inStreamActive) && (maxread == 0))
{
// finished stream.
return 0;
ret = 0;
}
if (maxread == 0)
else if (maxread == 0)
{
/* must wait for more data */
errorState = EAGAIN;
return -1;
ret = -1;
}
if (ret < 1) /* if ret has been changed */
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return ret;
}
if (maxread < size)
@ -520,6 +573,7 @@ static uint32 TMPtotalread = 0;
std::cerr << std::endl;
#endif
errorState = EAGAIN;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
#endif /* TCP_NO_PARTIAL_READ */
@ -555,6 +609,7 @@ static uint32 TMPtotalread = 0;
/* can allow more in! - update inWinSize */
UpdateInWinSize();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return size;
}
@ -618,6 +673,7 @@ static uint32 TMPtotalread = 0;
/* can allow more in! - update inWinSize */
UpdateInWinSize();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return size;
}
#ifdef DEBUG_TCP_STREAM_EXTRA
@ -672,6 +728,7 @@ static uint32 TMPtotalread = 0;
UpdateInWinSize();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return size;
}
@ -689,23 +746,32 @@ static uint32 TMPtotalread = 0;
/* can allow more in! - update inWinSize */
UpdateInWinSize();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return size;
}
int TcpStream::recv()
/* Callback from lower Layers */
void TcpStream::recvPkt(void *data, int size)
{
int maxsize = MAX_SEG + TCP_PSEUDO_HDR_SIZE;
int size = maxsize;
uint8 input[maxsize];
double cts = getCurrentTS(); // fractional seconds.
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::recvPkt()";
std::cerr << std::endl;
#endif
tcpMtx.lock(); /********** LOCK MUTEX *********/
uint8 *input = (uint8 *) data;
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::recvPkt() Past Lock!";
std::cerr << std::endl;
#endif
#ifdef DEBUG_TCP_STREAM
if (state > TCP_SYN_RCVD)
{
int availRead = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet;
std::cerr << "TcpStream::recv() CC: ";
std::cerr << "TcpStream::recvPkt() CC: ";
std::cerr << " iWS: " << inWinSize;
std::cerr << " aRead: " << availRead;
std::cerr << " iAck: " << inAckno;
@ -718,9 +784,6 @@ int TcpStream::recv()
}
#endif
for(;0 < udp -> readPkt(input, &size); size = maxsize)
{
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::recv() ReadPkt(" << size << ")" << std::endl;
//std::cerr << printPkt(input, size);
@ -729,7 +792,7 @@ int TcpStream::recv()
TcpPacket *pkt = new TcpPacket();
if (0 < pkt -> readPacket(input, size))
{
lastIncomingPkt = cts;
lastIncomingPkt = getCurrentTS();
handleIncoming(pkt);
}
else
@ -740,8 +803,164 @@ int TcpStream::recv()
#endif
delete pkt;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return;
}
int TcpStream::tick()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
//std::cerr << "TcpStream::tick()" << std::endl;
recv_check(); /* recv is async */
send();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return 1;
}
bool TcpStream::getRemoteAddress(struct sockaddr_in &raddr)
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
if (peerKnown)
{
raddr = peeraddr;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return peerKnown;
}
uint8 TcpStream::TcpState()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
uint8 err = state;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return err;
}
int TcpStream::TcpErrorState()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
int err = errorState;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return err;
}
/********************* SOME EXPOSED DEBUGGING FNS ******************/
static int ilevel = 100;
bool TcpStream::widle()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
/* init */
if (!lastWriteTF)
{
lastWriteTF = int_wbytes();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return false;
}
if ((lastWriteTF == int_wbytes()) && (inSize + inQueue.size() == 0))
{
wcount++;
if (wcount > ilevel)
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return true;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return false;
}
wcount = 0;
lastWriteTF = int_wbytes();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return false;
}
bool TcpStream::ridle()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
/* init */
if (!lastReadTF)
{
lastReadTF = int_rbytes();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return false;
}
if ((lastReadTF == int_rbytes()) && (outSizeRead + outQueue.size() + outSizeNet== 0))
{
rcount++;
if (rcount > ilevel)
{
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return true;
}
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return false;
}
rcount = 0;
lastReadTF = int_rbytes();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return false;
}
uint32 TcpStream::wbytes()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
uint32 wb = int_wbytes();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return wb;
}
uint32 TcpStream::rbytes()
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
uint32 rb = int_rbytes();
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return rb;
}
/********************* ALL BELOW HERE IS INTERNAL ******************
******************* AND ALWAYS PROTECTED BY A MUTEX ***************/
int TcpStream::recv_check()
{
double cts = getCurrentTS(); // fractional seconds.
#ifdef DEBUG_TCP_STREAM
if (state > TCP_SYN_RCVD)
{
int availRead = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet;
std::cerr << "TcpStream::recv_check() CC: ";
std::cerr << " iWS: " << inWinSize;
std::cerr << " aRead: " << availRead;
std::cerr << " iAck: " << inAckno;
std::cerr << std::endl;
}
else
{
std::cerr << "TcpStream::recv_check() Not Connected";
std::cerr << std::endl;
}
#endif
// make sure we've rcvd something!
if ((state > TCP_SYN_RCVD) &&
(cts - lastIncomingPkt > kNoPktTimeout))
@ -761,6 +980,56 @@ int TcpStream::recv()
return 1;
}
int TcpStream::cleanup()
{
// This shuts it all down! no matter what.
outStreamActive = false;
inStreamActive = false;
state = TCP_CLOSED;
std::cerr << "TcpStream STATE -> TCP_CLOSED" << std::endl;
//peerKnown = false; //??? NOT SURE -> for a rapid reconnetion this might be key??
/* reset TTL */
setTTL(TCP_STD_TTL);
// clear arrays.
inSize = 0;
while(inQueue.size() > 0)
{
dataBuffer *db = inQueue.front();
inQueue.pop_front();
delete db;
}
while(outPkt.size() > 0)
{
TcpPacket *pkt = outPkt.front();
outPkt.pop_front();
delete pkt;
}
// clear arrays.
outSizeRead = 0;
outSizeNet = 0;
while(outQueue.size() > 0)
{
dataBuffer *db = outQueue.front();
outQueue.pop_front();
delete db;
}
while(inPkt.size() > 0)
{
TcpPacket *pkt = inPkt.front();
inPkt.pop_front();
delete pkt;
}
return 1;
}
int TcpStream::handleIncoming(TcpPacket *pkt)
{
#ifdef DEBUG_TCP_STREAM
@ -925,7 +1194,7 @@ int TcpStream::incoming_Closed(TcpPacket *pkt)
/* seq + winsize set in toSend() */
/* as we have received something ... we can up the TTL */
udp -> setTTL(TCP_STD_TTL);
setTTL(TCP_STD_TTL);
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::incoming_Closed() Sending reply" << std::endl;
@ -984,7 +1253,7 @@ int TcpStream::incoming_SynSent(TcpPacket *pkt)
* As they have sent something, and we have received
* through the firewall, set to STD.
*/
udp -> setTTL(TCP_STD_TTL);
setTTL(TCP_STD_TTL);
/* ack the Syn Packet */
sendAck();
@ -1065,7 +1334,7 @@ int TcpStream::incoming_SynRcvd(TcpPacket *pkt)
/* As they have sent something, and we have received
* through the firewall, set to STD.
*/
udp -> setTTL(TCP_STD_TTL);
setTTL(TCP_STD_TTL);
/* change state */
state = TCP_ESTABLISHED;
@ -1361,7 +1630,7 @@ int TcpStream::UpdateInWinSize()
*
*/
uint32 queuedData = read_pending();
uint32 queuedData = int_read_pending();
if (queuedData < maxWinSize)
{
inWinSize = maxWinSize;
@ -1390,12 +1659,26 @@ int TcpStream::sendAck()
return toSend(new TcpPacket(), false);
}
void TcpStream::setRemoteAddress(const struct sockaddr_in &raddr)
{
peeraddr = raddr;
peerKnown = true;
}
int TcpStream::toSend(TcpPacket *pkt, bool retrans)
{
int outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE;
char tmpOutPkt[outPktSize];
if (!peerKnown)
{
/* Major Error! */
std::cerr << "TcpStream::toSend() peerUnknown ERROR!!!";
std::cerr << std::endl;
exit(1);
}
/* get accurate timestamp */
double cts = getCurrentTS();
@ -1446,7 +1729,7 @@ int TcpStream::toSend(TcpPacket *pkt, bool retrans)
//std::cerr << printPkt(tmpOutPkt, outPktSize) << std::endl;
#endif
udp -> sendPkt(tmpOutPkt, outPktSize);
udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl);
if (retrans)
{
@ -1476,6 +1759,14 @@ int TcpStream::retrans()
char tmpOutPkt[outPktSize];
bool updateCongestion = true;
if (!peerKnown)
{
/* Major Error! */
std::cerr << "TcpStream::retrans() peerUnknown ERROR!!!";
std::cerr << std::endl;
exit(1);
}
/* now retrans */
double cts = getCurrentTS();
std::list<TcpPacket *>::iterator it;
@ -1571,9 +1862,9 @@ int TcpStream::retrans()
* we should increase the ttl.
*/
if ((pkt->hasSyn()) && (udp -> getTTL() < TCP_STD_TTL))
if ((pkt->hasSyn()) && (getTTL() < TCP_STD_TTL))
{
udp -> setTTL(1 + pkt->retrans /
setTTL(1 + pkt->retrans /
TCP_STARTUP_COUNT_PER_TTL);
std::cerr << "TcpStream::retrans() Startup SYNs";
@ -1609,7 +1900,7 @@ int TcpStream::retrans()
}
udp -> sendPkt(tmpOutPkt, outPktSize);
udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl);
/* restart timers */
(*it) -> ts = cts;
@ -1770,8 +2061,6 @@ void TcpStream::acknowledge()
}
/* somehow managed to delete everything.... second time lucky */
int TcpStream::send()
{
/* handle network interface always */
@ -1945,24 +2234,25 @@ int TcpStream::send()
{
sendAck();
}
}
return 1;
}
int TcpStream::tick()
#ifdef DEBUG_TCP_STREAM_EXTRA
else
{
//std::cerr << "TcpStream::tick()" << std::endl;
recv();
send();
std::cerr << "TcpStream::send() No Ack";
std::cerr << std::endl;
}
#endif
}
#ifdef DEBUG_TCP_STREAM_EXTRA
else
{
std::cerr << "TcpStream::send() Stuff Sent";
std::cerr << std::endl;
}
#endif
return 1;
}
uint32 TcpStream::genSequenceNo()
{
//return 1000; // TCP_MAX_SEQ - 1000; //1000; //(rand() - 100000) + time(NULL) % 100000;
@ -2018,65 +2308,19 @@ static double getCurrentTS()
static int ilevel = 100;
bool TcpStream::widle()
{
/* init */
if (!lastWriteTF)
{
lastWriteTF = wbytes();
return false;
}
if ((lastWriteTF == wbytes()) && (inSize + inQueue.size() == 0))
{
wcount++;
if (wcount > ilevel)
return true;
return false;
}
wcount = 0;
lastWriteTF = wbytes();
return false;
}
bool TcpStream::ridle()
{
/* init */
if (!lastReadTF)
{
lastReadTF = rbytes();
return false;
}
if ((lastReadTF == rbytes()) && (outSizeRead + outQueue.size() + outSizeNet== 0))
{
rcount++;
if (rcount > ilevel)
return true;
return false;
}
rcount = 0;
lastReadTF = rbytes();
return false;
}
uint32 TcpStream::wbytes()
uint32 TcpStream::int_wbytes()
{
return outSeqno - initOurSeqno - 1;
}
uint32 TcpStream::rbytes()
uint32 TcpStream::int_rbytes()
{
return inAckno - initPeerSeqno - 1;
}
/********* Special debugging stuff *****/
#ifdef DEBUG_TCP_STREAM_EXTRA

View File

@ -36,7 +36,7 @@
*/
#include "tcppacket.h"
#include "udplayer.h"
#include "udpsorter.h"
#define MAX_SEG 1500
#define TCP_MAX_SEQ UINT_MAX
@ -68,25 +68,23 @@ class dataBuffer
#include <deque>
class TcpStream
class TcpStream: public UdpPeer
{
public:
/* Top-Level exposed */
TcpStream(UdpLayer *lyr);
TcpStream(UdpSorter *lyr);
/* user interface */
int status(std::ostream &out);
int connect();
int connect(const struct sockaddr_in &raddr);
int listenfor(const struct sockaddr_in &raddr);
bool isConnected();
bool widle(); /* write idle */
bool ridle(); /* read idle */
uint32 wbytes();
uint32 rbytes();
/* network interface (unreliable) */
int receivePkt(void *dta, int size);
int sendPkt(void *dta, int *size);
/* get tcp information */
bool getRemoteAddress(struct sockaddr_in &raddr);
uint8 TcpState();
int TcpErrorState();
/* stream Interface */
int write(char *dta, int size); /* write -> pkt -> net */
@ -100,12 +98,34 @@ int closeWrite(); /* non-standard, but for clean exit */
int close(); /* standard unix behaviour */
int tick(); /* check iface etc */
/* internal */
/* Callback Funcion from UDP Layers */
virtual void recvPkt(void *data, int size); /* overloaded */
/* Exposed Data Counting */
bool widle(); /* write idle */
bool ridle(); /* read idle */
uint32 wbytes();
uint32 rbytes();
private:
/* Internal Functions - use the Mutex (not reentrant) */
/* Internal Functions - that don't need mutex protection */
uint32 genSequenceNo();
bool isOldSequence(uint32 tst, uint32 curr);
RsMutex tcpMtx;
/* Internal Functions - only called inside mutex protection */
int cleanup();
/* incoming data */
int recv();
int recv_check();
int handleIncoming(TcpPacket *pkt);
int incoming_Closed(TcpPacket *pkt);
int incoming_SynSent(TcpPacket *pkt);
@ -117,22 +137,26 @@ int incoming_TimedWait(TcpPacket *pkt);
int incoming_Closing(TcpPacket *pkt);
int incoming_CloseWait(TcpPacket *pkt);
int incoming_LastAck(TcpPacket *pkt);
int check_InPkts();
int UpdateInWinSize();
int int_read_pending();
/* outgoing data */
int send();
int toSend(TcpPacket *pkt, bool retrans = true);
void acknowledge();
void calcWinSize();
int send();
int retrans();
int sendAck();
void setRemoteAddress(const struct sockaddr_in &raddr);
int getTTL() { return ttl; }
void setTTL(int t) { ttl = t; }
uint32 genSequenceNo();
bool isOldSequence(uint32 tst, uint32 curr);
/* data counting */
uint32 int_wbytes();
uint32 int_rbytes();
/* Internal Data - must have mutex to access! */
/* data (in -> pkts) && (pkts -> out) */
@ -195,9 +219,15 @@ bool isOldSequence(uint32 tst, uint32 curr);
uint32 congestWinSize;
uint32 congestUpdate;
/* existing TTL for this stream (tweaked at startup) */
int ttl;
struct sockaddr_in peeraddr;
bool peerKnown;
/* UdpSorter (has own Mutex!) */
UdpSorter *udp;
/* UdpLayer */
UdpLayer *udp;
};

View File

@ -43,10 +43,7 @@ struct TcpOnUdp_t
{
int tou_fd;
int lasterrno;
UdpLayer *udp;
TcpStream *tcp;
bool know_eaddr;
struct sockaddr_in extaddr;
bool idle;
};
@ -55,32 +52,66 @@ typedef struct TcpOnUdp_t TcpOnUdp;
static std::vector<TcpOnUdp *> tou_streams;
static int tou_inited = 0;
static UdpSorter *udps = NULL;
static int tou_tick_all();
static int tou_init()
/* tou_init - opens the udp port (universal bind) */
int tou_init(const struct sockaddr *my_addr, socklen_t addrlen)
{
if (tou_inited)
return 1;
tou_streams.resize(kInitStreamTable);
udps = new UdpSorter( *((struct sockaddr_in *) my_addr));
/* check the bind succeeded */
if (!(udps->okay()))
{
delete (udps);
udps = NULL;
return -1;
}
tou_inited = 1;
return 1;
}
/* tou_stunpeer supply tou with stun peers. */
int tou_stunpeer(const struct sockaddr *my_addr, socklen_t addrlen,
const char *id)
{
if (!tou_inited)
return -1;
udps->addStunPeer(*(struct sockaddr_in *) my_addr, id);
return 0;
}
int tou_extaddr(struct sockaddr *ext_addr, socklen_t *addrlen)
{
if (!tou_inited)
return -1;
return udps->externalAddr(*(struct sockaddr_in *) ext_addr);
}
/* open - which does nothing */
int tou_socket(int /*domain*/, int /*type*/, int /*protocol*/)
{
tou_init();
if (!tou_inited)
{
return -1;
}
for(unsigned int i = 1; i < tou_streams.size(); i++)
{
if (tou_streams[i] == NULL)
{
tou_streams[i] = new TcpOnUdp();
tou_streams[i] -> tou_fd = i;
tou_streams[i] -> know_eaddr = false;
tou_streams[i] -> udp = NULL;
tou_streams[i] -> tcp = NULL;
return i;
}
@ -93,8 +124,6 @@ int tou_socket(int /*domain*/, int /*type*/, int /*protocol*/)
if (tou == tou_streams[tou_streams.size() -1])
{
tou -> tou_fd = tou_streams.size() -1;
tou -> know_eaddr = false;
tou -> udp = NULL;
tou -> tcp = NULL;
return tou->tou_fd;
}
@ -109,10 +138,6 @@ int tou_socket(int /*domain*/, int /*type*/, int /*protocol*/)
#endif
}
/* bind - opens the udp port */
int tou_bind(int sockfd, const struct sockaddr *my_addr,
socklen_t addrlen)
@ -123,47 +148,11 @@ int tou_bind(int sockfd, const struct sockaddr *my_addr,
}
TcpOnUdp *tous = tou_streams[sockfd];
if (tous->udp)
{
/* this now always returns an error! */
tous -> lasterrno = EADDRINUSE;
return -1;
}
if (tous->tcp)
{
tous -> lasterrno = EADDRINUSE;
return -1;
}
if (addrlen != sizeof(struct sockaddr_in))
{
tous -> lasterrno = EADDRNOTAVAIL;
return -1;
}
/*
* tous->udp = new UdpLayer( *((struct sockaddr_in *) my_addr));
*/
// for testing - drop 5% of packets... */
//tous->udp = new LossyUdpLayer( *((struct sockaddr_in *) my_addr), 0.05);
tous->udp = new UdpLayer( *((struct sockaddr_in *) my_addr));
/* check the bind succeeded */
if (!(tous->udp->okay()))
{
delete (tous->udp);
tous->udp = NULL;
tous -> lasterrno = EADDRINUSE;
return -1;
}
tous->tcp = new TcpStream(tous->udp);
tous->tcp->tick();
tou_tick_all();
return 0;
}
/* records peers address, and sends syn pkt
* the timeout is very slow initially - to give
* the peer a chance to startup
@ -188,12 +177,15 @@ int tou_connect(int sockfd, const struct sockaddr *serv_addr,
return -1;
}
if (tous->tcp->state == 0)
/* create a TCP stream to connect with. */
if (!tous->tcp)
{
tous->udp->setRemoteAddr(*((struct sockaddr_in *) serv_addr));
tous->tcp = new TcpStream(udps);
udps->addUdpPeer(tous->tcp,
*((const struct sockaddr_in *) serv_addr));
}
tous->tcp->connect();
tous->tcp->connect(*(const struct sockaddr_in *) serv_addr);
tous->tcp->tick();
tou_tick_all();
if (tous->tcp->isConnected())
@ -205,6 +197,36 @@ int tou_connect(int sockfd, const struct sockaddr *serv_addr,
return -1;
}
int tou_listenfor(int sockfd, const struct sockaddr *serv_addr,
socklen_t addrlen)
{
if (tou_streams[sockfd] == NULL)
{
return -1;
}
TcpOnUdp *tous = tou_streams[sockfd];
if (addrlen != sizeof(struct sockaddr_in))
{
tous -> lasterrno = EINVAL;
return -1;
}
/* create a TCP stream to connect with. */
if (!tous->tcp)
{
tous->tcp = new TcpStream(udps);
udps->addUdpPeer(tous->tcp,
*((const struct sockaddr_in *) serv_addr));
}
tous->tcp->listenfor(*((struct sockaddr_in *) serv_addr));
tous->tcp->tick();
tou_tick_all();
return 0;
}
int tou_listen(int sockfd, int backlog)
{
tou_tick_all();
@ -233,7 +255,7 @@ int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
if (tous->tcp->isConnected())
{
// should get remote address
tous->udp->getRemoteAddr(*((struct sockaddr_in *) addr));
tous->tcp->getRemoteAddress(*((struct sockaddr_in *) addr));
return sockfd;
}
@ -241,154 +263,6 @@ int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
return -1;
}
int tou_listenfor(int sockfd, const struct sockaddr *serv_addr,
socklen_t addrlen)
{
if (tou_streams[sockfd] == NULL)
{
return -1;
}
TcpOnUdp *tous = tou_streams[sockfd];
if (addrlen != sizeof(struct sockaddr_in))
{
tous -> lasterrno = EINVAL;
return -1;
}
if (tous->tcp->state == 0)
{
tous->udp->setRemoteAddr(*((struct sockaddr_in *) serv_addr));
}
tous->tcp->tick();
tou_tick_all();
return 0;
}
/* This is a udp socket - after all
* independent operation from all of the
* stream stuff.
*/
int tou_extudp(const struct sockaddr *ext, socklen_t tolen)
{
/* request a udp to listen on, in a leachy kinda way */
std::vector<TcpOnUdp *>::iterator it;
for(it = tou_streams.begin(); it != tou_streams.end(); it++)
{
if ((*it) && ((*it)->udp))
{
if ((*it)->know_eaddr)
{
*((struct sockaddr_in *) ext) = (*it)->extaddr;
return (*it)->tou_fd;
}
}
}
return -1;
}
int tou_extaddr(int sockfd, const struct sockaddr *ext, socklen_t tolen)
{
if (tou_streams[sockfd] == NULL)
{
return -1;
}
TcpOnUdp *tous = tou_streams[sockfd];
tous -> know_eaddr = true;
tous -> extaddr = *((struct sockaddr_in *) ext);
return 0;
}
ssize_t tou_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen)
{
if (tou_streams[sockfd] == NULL)
{
std::cerr << "tou_recvfrom() Invalid sockfd:" << sockfd;
std::cerr << std::endl;
return -1;
}
TcpOnUdp *tous = tou_streams[sockfd];
if (*fromlen != sizeof(struct sockaddr_in))
{
std::cerr << "tou_recvfrom() Invalid addr size";
std::cerr << std::endl;
tous -> lasterrno = EINVAL;
return -1;
}
/* extra checks */
if ((!tous->tcp) || (!tous->udp))
{
std::cerr << "tou_recvfrom() Bad sockfd (!udp) || (!tcp) :" << sockfd;
std::cerr << std::endl;
return -1;
}
if (tous->tcp->state == 0)
{
//std::cerr << "tou_recvfrom() State = 0 .... Allow??";
//std::cerr << std::endl;
}
if (tous->udp->okay())
{
int ret = tous->udp->recvRndPktfrom(buf,len,flags, from, fromlen);
if (ret < 0)
{
//std::cerr << "tou_recvfrom() Sock Ok, Try Again later";
//std::cerr << std::endl;
tous -> lasterrno = EAGAIN;
return -1;
}
std::cerr << "tou_recvfrom() Got a Packet on: " << sockfd;
std::cerr << std::endl;
tous -> lasterrno = 0;
return ret;
}
std::cerr << "tou_recvfrom() Socket Not Okay:" << sockfd;
std::cerr << std::endl;
tous -> lasterrno = EAGAIN;
return -1;
}
ssize_t tou_sendto(int sockfd, const void *buf, size_t len, int flags, const struct
sockaddr *to, socklen_t tolen)
{
if (tou_streams[sockfd] == NULL)
{
return -1;
}
TcpOnUdp *tous = tou_streams[sockfd];
if (tolen != sizeof(struct sockaddr_in))
{
tous -> lasterrno = EINVAL;
return -1;
}
if (tous->tcp->state == 0)
{
}
if (tous->udp->okay())
{
tous->udp->sendToProxy(*((struct sockaddr_in *) to),
buf, len);
return len;
}
tous -> lasterrno = EAGAIN;
return -1;
}
int tou_connected(int sockfd)
{
@ -401,7 +275,7 @@ int tou_connected(int sockfd)
tous->tcp->tick();
tou_tick_all();
return (tous->tcp->state == 4);
return (tous->tcp->TcpState() == 4);
}
@ -422,7 +296,7 @@ ssize_t tou_read(int sockfd, void *buf, size_t count)
int err = tous->tcp->read((char *) buf, count);
if (err < 0)
{
tous->lasterrno = tous->tcp->errorState;
tous->lasterrno = tous->tcp->TcpErrorState();
return -1;
}
return err;
@ -440,7 +314,7 @@ ssize_t tou_write(int sockfd, const void *buf, size_t count)
int err = tous->tcp->write((char *) buf, count);
if (err < 0)
{
tous->lasterrno = tous->tcp->errorState;
tous->lasterrno = tous->tcp->TcpErrorState();
tous->tcp->tick();
tou_tick_all();
return -1;
@ -464,7 +338,7 @@ int tou_maxread(int sockfd)
int ret = tous->tcp->read_pending();
if (ret < 0)
{
tous->lasterrno = tous->tcp->errorState;
tous->lasterrno = tous->tcp->TcpErrorState();
return 0; // error detected next time.
}
return ret;
@ -483,7 +357,7 @@ int tou_maxwrite(int sockfd)
int ret = tous->tcp->write_allowed();
if (ret < 0)
{
tous->lasterrno = tous->tcp->errorState;
tous->lasterrno = tous->tcp->TcpErrorState();
return 0; // error detected next time?
}
return ret;
@ -504,18 +378,19 @@ int tou_close(int sockfd)
/* shut it down */
tous->tcp->close();
tous->udp->close();
delete tous->tcp;
delete tous->udp;
delete tous;
tou_streams[sockfd] = NULL;
return 1;
}
/* get an error number */
int tou_errno(int sockfd)
{
if (!udps)
{
return ENOTSOCK;
}
if (tou_streams[sockfd] == NULL)
{
return ENOTSOCK;
@ -535,12 +410,10 @@ int tou_clear_error(int sockfd)
return 0;
}
/* unfortuately the library needs to be ticked. (not running a thread)
* you can put it in a thread!
*/
/*
* Some helper functions for stuff.
*

View File

@ -51,6 +51,47 @@
extern "C" {
#endif
/* The modification to a single UDP socket means
* that the structure of the TOU interface must be changed.
*
* Init:
* (1) choose our local address. (a universal bind)
* int tou_init(const struct sockaddr *my_addr);
* (2) query if we have determined our external address.
* int tou_extaddr(struct sockaddr *ext_addr, socklen_t *addrlen);
* (3) offer more stunpeers, for external address determination.
* int tou_stunpeer(const struct sockaddr *ext_addr, socklen_t addrlen, const char *id);
* (4) repeat (2)+(3) until a valid extaddr is returned.
*
*/
int tou_init(const struct sockaddr *my_addr, socklen_t addrlen);
int tou_extaddr(struct sockaddr *ext_addr, socklen_t *addrlen);
int tou_stunpeer(const struct sockaddr *ext_addr, socklen_t addrlen, const char *id);
/* Connections are as similar to UNIX as possible
* (1) create a socket: tou_socket() this reserves a socket id.
* (2) connect: active: tou_connect() or passive: tou_listenfor().
* (3) use as a normal socket.
*
* tou_bind() is not valid. tou_init performs this role.
* tou_listen() is not valid. (must listen for a specific address) use tou_listenfor() instead.
* tou_accept() can still be used.
*/
/* creation/connections */
int tou_socket(int domain, int type, int protocol);
int tou_bind(int sockfd, const struct sockaddr *my_addr, socklen_t addrlen); /* null op now */
int tou_listen(int sockfd, int backlog); /* null op now */
int tou_connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen);
int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
/* non-standard bonuses */
int tou_connected(int sockfd);
int tou_listenfor(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen);
/* UNIX interface: minimum for the SSL BIO interface */
ssize_t tou_read(int sockfd, void *buf, size_t count);
ssize_t tou_write(int sockfd, const void *buf, size_t count);
@ -65,28 +106,6 @@ int tou_maxread(int sockfd);
int tou_maxwrite(int sockfd);
/* creation/connections */
int tou_socket(int domain, int type, int protocol);
int tou_bind(int sockfd, const struct sockaddr *my_addr, socklen_t addrlen);
int tou_connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen);
int tou_listen(int sockfd, int backlog);
int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
/* udp interface */
/* request an external udp port to use - returns sockfd */
int tou_extudp(const struct sockaddr *ext, socklen_t tolen);
int tou_extaddr(int sockfd, const struct sockaddr *to, socklen_t tolen);
ssize_t tou_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen);
ssize_t tou_sendto(int s, const void *buf, size_t len, int flags, const struct
sockaddr *to, socklen_t tolen);
/* non-standard bonuses */
int tou_connected(int sockfd);
int tou_listenfor(int sockfd, const struct sockaddr *serv_addr,
socklen_t addrlen);
#ifdef __cplusplus
}
#endif

View File

@ -115,33 +115,36 @@ int main(int argc, char **argv)
std::cerr << "Local Address: " << laddr << std::endl;
std::cerr << "Remote Address: " << raddr << std::endl;
//LossyUdpLayer udpl(laddr, 0.01);
UdpLayer udpl(laddr);
if (!udpl.openSocket())
UdpSorter udps(laddr);
if (!udps.okay())
{
std::cerr << "Cannot Open Local Address: " << laddr << std::endl;
std::cerr << "UdpSorter not Okay (Cannot Open Local Address): " << laddr << std::endl;
exit(1);
}
udpl.setRemoteAddr(raddr);
TcpStream tcp(&udpl);
TcpStream tcp(&udps);
udps.addUdpPeer(&tcp, raddr);
if (toConnect)
{
tcp.connect();
tcp.connect(raddr);
}
else
{
tcp.listenfor(raddr);
}
while(!tcp.isConnected())
{
sleep(1);
std::cerr << "Waiting for TCP to Connect!" << std::endl;
udpl.status(std::cerr);
udps.status(std::cerr);
tcp.status(std::cerr);
tcp.tick();
}
std::cerr << "TCP Connected***************************" << std::endl;
udpl.status(std::cerr);
udps.status(std::cerr);
tcp.status(std::cerr);
std::cerr << "TCP Connected***************************" << std::endl;
@ -161,7 +164,7 @@ int main(int argc, char **argv)
while(!done)
{
//sleep(1);
usleep(10000);
usleep(100000);
//usleep(1000);
if (blockread != true)
{
@ -196,8 +199,8 @@ int main(int argc, char **argv)
while(!tcp.widle())
{
sleep(1);
//usleep(10000);
//sleep(1);
usleep(100000);
//usleep(1000);
tcp.tick();
if (count++ % 10 == 0)
@ -220,7 +223,7 @@ int main(int argc, char **argv)
while(1)
{
//sleep(1);
usleep(10000);
usleep(100000);
//usleep(1000);
//int writesize = bufsize;
int ret;
@ -258,7 +261,9 @@ int main(int argc, char **argv)
while((stayOpen) || (!tcp.ridle()))
{
tcp.tick();
sleep(1);
//sleep(1);
usleep(100000);
//usleep(1000);
if (count++ % 10 == 0)
{
std::cerr << "Waiting for Idle()" << std::endl;

View File

@ -45,8 +45,8 @@
*/
/*
* #define DEBUG_UDP_LAYER 1
*/
*/ #define DEBUG_UDP_LAYER 1
/**/
static const int UDP_DEF_TTL = 64;
@ -76,7 +76,7 @@ class udpPacket
};
std::ostream &operator<<(std::ostream &out, struct sockaddr_in &addr)
std::ostream &operator<<(std::ostream &out, const struct sockaddr_in &addr)
{
out << "[" << inet_ntoa(addr.sin_addr) << ":";
out << htons(addr.sin_port) << "]";
@ -84,7 +84,7 @@ std::ostream &operator<<(std::ostream &out, struct sockaddr_in &addr)
}
bool operator==(struct sockaddr_in &addr, struct sockaddr_in &addr2)
bool operator==(const struct sockaddr_in &addr, const struct sockaddr_in &addr2)
{
if (addr.sin_family != addr2.sin_family)
return false;
@ -95,6 +95,18 @@ bool operator==(struct sockaddr_in &addr, struct sockaddr_in &addr2)
return true;
}
bool operator<(const struct sockaddr_in &addr, const struct sockaddr_in &addr2)
{
if (addr.sin_family != addr2.sin_family)
return (addr.sin_family < addr2.sin_family);
if (addr.sin_addr.s_addr != addr2.sin_addr.s_addr)
return (addr.sin_addr.s_addr < addr2.sin_addr.s_addr);
if (addr.sin_port != addr2.sin_port)
return (addr.sin_port < addr2.sin_port);
return false;
}
std::string printPkt(void *d, int size)
{
std::ostringstream out;
@ -148,8 +160,8 @@ std::string printPktOffset(unsigned int offset, void *d, unsigned int size)
UdpLayer::UdpLayer(struct sockaddr_in &local)
:laddr(local), raddrKnown(false), errorState(0), ttl(UDP_DEF_TTL)
UdpLayer::UdpLayer(UdpReceiver *udpr, struct sockaddr_in &local)
:recv(udpr), laddr(local), errorState(0), ttl(UDP_DEF_TTL)
{
openSocket();
return;
@ -159,14 +171,6 @@ int UdpLayer::status(std::ostream &out)
{
out << "UdpLayer::status()" << std::endl;
out << "localaddr: " << laddr << std::endl;
if (raddrKnown)
{
out << "remoteaddr: " << raddr << std::endl;
}
else
{
out << "remoteaddr unKnown!" << std::endl;
}
out << "sockfd: " << sockfd << std::endl;
out << std::endl;
return 1;
@ -175,79 +179,96 @@ int UdpLayer::status(std::ostream &out)
int UdpLayer::close()
{
/* close socket if open */
sockMtx.lock(); /********** LOCK MUTEX *********/
if (sockfd > 0)
{
tounet_close(sockfd);
}
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
return 1;
}
void UdpLayer::run()
{
return recv_loop();
}
/* higher level interface */
int UdpLayer::readPkt(void *data, int *size)
void UdpLayer::recv_loop()
{
int nsize = *size;
int maxsize = 16000;
void *inbuf = malloc(maxsize);
int status;
struct timeval timeout;
while(1)
{
/* select on the socket TODO */
fd_set rset;
for(;;) {
FD_ZERO(&rset);
FD_SET(sockfd, &rset);
timeout.tv_sec = 0;
timeout.tv_usec = 500000; /* 500 ms timeout */
status = select(sockfd+1, &rset, NULL, NULL, &timeout);
if (status > 0)
{
break; /* data available, go read it */
}
else if (status < 0)
{
std::cerr << "Error: " << tounet_errno() << std::endl;
}
};
int nsize = maxsize;
struct sockaddr_in from;
if (0 >= receiveUdpPacket(data, &nsize, from))
if (0 < receiveUdpPacket(inbuf, &nsize, from))
{
#ifdef DEBUG_UDP_LAYER
//std::cerr << "UdpLayer::readPkt() not ready" << from;
//std::cerr << std::endl;
std::cerr << "UdpLayer::readPkt() from : " << from << std::endl;
std::cerr << printPkt(inbuf, nsize);
#endif
return -1;
}
#ifdef DEBUG_UDP_LAYER
//std::cerr << "UdpLayer::readPkt() from : " << from << std::endl;
//std::cerr << printPkt(data, nsize);
#endif
if ((raddrKnown) && (from == raddr))
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::readPkt() from RemoteAddr: " << from;
std::cerr << std::endl;
#endif
*size = nsize;
return nsize;
}
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::readPkt() from unknown remote addr: " << from;
std::cerr << std::endl;
#endif
std::cerr << "UdpLayer::readPkt() storing Random packet from: " << from;
std::cerr << std::endl;
randomPkts.push_back(new udpPacket(&from,data, nsize));
return -1;
}
int UdpLayer::sendPkt(void *data, int size)
{
if (raddrKnown)
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::sendPkt() to: " << raddr << std::endl;
//std::cerr << printPkt(data, size);
#endif
sendUdpPacket(data, size, raddr);
return size;
// send to reciever.
recv -> recvPkt(inbuf, nsize, from);
}
else
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::sendPacket() unknown remote addr!";
std::cerr << "UdpLayer::readPkt() not ready" << from;
std::cerr << std::endl;
#endif
return -1;
}
return 1;
}
return;
}
int UdpLayer::sendPkt(void *data, int size, sockaddr_in &to, int ttl)
{
/* if ttl is different -> set it */
if (ttl != getTTL())
{
setTTL(ttl);
}
/* and send! */
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::sendPkt() to: " << to << std::endl;
std::cerr << printPkt(data, size);
#endif
sendUdpPacket(data, size, to);
return size;
}
/* setup connections */
int UdpLayer::openSocket()
{
sockMtx.lock(); /********** LOCK MUTEX *********/
/* make a socket */
sockfd = tounet_socket(PF_INET, SOCK_DGRAM, 0);
#ifdef DEBUG_UDP_LAYER
@ -262,6 +283,8 @@ int UdpLayer::openSocket()
#endif
errorState = EADDRINUSE;
//exit(1);
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
return -1;
}
@ -272,24 +295,32 @@ int UdpLayer::openSocket()
#endif
}
errorState = 0;
#ifdef DEBUG_UDP_LAYER
std::cerr << "Socket Bound to : " << laddr << std::endl;
#endif
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
#ifdef DEBUG_UDP_LAYER
std::cerr << "Setting TTL to " << UDP_DEF_TTL << std::endl;
#endif
setTTL(UDP_DEF_TTL);
errorState = 0;
return 1;
}
int UdpLayer::setTTL(int t)
{
sockMtx.lock(); /********** LOCK MUTEX *********/
int err = tounet_setsockopt(sockfd, IPPROTO_IP, IP_TTL, &t, sizeof(int));
ttl = t;
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::setTTL(" << t << ") returned: " << err;
std::cerr << std::endl;
@ -300,42 +331,26 @@ int UdpLayer::setTTL(int t)
int UdpLayer::getTTL()
{
return ttl;
}
sockMtx.lock(); /********** LOCK MUTEX *********/
int t = ttl;
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
int UdpLayer::sendToProxy(struct sockaddr_in &proxy, const void *data, int size)
{
sendUdpPacket(data, size, proxy);
return 1;
}
int UdpLayer::setRemoteAddr(struct sockaddr_in &remote)
{
raddr = remote;
raddrKnown = true;
return 1;
}
int UdpLayer::getRemoteAddr(struct sockaddr_in &remote)
{
if (raddrKnown)
{
remote = raddr;
return 1;
}
return 0;
return t;
}
/* monitoring / updates */
int UdpLayer::okay()
{
sockMtx.lock(); /********** LOCK MUTEX *********/
bool nonFatalError = ((errorState == 0) ||
(errorState == EAGAIN) ||
(errorState == EINPROGRESS));
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
#ifdef DEBUG_UDP_LAYER
if (!nonFatalError)
{
@ -354,91 +369,23 @@ int UdpLayer::tick()
#endif
return 1;
}
/******************* Internals *************************************/
ssize_t UdpLayer::recvRndPktfrom(void *buf, size_t len, int flags,
struct sockaddr *from, socklen_t *fromlen)
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::recvRndPktfrom()" << std::endl;
#endif
if (*fromlen != sizeof(struct sockaddr_in))
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::recvRndPktfrom() bad address length" << std::endl;
#endif
return -1;
}
/* if raddr not known -> then we're not connected
* at a higher level and therefore our queue
* will not be filled (no ticking)....
* so feel free the get data.
*/
if (randomPkts.size() == 0)
{
if (!raddrKnown)
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::recvRndPktfrom() Checking Directly" << std::endl;
#endif
int size = len;
int ret = receiveUdpPacket(buf, &size, *((struct sockaddr_in *) from));
if (ret > 0)
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::recvRndPktfrom() Got Pkt directly" << std::endl;
std::cerr << "Pkt from:" << inet_ntoa(((struct sockaddr_in *) from)->sin_addr);
std::cerr << ":" << ntohs(((struct sockaddr_in *) from)->sin_port) << std::endl;
#endif
return ret;
}
}
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::recvRndPktfrom() Nothing in the Queue" << std::endl;
#endif
return -1;
}
udpPacket *pkt = randomPkts.front();
randomPkts.pop_front();
*((struct sockaddr_in *) from) = pkt->raddr;
unsigned int size = pkt->len;
if (len < size)
{
size = len;
}
memcpy(buf, pkt->data, size);
*((struct sockaddr_in *) from) = pkt->raddr;
#ifdef DEBUG_UDP_LAYER
std::cerr << "UdpLayer::recvRndPktfrom() returning stored Pkt" << std::endl;
std::cerr << "Pkt from:" << inet_ntoa(pkt->raddr.sin_addr);
std::cerr << ":" << ntohs(pkt->raddr.sin_port) << std::endl;
std::cerr << "Length: " << pkt->len << std::endl;
#endif
delete pkt;
return size;
}
/******************* Internals *************************************/
int UdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from)
{
struct sockaddr_in fromaddr;
socklen_t fromsize = sizeof(fromaddr);
int insize = *size;
if (0<(insize=tounet_recvfrom(sockfd,data,insize,0,
(struct sockaddr*)&fromaddr,&fromsize)))
sockMtx.lock(); /********** LOCK MUTEX *********/
insize = tounet_recvfrom(sockfd,data,insize,0,
(struct sockaddr*)&fromaddr,&fromsize);
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
if (0 < insize)
{
#ifdef DEBUG_UDP_LAYER
std::cerr << "receiveUdpPacket() from: " << fromaddr;
@ -461,9 +408,13 @@ int UdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to)
#endif
struct sockaddr_in toaddr = to;
sockMtx.lock(); /********** LOCK MUTEX *********/
tounet_sendto(sockfd, data, size, 0,
(struct sockaddr *) &(toaddr),
sizeof(toaddr));
sockMtx.unlock(); /******** UNLOCK MUTEX *********/
return 1;
}

View File

@ -35,6 +35,9 @@
#include <sys/socket.h>
*/
#include "util/rsthreads.h"
/* universal networking functions */
#include "tou_net.h"
@ -42,43 +45,44 @@
#include <list>
#include <deque>
std::ostream &operator<<(std::ostream &out, struct sockaddr_in &addr);
bool operator==(struct sockaddr_in &addr, struct sockaddr_in &addr2);
std::ostream &operator<<(std::ostream &out, const struct sockaddr_in &addr);
bool operator==(const struct sockaddr_in &addr, const struct sockaddr_in &addr2);
bool operator<(const struct sockaddr_in &addr, const struct sockaddr_in &addr2);
std::string printPkt(void *d, int size);
std::string printPktOffset(unsigned int offset, void *d, unsigned int size);
/* So the UdpLayer ..... has a couple of roles
*
* Firstly Send Proxy Packet() (for address determination).
* all the rest of this functionality is handled elsewhere.
*
* Secondly support TcpStreamer....
/* UdpLayer ..... is the bottom layer which
* just sends and receives Udp packets.
*/
class udpPacket;
class UdpReceiver
{
public:
virtual void recvPkt(void *data, int size, struct sockaddr_in &from) = 0;
};
class UdpLayer
class UdpLayer: public RsThread
{
public:
UdpLayer(struct sockaddr_in &local);
UdpLayer(UdpReceiver *recv, struct sockaddr_in &local);
virtual ~UdpLayer() { return; }
int status(std::ostream &out);
/* setup connections */
int openSocket();
int setTTL(int t);
int getTTL();
int sendToProxy(struct sockaddr_in &proxy, const void *data, int size);
int setRemoteAddr(struct sockaddr_in &remote);
int getRemoteAddr(struct sockaddr_in &remote);
/* RsThread functions */
virtual void run(); /* called once the thread is started */
void recv_loop(); /* uses callback to UdpReceiver */
/* Higher Level Interface */
int readPkt(void *data, int *size);
int sendPkt(void *data, int size);
//int readPkt(void *data, int *size, struct sockaddr_in &from);
int sendPkt(void *data, int size, struct sockaddr_in &to, int ttl);
/* monitoring / updates */
int okay();
@ -86,12 +90,6 @@ int status(std::ostream &out);
int close();
/* unix like interface for recving packets not part
* of the tcp stream
*/
ssize_t recvRndPktfrom(void *buf, size_t len, int flags,
struct sockaddr *from, socklen_t *fromlen);
/* data */
/* internals */
protected:
@ -99,25 +97,21 @@ ssize_t recvRndPktfrom(void *buf, size_t len, int flags,
virtual int receiveUdpPacket(void *data, int *size, struct sockaddr_in &from);
virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to);
int setTTL(int t);
int getTTL();
/* low level */
/*
* int rwSocket();
*/
private:
UdpReceiver *recv;
struct sockaddr_in paddr; /* proxy addr */
struct sockaddr_in raddr; /* remote addr */
struct sockaddr_in laddr; /* local addr */
bool raddrKnown;
int errorState;
int sockfd;
int ttl;
std::deque<udpPacket * > randomPkts;
RsMutex sockMtx;
};
#include <iostream>
@ -126,8 +120,8 @@ class LossyUdpLayer: public UdpLayer
{
public:
LossyUdpLayer(struct sockaddr_in &local, double frac)
:UdpLayer(local), lossFraction(frac)
LossyUdpLayer(UdpReceiver *udpr, struct sockaddr_in &local, double frac)
:UdpLayer(udpr, local), lossFraction(frac)
{
return;
}

View File

@ -0,0 +1,91 @@
/*
* libretroshare/src/tcponudp: udpsock_test.cc
*
* TCP-on-UDP (tou) network interface for RetroShare.
*
* Copyright 2007-2008 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "udptestfn.h"
#include "udplayer.h"
#define MAX_PEERS 16
int main(int argc, char **argv)
{
/* get local and remote addresses */
struct sockaddr_in local;
struct sockaddr_in peers[MAX_PEERS];
int numpeers = 0;
int i,j;
local.sin_family = AF_INET;
inet_aton("127.0.0.1", &(local.sin_addr));
local.sin_port = htons(8767);
for(i = 0; i < MAX_PEERS; i++)
{
peers[i].sin_family = AF_INET;
inet_aton("127.0.0.1", &(peers[i].sin_addr));
peers[i].sin_port = htons(8768);
}
if (argc < 3)
{
std::cerr << "Usage: " << argv[0] << " <l-port> [<p-port1 ..]";
std::cerr << std::endl;
exit(1);
}
local.sin_port = htons(atoi(argv[1]));
std::cerr << argv[0] << " Local Port: " << ntohs(local.sin_port);
std::cerr << std::endl;
for(i = 2; i < argc; i++)
{
numpeers++;
peers[i-2].sin_port = htons(atoi(argv[i]));
std::cerr << "\tPeer Port: " << ntohs(peers[i-2].sin_port);
std::cerr << std::endl;
}
/* create a udp layer */
UdpRecvTest recv;
UdpLayer udpl(&recv, local);
udpl.start();
int size = 12;
void *data = malloc(size);
int ttl = 64;
/* push packets to the peer */
for(i = 0; i < 60; i++)
{
sleep(1);
for(j = 0; j < numpeers; j++)
{
udpl.sendPkt(data, size, peers[j], ttl);
}
}
return 1;
}

View File

@ -0,0 +1,91 @@
/*
* libretroshare/src/tcponudp: udpsort_test.cc
*
* TCP-on-UDP (tou) network interface for RetroShare.
*
* Copyright 2007-2008 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "udptestfn.h"
#include "udpsorter.h"
#define MAX_PEERS 16
int main(int argc, char **argv)
{
/* get local and remote addresses */
struct sockaddr_in local;
struct sockaddr_in peers[MAX_PEERS];
int numpeers = 0;
int i,j;
local.sin_family = AF_INET;
inet_aton("127.0.0.1", &(local.sin_addr));
local.sin_port = htons(8767);
for(i = 0; i < MAX_PEERS; i++)
{
peers[i].sin_family = AF_INET;
inet_aton("127.0.0.1", &(peers[i].sin_addr));
peers[i].sin_port = htons(8768);
}
if (argc < 3)
{
std::cerr << "Usage: " << argv[0] << " <l-port> [<p-port1 ..]";
std::cerr << std::endl;
exit(1);
}
local.sin_port = htons(atoi(argv[1]));
std::cerr << argv[0] << " Local Port: " << ntohs(local.sin_port);
std::cerr << std::endl;
UdpSorter udps(local);
for(i = 2; i < argc; i++)
{
numpeers++;
peers[i-2].sin_port = htons(atoi(argv[i]));
std::cerr << "\tPeer Port: " << ntohs(peers[i-2].sin_port);
std::cerr << std::endl;
UdpPeerTest *pt = new UdpPeerTest(peers[i-2]);
udps.addUdpPeer(pt, peers[i-2]);
}
int size = 12;
void *data = malloc(size);
int ttl = 64;
/* push packets to the peer */
for(i = 0; i < 60; i++)
{
sleep(1);
for(j = 0; j < numpeers; j++)
{
udps.sendPkt(data, size, peers[j], ttl);
}
}
return 1;
}

View File

@ -0,0 +1,441 @@
/*
* libretroshare/src/tcponudp: udpsorter.cc
*
* TCP-on-UDP (tou) network interface for RetroShare.
*
* Copyright 2007-2008 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "udpsorter.h"
#include <iostream>
#include <sstream>
#include <iomanip>
static const int STUN_TTL = 64;
/*
* #define DEBUG_UDP_SORTER 1
*/
#define DEBUG_UDP_SORTER 1
UdpSorter::UdpSorter(struct sockaddr_in &local)
: udpLayer(NULL), laddr(local)
{
openSocket();
return;
}
/* higher level interface */
void UdpSorter::recvPkt(void *data, int size, struct sockaddr_in &from)
{
/* print packet information */
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::recvPkt(" << size << ") from: " << from;
std::cerr << std::endl;
#endif
sortMtx.lock(); /********** LOCK MUTEX *********/
/* look for a peer */
std::map<struct sockaddr_in, UdpPeer *>::iterator it;
it = streams.find(from);
/* check for STUN packet */
if (isStunPacket(data, size))
{
std::cerr << "UdpSorter::recvPkt() is Stun Packet";
std::cerr << std::endl;
/* respond */
handleStunPkt(data, size, from);
}
else if (it == streams.end())
{
/* peer unknown */
std::cerr << "UdpSorter::recvPkt() Peer Unknown!";
std::cerr << std::endl;
}
else
{
/* forward to them */
std::cerr << "UdpSorter::recvPkt() Sending to UdpPeer: ";
std::cerr << it->first;
std::cerr << std::endl;
(it->second)->recvPkt(data, size);
}
sortMtx.unlock(); /******** UNLOCK MUTEX *********/
/* done */
}
int UdpSorter::sendPkt(void *data, int size, struct sockaddr_in &to, int ttl)
{
/* print packet information */
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::sendPkt(" << size << ") ttl: " << ttl;
std::cerr << " to: " << to;
std::cerr << std::endl;
#endif
/* send to udpLayer */
return udpLayer->sendPkt(data, size, to, ttl);
}
int UdpSorter::status(std::ostream &out)
{
sortMtx.lock(); /********** LOCK MUTEX *********/
out << "UdpSorter::status()" << std::endl;
out << "localaddr: " << laddr << std::endl;
out << "UdpSorter::peers:" << std::endl;
std::map<struct sockaddr_in, UdpPeer *>::iterator it;
for(it = streams.begin(); it != streams.end(); it++)
{
out << "\t" << it->first << std::endl;
}
out << std::endl;
sortMtx.unlock(); /******** UNLOCK MUTEX *********/
udpLayer->status(out);
return 1;
}
/* setup connections */
int UdpSorter::openSocket()
{
udpLayer = new UdpLayer(this, laddr);
udpLayer->start();
return 1;
}
/* monitoring / updates */
int UdpSorter::okay()
{
return udpLayer->okay();
}
int UdpSorter::tick()
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::tick()" << std::endl;
#endif
return 1;
}
int UdpSorter::close()
{
/* TODO */
return 1;
}
/* add a TCPonUDP stream */
int UdpSorter::addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr)
{
sortMtx.lock(); /********** LOCK MUTEX *********/
/* check for duplicate */
std::map<struct sockaddr_in, UdpPeer *>::iterator it;
it = streams.find(raddr);
bool ok = (it == streams.end());
if (!ok)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::addUdpPeer() Peer already exists!" << std::endl;
std::cerr << "UdpSorter::addUdpPeer() ERROR" << std::endl;
#endif
}
else
{
streams[raddr] = peer;
}
sortMtx.unlock(); /******** UNLOCK MUTEX *********/
return ok;
}
/******************************* STUN Handling ********************************/
/* respond */
bool UdpSorter::handleStunPkt(void *data, int size, struct sockaddr_in &from)
{
if (size == 20) /* request */
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::handleStunPkt() got Request";
std::cerr << std::endl;
#endif
/* generate a response */
int len;
void *pkt = generate_stun_reply(&from, &len);
if (!pkt)
return false;
int sentlen = sendPkt(pkt, len, from, STUN_TTL);
free(pkt);
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::handleStunPkt() sent Response size:" << sentlen;
std::cerr << std::endl;
#endif
return (len == sentlen);
}
else if (size == 28)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::handleStunPkt() got Response";
std::cerr << std::endl;
#endif
/* got response */
struct sockaddr_in eAddr;
bool good = response(data, size, eAddr);
if (good)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::handleStunPkt() got Ext Addr: ";
std::cerr << inet_ntoa(eAddr.sin_addr) << ":" << ntohs(eAddr.sin_port);
std::cerr << std::endl;
#endif
eaddrKnown = true;
eaddr = eAddr;
return true;
}
}
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::handleStunPkt() Bad Packet";
std::cerr << std::endl;
#endif
return false;
}
bool UdpSorter::addStunPeer(const struct sockaddr_in &remote, const char *peerid)
{
/* add to the list */
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::addStunPeer()";
std::cerr << std::endl;
std::cerr << "UdpSorter::addStunPeer() - just stun it!";
std::cerr << std::endl;
#endif
doStun(remote);
return false;
}
bool UdpSorter::externalAddr(struct sockaddr_in &external)
{
if (eaddrKnown)
{
external = eaddr;
return true;
}
return false;
}
int UdpSorter::doStun(struct sockaddr_in stun_addr)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::doStun()";
std::cerr << std::endl;
#endif
/* send out a stun packet -> save in the local variable */
if (!okay())
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::doStun() Not Active";
std::cerr << std::endl;
#endif
}
#define MAX_STUN_SIZE 64
char stundata[MAX_STUN_SIZE];
int tmplen = MAX_STUN_SIZE;
bool done = generate_stun_pkt(stundata, &tmplen);
if (!done)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::doStun() Failed";
std::cerr << std::endl;
#endif
//pqioutput(PQL_ALERT, pqistunzone, "pqistunner::stun() Failed!");
return 0;
}
/* send it off */
int sentlen = sendPkt(stundata, tmplen, stun_addr, STUN_TTL);
#ifdef DEBUG_UDP_SORTER
std::ostringstream out;
out << "UdpSorter::doStun() Sent Stun Packet(" << sentlen << ") from:";
out << inet_ntoa(laddr.sin_addr) << ":" << ntohs(laddr.sin_port);
out << " to:";
out << inet_ntoa(stun_addr.sin_addr) << ":" << ntohs(stun_addr.sin_port);
std::cerr << out.str() << std::endl;
//pqioutput(PQL_ALERT, pqistunzone, out.str());
#endif
return 1;
}
bool UdpSorter::response(void *stun_pkt, int size, struct sockaddr_in &addr)
{
/* check what type it is */
if (size < 28)
{
return false;
}
if (((uint16_t *) stun_pkt)[0] != 0x0101)
{
/* not a response */
return false;
}
/* iterate through the packet */
/* for now assume the address follows the header directly */
/* all stay in netbyteorder! */
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = ((uint32_t *) stun_pkt)[6];
addr.sin_port = ((uint16_t *) stun_pkt)[11];
#ifdef DEBUG_UDP_SORTER
std::ostringstream out;
out << "UdpSorter::response() Recvd a Stun Response, ext_addr: ";
out << inet_ntoa(addr.sin_addr) << ":" << ntohs(addr.sin_port);
std::cerr << out.str() << std::endl;
#endif
return true;
}
bool UdpSorter::generate_stun_pkt(void *stun_pkt, int *len)
{
if (*len < 20)
{
return false;
}
/* just the header */
((uint16_t *) stun_pkt)[0] = 0x0001;
((uint16_t *) stun_pkt)[1] = 20; /* only header */
/* transaction id - should be random */
((uint32_t *) stun_pkt)[1] = 0x0020;
((uint32_t *) stun_pkt)[2] = 0x0121;
((uint32_t *) stun_pkt)[3] = 0x0111;
((uint32_t *) stun_pkt)[4] = 0x1010;
*len = 20;
return true;
}
void *UdpSorter::generate_stun_reply(struct sockaddr_in *stun_addr, int *len)
{
/* just the header */
void *stun_pkt = malloc(28);
((uint16_t *) stun_pkt)[0] = 0x0101;
((uint16_t *) stun_pkt)[1] = 28; /* only header + 8 byte addr */
/* transaction id - should be random */
((uint32_t *) stun_pkt)[1] = 0x0020;
((uint32_t *) stun_pkt)[2] = 0x0121;
((uint32_t *) stun_pkt)[3] = 0x0111;
((uint32_t *) stun_pkt)[4] = 0x1010;
/* now add address
* 0 1 2 3
* <INET> <port>
* <inet address>
*/
((uint32_t *) stun_pkt)[6] = stun_addr->sin_addr.s_addr;
((uint16_t *) stun_pkt)[11] = stun_addr->sin_port;
*len = 28;
return stun_pkt;
}
bool UdpSorter::isStunPacket(void *data, int size)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::isStunPacket() ?";
std::cerr << std::endl;
#endif
if (size < 20)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::isStunPacket() (size < 20) -> false";
std::cerr << std::endl;
#endif
return false;
}
/* match size field */
uint16_t pktsize = ((uint16_t *) data)[1];
if (size != pktsize)
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::isStunPacket() (size != pktsize) -> false";
std::cerr << std::endl;
#endif
return false;
}
if ((size == 20) && (0x0001 == ((uint16_t *) data)[0]))
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::isStunPacket() (size=20 & data[0]=0x0001) -> true";
std::cerr << std::endl;
#endif
/* request */
return true;
}
if ((size == 28) && (0x0101 == ((uint16_t *) data)[0]))
{
#ifdef DEBUG_UDP_SORTER
std::cerr << "UdpSorter::isStunPacket() (size=28 & data[0]=0x0101) -> true";
std::cerr << std::endl;
#endif
/* response */
return true;
}
return false;
}

View File

@ -0,0 +1,102 @@
/*
* libretroshare/src/tcponudp: udpsorter.h
*
* TCP-on-UDP (tou) network interface for RetroShare.
*
* Copyright 2007-2008 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef TOU_UDP_SORTER_H
#define TOU_UDP_SORTER_H
/* universal networking functions */
#include "tou_net.h"
#include <iosfwd>
#include <map>
#include "udplayer.h"
/* UdpSorter ..... filters the UDP packets.
*/
class UdpPeer
{
public:
virtual void recvPkt(void *data, int size) = 0;
};
class UdpSorter: public UdpReceiver
{
public:
UdpSorter(struct sockaddr_in &local);
virtual ~UdpSorter() { return; }
/* add a TCPonUDP stream */
int addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr);
bool addStunPeer(const struct sockaddr_in &remote, const char *peerid);
bool externalAddr(struct sockaddr_in &remote);
/* Packet IO */
/* pass-through send packets */
int sendPkt(void *data, int size, struct sockaddr_in &to, int ttl);
/* callback for recved data (overloaded from UdpReceiver) */
virtual void recvPkt(void *data, int size, struct sockaddr_in &from);
int status(std::ostream &out);
/* setup connections */
int openSocket();
/* monitoring / updates */
int okay();
int tick();
int close();
private:
/* STUN handling */
bool isStunPacket(void *data, int size);
bool handleStunPkt(void *data, int size, struct sockaddr_in &from);
int doStun(struct sockaddr_in stun_addr);
bool response(void *stun_pkt, int size, struct sockaddr_in &addr);
void *generate_stun_reply(struct sockaddr_in *stun_addr, int *len);
bool generate_stun_pkt(void *stun_pkt, int *len);
UdpLayer *udpLayer;
RsMutex sortMtx; /* for all class data (below) */
struct sockaddr_in laddr; /* local addr */
struct sockaddr_in eaddr; /* external addr */
bool eaddrKnown;
std::list<struct sockaddr_in> stunPeers; /* potentials */
std::map<struct sockaddr_in, UdpPeer *> streams;
};
#endif

View File

@ -0,0 +1,48 @@
/*
* libretroshare/src/tcponudp: udptestfn.cc
*
* TCP-on-UDP (tou) network interface for RetroShare.
*
* Copyright 2007-2008 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "udptestfn.h"
void UdpRecvTest::recvPkt(void *data, int size, struct sockaddr_in &from)
{
/* print packet information */
std::cerr << "UdpRecvTest::recvPkt(" << size << ") from: " << from;
std::cerr << std::endl;
}
UdpPeerTest::UdpPeerTest(struct sockaddr_in &addr)
:raddr(addr)
{
return;
}
void UdpPeerTest::recvPkt(void *data, int size)
{
/* print packet information */
std::cerr << "UdpPeerTest::recvPkt(" << size << ") from: " << raddr;
std::cerr << std::endl;
}

View File

@ -0,0 +1,49 @@
/*
* libretroshare/src/tcponudp: udptestfn.h
*
* TCP-on-UDP (tou) network interface for RetroShare.
*
* Copyright 2007-2008 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "udplayer.h"
#include "udpsorter.h"
#ifndef TOU_UDP_TEST_FN_H
#define TOU_UDP_TEST_FN_H
class UdpRecvTest: public UdpReceiver
{
public:
virtual void recvPkt(void *data, int size, struct sockaddr_in &from);
};
class UdpPeerTest: public UdpPeer
{
public:
UdpPeerTest(struct sockaddr_in &addr);
virtual void recvPkt(void *data, int size);
struct sockaddr_in raddr;
};
#endif