From de0ce110b9acd1a896b347154cc83f2282907315 Mon Sep 17 00:00:00 2001 From: drbob Date: Fri, 25 Jan 2008 06:11:39 +0000 Subject: [PATCH] The first commit of the new UDP Connection methods and the rewrite of the retroshare core networking stack. This check-in commits the changes to the TCPonUCP code. This library has been significantly modified to support multiple UDP "connections" from a single port. This requires some trickery and a listener thread. Code to "STUN" peers was also added. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@305 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/tcponudp/Makefile | 28 +- libretroshare/src/tcponudp/tcpstream.cc | 590 +++++++++++++++------ libretroshare/src/tcponudp/tcpstream.h | 74 ++- libretroshare/src/tcponudp/tou.cc | 301 +++-------- libretroshare/src/tcponudp/tou.h | 63 ++- libretroshare/src/tcponudp/udp_server.cc | 33 +- libretroshare/src/tcponudp/udplayer.cc | 293 +++++----- libretroshare/src/tcponudp/udplayer.h | 64 +-- libretroshare/src/tcponudp/udpsock_test.cc | 91 ++++ libretroshare/src/tcponudp/udpsort_test.cc | 91 ++++ libretroshare/src/tcponudp/udpsorter.cc | 441 +++++++++++++++ libretroshare/src/tcponudp/udpsorter.h | 102 ++++ libretroshare/src/tcponudp/udptestfn.cc | 48 ++ libretroshare/src/tcponudp/udptestfn.h | 49 ++ 14 files changed, 1610 insertions(+), 658 deletions(-) create mode 100644 libretroshare/src/tcponudp/udpsock_test.cc create mode 100644 libretroshare/src/tcponudp/udpsort_test.cc create mode 100644 libretroshare/src/tcponudp/udpsorter.cc create mode 100644 libretroshare/src/tcponudp/udpsorter.h create mode 100644 libretroshare/src/tcponudp/udptestfn.cc create mode 100644 libretroshare/src/tcponudp/udptestfn.h diff --git a/libretroshare/src/tcponudp/Makefile b/libretroshare/src/tcponudp/Makefile index 6f6a63cf8..219548d33 100644 --- a/libretroshare/src/tcponudp/Makefile +++ b/libretroshare/src/tcponudp/Makefile @@ -3,23 +3,37 @@ RS_TOP_DIR=.. include $(RS_TOP_DIR)/make.opt -EXECS = librs udp_server test_tou pair_tou reset_tou internal_tou largefile_tou +EXECS = librs udpsock_test udpsort_test udp_server +#test_tou pair_tou reset_tou internal_tou largefile_tou -OBJ = tcpstream.o tcppacket.o udplayer.o tou_net.o tou.o +OBJ = tou_net.o udplayer.o udpsorter.o udptestfn.o +OBJ += tcppacket.o tcpstream.o tou.o + +#tou.o all : $(OBJ) $(EXECS) .cc.o: $(CC) $(CFLAGS) -c $< -udp_server: $(OBJ) udp_server.o - $(CC) $(CFLAGS) -o udp_server $(OBJ) udp_server.o $(LIBS) - clean: - -$(RM) $(OBJ) $(BIOOBJ) test_tou.o pair_tou.o udp_server.o reset_tou.o internal_tou.o largefile_tou.o + -$(RM) $(OBJ) $(BIOOBJ) udpsock_test.o udpsort_test.o udp_server.o + +#test_tou.o pair_tou.o udp_server.o reset_tou.o internal_tou.o largefile_tou.o clobber: clean - -$(RM) udp_server test_tou pair_tou reset_tou internal_tou largefile_tou libtou.so ../lib/libtou.a + -$(RM) udpsock_test udpsort_test udp_server + +#test_tou pair_tou reset_tou internal_tou largefile_tou libtou.so ../lib/libtou.a + +udpsock_test : $(OBJ) udpsock_test.o + $(CC) $(CFLAGS) -o udpsock_test $(OBJ) udpsock_test.o -lpthread -L../lib -lretroshare + +udpsort_test : $(OBJ) udpsort_test.o + $(CC) $(CFLAGS) -o udpsort_test $(OBJ) udpsort_test.o -lpthread -L../lib -lretroshare + +udp_server: $(OBJ) udp_server.o + $(CC) $(CFLAGS) -o udp_server $(OBJ) udp_server.o -lpthread -L../lib -lretroshare test_tou : $(OBJ) test_tou.o $(CC) $(CFLAGS) -o test_tou $(OBJ) test_tou.o $(LIBS) diff --git a/libretroshare/src/tcponudp/tcpstream.cc b/libretroshare/src/tcponudp/tcpstream.cc index a2d4a53b7..16eadfdc8 100644 --- a/libretroshare/src/tcponudp/tcpstream.cc +++ b/libretroshare/src/tcponudp/tcpstream.cc @@ -39,6 +39,7 @@ /* * #define DEBUG_TCP_STREAM 1 */ +#define DEBUG_TCP_STREAM 1 /* *#define DEBUG_TCP_STREAM_EXTRA 1 @@ -64,8 +65,7 @@ static const double RTT_ALPHA = 0.875; // platform independent fractional timestamp. static double getCurrentTS(); - -TcpStream::TcpStream(UdpLayer *lyr) +TcpStream::TcpStream(UdpSorter *lyr) :inSize(0), outSizeRead(0), outSizeNet(0), state(TCP_CLOSED), inStreamActive(false), @@ -84,19 +84,25 @@ TcpStream::TcpStream(UdpLayer *lyr) congestThreshold(TCP_MAX_WIN), congestWinSize(MAX_SEG), congestUpdate(0), + peerKnown(false), udp(lyr) { return; } /* Stream Control! */ -int TcpStream::connect() +int TcpStream::connect(const struct sockaddr_in &raddr) { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + setRemoteAddress(raddr); + /* check state */ if (state != TCP_CLOSED) { if (state == TCP_ESTABLISHED) { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return 0; } else if (state < TCP_ESTABLISHED) @@ -108,10 +114,10 @@ int TcpStream::connect() // major issues! errorState = EFAULT; } + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return -1; } - /* setup Seqnos */ outSeqno = genSequenceNo(); initOurSeqno = outSeqno; @@ -140,25 +146,68 @@ int TcpStream::connect() * This should help the firewalls along. */ - udp -> setTTL(1); + setTTL(1); toSend(pkt); /* change state */ state = TCP_SYN_SENT; std::cerr << "TcpStream STATE -> TCP_SYN_SENT" << std::endl; errorState = EAGAIN; + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return -1; } + +int TcpStream::listenfor(const struct sockaddr_in &raddr) +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + setRemoteAddress(raddr); + + /* check state */ + if (state != TCP_CLOSED) + { + if (state == TCP_ESTABLISHED) + { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return 0; + } + else if (state < TCP_ESTABLISHED) + { + errorState = EAGAIN; + } + else + { + // major issues! + errorState = EFAULT; + } + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return -1; + } + + errorState = EAGAIN; + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return -1; +} + + /* Stream Control! */ int TcpStream::close() { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + cleanup(); + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return 0; } int TcpStream::closeWrite() { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + /* check state */ /* will always close socket.... */ /* if in TCP_ESTABLISHED.... @@ -206,6 +255,7 @@ int TcpStream::closeWrite() #ifdef DEBUG_TCP_STREAM std::cerr << "TcpStream::close() Flag Set" << std::endl; #endif + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return 0; } @@ -213,67 +263,28 @@ int TcpStream::closeWrite() std::cerr << "TcpStream::close() pending" << std::endl; #endif errorState = EAGAIN; + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return -1; } -int TcpStream::cleanup() -{ - // This shuts it all down! no matter what. - - outStreamActive = false; - inStreamActive = false; - state = TCP_CLOSED; - std::cerr << "TcpStream STATE -> TCP_CLOSED" << std::endl; - - /* Ensure that TTL -> STD for further STUN ATTEMPTS */ - if (udp) - { - udp -> setTTL(TCP_STD_TTL); - } - - // clear arrays. - inSize = 0; - while(inQueue.size() > 0) - { - dataBuffer *db = inQueue.front(); - inQueue.pop_front(); - delete db; - } - - while(outPkt.size() > 0) - { - TcpPacket *pkt = outPkt.front(); - outPkt.pop_front(); - delete pkt; - } - - - // clear arrays. - outSizeRead = 0; - outSizeNet = 0; - while(outQueue.size() > 0) - { - dataBuffer *db = outQueue.front(); - outQueue.pop_front(); - delete db; - } - - while(inPkt.size() > 0) - { - TcpPacket *pkt = inPkt.front(); - inPkt.pop_front(); - delete pkt; - } - return 1; -} - bool TcpStream::isConnected() { - return (state == TCP_ESTABLISHED); + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + bool isConn = (state == TCP_ESTABLISHED); + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return isConn; } int TcpStream::status(std::ostream &out) { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + int tmpstate = state; + // can leave the timestamp here as time()... rough but okay. out << "TcpStream::status @ (" << time(NULL) << ")" << std::endl; out << "TcpStream::state = " << (int) state << std::endl; @@ -295,53 +306,78 @@ int TcpStream::status(std::ostream &out) out << " winsize: " << inWinSize; out << std::endl; out << std::endl; - return state; + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return tmpstate; } int TcpStream::write_allowed() { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + int ret = 1; if (state == TCP_CLOSED) { errorState = EBADF; - return -1; + ret = -1; } else if (state < TCP_ESTABLISHED) { errorState = EAGAIN; - return -1; + ret = -1; } else if (!outStreamActive) { errorState = EBADF; - return -1; + ret = -1; + } + + if (ret < 1) + { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return ret; } int maxwrite = (kMaxQueueSize - inQueue.size()) * MAX_SEG; + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return maxwrite; } - int TcpStream::read_pending() { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + /* error should be detected next time */ - int maxread = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet; + int maxread = int_read_pending(); if (state == TCP_CLOSED) { errorState = EBADF; - return -1; + maxread = -1; } else if (state < TCP_ESTABLISHED) { errorState = EAGAIN; - return -1; + maxread = -1; } + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return maxread; } +/* INTERNAL */ +int TcpStream::int_read_pending() +{ + return outSizeRead + outQueue.size() * MAX_SEG + outSizeNet; +} + /* stream Interface */ int TcpStream::write(char *dta, int size) /* write -> pkt -> net */ { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + int ret = 1; /* initial error checking */ #ifdef DEBUG_TCP_STREAM_EXTRA static uint32 TMPtotalwrite = 0; @@ -353,7 +389,7 @@ static uint32 TMPtotalwrite = 0; std::cerr << "TcpStream::write() Error TCP_CLOSED" << std::endl; #endif errorState = EBADF; - return -1; + ret = -1; } else if (state < TCP_ESTABLISHED) { @@ -361,7 +397,7 @@ static uint32 TMPtotalwrite = 0; std::cerr << "TcpStream::write() Error TCP Not Established" << std::endl; #endif errorState = EAGAIN; - return -1; + ret = -1; } else if (inQueue.size() > kMaxQueueSize) { @@ -369,7 +405,7 @@ static uint32 TMPtotalwrite = 0; std::cerr << "TcpStream::write() Error EAGAIN" << std::endl; #endif errorState = EAGAIN; - return -1; + ret = -1; } else if (!outStreamActive) { @@ -377,8 +413,15 @@ static uint32 TMPtotalwrite = 0; std::cerr << "TcpStream::write() Error TCP_CLOSED" << std::endl; #endif errorState = EBADF; - return -1; + ret = -1; } + + if (ret < 1) /* check for initial error */ + { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return ret; + } + #ifdef DEBUG_TCP_STREAM_EXTRA std::cerr << "TcpStream::write() = Will Succeed " << size << std::endl; @@ -404,6 +447,8 @@ static uint32 TMPtotalwrite = 0; inSize += size; //std::cerr << "Small Packet - write to net:" << std::endl; //std::cerr << printPkt(dta, size) << std::endl; + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return size; } @@ -459,7 +504,6 @@ static uint32 TMPtotalwrite = 0; #endif memcpy((void *) inData, (void *) &(dta[size-remSize]), remSize); inSize = remSize; - return size; } else { @@ -470,11 +514,14 @@ static uint32 TMPtotalwrite = 0; inSize = 0; } + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return size; } int TcpStream::read(char *dta, int size) /* net -> pkt -> read */ { + tcpMtx.lock(); /********** LOCK MUTEX *********/ + #ifdef DEBUG_TCP_STREAM_EXTRA static uint32 TMPtotalread = 0; #endif @@ -483,28 +530,34 @@ static uint32 TMPtotalread = 0; */ int maxread = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet; + int ret = 1; /* used only for initial errors */ if (state == TCP_CLOSED) { errorState = EBADF; - return -1; + ret = -1; } else if (state < TCP_ESTABLISHED) { errorState = EAGAIN; - return -1; + ret = -1; } else if ((!inStreamActive) && (maxread == 0)) { // finished stream. - return 0; + ret = 0; } - - if (maxread == 0) + else if (maxread == 0) { /* must wait for more data */ errorState = EAGAIN; - return -1; + ret = -1; + } + + if (ret < 1) /* if ret has been changed */ + { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return ret; } if (maxread < size) @@ -520,6 +573,7 @@ static uint32 TMPtotalread = 0; std::cerr << std::endl; #endif errorState = EAGAIN; + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return -1; } #endif /* TCP_NO_PARTIAL_READ */ @@ -555,6 +609,7 @@ static uint32 TMPtotalread = 0; /* can allow more in! - update inWinSize */ UpdateInWinSize(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return size; } @@ -618,6 +673,7 @@ static uint32 TMPtotalread = 0; /* can allow more in! - update inWinSize */ UpdateInWinSize(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return size; } #ifdef DEBUG_TCP_STREAM_EXTRA @@ -672,6 +728,7 @@ static uint32 TMPtotalread = 0; UpdateInWinSize(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return size; } @@ -689,23 +746,32 @@ static uint32 TMPtotalread = 0; /* can allow more in! - update inWinSize */ UpdateInWinSize(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ return size; } -int TcpStream::recv() + /* Callback from lower Layers */ +void TcpStream::recvPkt(void *data, int size) { - int maxsize = MAX_SEG + TCP_PSEUDO_HDR_SIZE; - int size = maxsize; - uint8 input[maxsize]; - double cts = getCurrentTS(); // fractional seconds. +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::recvPkt()"; + std::cerr << std::endl; +#endif + tcpMtx.lock(); /********** LOCK MUTEX *********/ + uint8 *input = (uint8 *) data; + +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::recvPkt() Past Lock!"; + std::cerr << std::endl; +#endif #ifdef DEBUG_TCP_STREAM if (state > TCP_SYN_RCVD) { int availRead = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet; - std::cerr << "TcpStream::recv() CC: "; + std::cerr << "TcpStream::recvPkt() CC: "; std::cerr << " iWS: " << inWinSize; std::cerr << " aRead: " << availRead; std::cerr << " iAck: " << inAckno; @@ -718,29 +784,182 @@ int TcpStream::recv() } #endif - - for(;0 < udp -> readPkt(input, &size); size = maxsize) +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::recv() ReadPkt(" << size << ")" << std::endl; + //std::cerr << printPkt(input, size); + //std::cerr << std::endl; +#endif + TcpPacket *pkt = new TcpPacket(); + if (0 < pkt -> readPacket(input, size)) + { + lastIncomingPkt = getCurrentTS(); + handleIncoming(pkt); + } + else { #ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::recv() ReadPkt(" << size << ")" << std::endl; - //std::cerr << printPkt(input, size); - //std::cerr << std::endl; + std::cerr << "TcpStream::recv() Bad Packet Deleting!"; + std::cerr << std::endl; #endif - TcpPacket *pkt = new TcpPacket(); - if (0 < pkt -> readPacket(input, size)) - { - lastIncomingPkt = cts; - handleIncoming(pkt); - } - else - { -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::recv() Bad Packet Deleting!"; - std::cerr << std::endl; -#endif - delete pkt; - } + delete pkt; } + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return; +} + + +int TcpStream::tick() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + //std::cerr << "TcpStream::tick()" << std::endl; + recv_check(); /* recv is async */ + send(); + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return 1; +} + +bool TcpStream::getRemoteAddress(struct sockaddr_in &raddr) +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + if (peerKnown) + { + raddr = peeraddr; + } + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return peerKnown; +} + +uint8 TcpStream::TcpState() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + uint8 err = state; + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return err; +} + +int TcpStream::TcpErrorState() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + int err = errorState; + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return err; +} + + + +/********************* SOME EXPOSED DEBUGGING FNS ******************/ + +static int ilevel = 100; + +bool TcpStream::widle() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + /* init */ + if (!lastWriteTF) + { + lastWriteTF = int_wbytes(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return false; + } + + if ((lastWriteTF == int_wbytes()) && (inSize + inQueue.size() == 0)) + { + wcount++; + if (wcount > ilevel) + { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return true; + } + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return false; + } + wcount = 0; + lastWriteTF = int_wbytes(); + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return false; +} + + +bool TcpStream::ridle() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + /* init */ + if (!lastReadTF) + { + lastReadTF = int_rbytes(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return false; + } + + if ((lastReadTF == int_rbytes()) && (outSizeRead + outQueue.size() + outSizeNet== 0)) + { + rcount++; + if (rcount > ilevel) + { + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return true; + } + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return false; + } + rcount = 0; + lastReadTF = int_rbytes(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return false; +} + +uint32 TcpStream::wbytes() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + uint32 wb = int_wbytes(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return wb; +} + +uint32 TcpStream::rbytes() +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + uint32 rb = int_rbytes(); + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + return rb; +} + +/********************* ALL BELOW HERE IS INTERNAL ****************** + ******************* AND ALWAYS PROTECTED BY A MUTEX ***************/ + +int TcpStream::recv_check() +{ + double cts = getCurrentTS(); // fractional seconds. + +#ifdef DEBUG_TCP_STREAM + if (state > TCP_SYN_RCVD) + { + int availRead = outSizeRead + outQueue.size() * MAX_SEG + outSizeNet; + std::cerr << "TcpStream::recv_check() CC: "; + std::cerr << " iWS: " << inWinSize; + std::cerr << " aRead: " << availRead; + std::cerr << " iAck: " << inAckno; + std::cerr << std::endl; + } + else + { + std::cerr << "TcpStream::recv_check() Not Connected"; + std::cerr << std::endl; + } +#endif // make sure we've rcvd something! if ((state > TCP_SYN_RCVD) && @@ -761,6 +980,56 @@ int TcpStream::recv() return 1; } +int TcpStream::cleanup() +{ + // This shuts it all down! no matter what. + + outStreamActive = false; + inStreamActive = false; + state = TCP_CLOSED; + std::cerr << "TcpStream STATE -> TCP_CLOSED" << std::endl; + + //peerKnown = false; //??? NOT SURE -> for a rapid reconnetion this might be key?? + + /* reset TTL */ + setTTL(TCP_STD_TTL); + + // clear arrays. + inSize = 0; + while(inQueue.size() > 0) + { + dataBuffer *db = inQueue.front(); + inQueue.pop_front(); + delete db; + } + + while(outPkt.size() > 0) + { + TcpPacket *pkt = outPkt.front(); + outPkt.pop_front(); + delete pkt; + } + + + // clear arrays. + outSizeRead = 0; + outSizeNet = 0; + while(outQueue.size() > 0) + { + dataBuffer *db = outQueue.front(); + outQueue.pop_front(); + delete db; + } + + while(inPkt.size() > 0) + { + TcpPacket *pkt = inPkt.front(); + inPkt.pop_front(); + delete pkt; + } + return 1; +} + int TcpStream::handleIncoming(TcpPacket *pkt) { #ifdef DEBUG_TCP_STREAM @@ -925,7 +1194,7 @@ int TcpStream::incoming_Closed(TcpPacket *pkt) /* seq + winsize set in toSend() */ /* as we have received something ... we can up the TTL */ - udp -> setTTL(TCP_STD_TTL); + setTTL(TCP_STD_TTL); #ifdef DEBUG_TCP_STREAM std::cerr << "TcpStream::incoming_Closed() Sending reply" << std::endl; @@ -984,7 +1253,7 @@ int TcpStream::incoming_SynSent(TcpPacket *pkt) * As they have sent something, and we have received * through the firewall, set to STD. */ - udp -> setTTL(TCP_STD_TTL); + setTTL(TCP_STD_TTL); /* ack the Syn Packet */ sendAck(); @@ -1065,7 +1334,7 @@ int TcpStream::incoming_SynRcvd(TcpPacket *pkt) /* As they have sent something, and we have received * through the firewall, set to STD. */ - udp -> setTTL(TCP_STD_TTL); + setTTL(TCP_STD_TTL); /* change state */ state = TCP_ESTABLISHED; @@ -1361,7 +1630,7 @@ int TcpStream::UpdateInWinSize() * */ - uint32 queuedData = read_pending(); + uint32 queuedData = int_read_pending(); if (queuedData < maxWinSize) { inWinSize = maxWinSize; @@ -1390,12 +1659,26 @@ int TcpStream::sendAck() return toSend(new TcpPacket(), false); } +void TcpStream::setRemoteAddress(const struct sockaddr_in &raddr) +{ + peeraddr = raddr; + peerKnown = true; +} + int TcpStream::toSend(TcpPacket *pkt, bool retrans) { int outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE; char tmpOutPkt[outPktSize]; + if (!peerKnown) + { + /* Major Error! */ + std::cerr << "TcpStream::toSend() peerUnknown ERROR!!!"; + std::cerr << std::endl; + exit(1); + } + /* get accurate timestamp */ double cts = getCurrentTS(); @@ -1446,7 +1729,7 @@ int TcpStream::toSend(TcpPacket *pkt, bool retrans) //std::cerr << printPkt(tmpOutPkt, outPktSize) << std::endl; #endif - udp -> sendPkt(tmpOutPkt, outPktSize); + udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl); if (retrans) { @@ -1475,6 +1758,14 @@ int TcpStream::retrans() int outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE; char tmpOutPkt[outPktSize]; bool updateCongestion = true; + + if (!peerKnown) + { + /* Major Error! */ + std::cerr << "TcpStream::retrans() peerUnknown ERROR!!!"; + std::cerr << std::endl; + exit(1); + } /* now retrans */ double cts = getCurrentTS(); @@ -1571,9 +1862,9 @@ int TcpStream::retrans() * we should increase the ttl. */ - if ((pkt->hasSyn()) && (udp -> getTTL() < TCP_STD_TTL)) + if ((pkt->hasSyn()) && (getTTL() < TCP_STD_TTL)) { - udp -> setTTL(1 + pkt->retrans / + setTTL(1 + pkt->retrans / TCP_STARTUP_COUNT_PER_TTL); std::cerr << "TcpStream::retrans() Startup SYNs"; @@ -1609,7 +1900,7 @@ int TcpStream::retrans() } - udp -> sendPkt(tmpOutPkt, outPktSize); + udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl); /* restart timers */ (*it) -> ts = cts; @@ -1770,8 +2061,6 @@ void TcpStream::acknowledge() } -/* somehow managed to delete everything.... second time lucky */ - int TcpStream::send() { /* handle network interface always */ @@ -1945,23 +2234,24 @@ int TcpStream::send() { sendAck(); } +#ifdef DEBUG_TCP_STREAM_EXTRA + else + { + std::cerr << "TcpStream::send() No Ack"; + std::cerr << std::endl; + } +#endif } +#ifdef DEBUG_TCP_STREAM_EXTRA + else + { + std::cerr << "TcpStream::send() Stuff Sent"; + std::cerr << std::endl; + } +#endif return 1; } - - - - - - -int TcpStream::tick() -{ - //std::cerr << "TcpStream::tick()" << std::endl; - recv(); - send(); - return 1; -} uint32 TcpStream::genSequenceNo() { @@ -2018,65 +2308,19 @@ static double getCurrentTS() - - -static int ilevel = 100; - -bool TcpStream::widle() -{ - /* init */ - if (!lastWriteTF) - { - lastWriteTF = wbytes(); - return false; - } - - if ((lastWriteTF == wbytes()) && (inSize + inQueue.size() == 0)) - { - wcount++; - if (wcount > ilevel) - return true; - return false; - } - wcount = 0; - lastWriteTF = wbytes(); - return false; -} - - -bool TcpStream::ridle() -{ - /* init */ - if (!lastReadTF) - { - lastReadTF = rbytes(); - return false; - } - - if ((lastReadTF == rbytes()) && (outSizeRead + outQueue.size() + outSizeNet== 0)) - { - rcount++; - if (rcount > ilevel) - return true; - return false; - } - rcount = 0; - lastReadTF = rbytes(); - return false; -} - -uint32 TcpStream::wbytes() +uint32 TcpStream::int_wbytes() { return outSeqno - initOurSeqno - 1; } -uint32 TcpStream::rbytes() +uint32 TcpStream::int_rbytes() { return inAckno - initPeerSeqno - 1; } + /********* Special debugging stuff *****/ #ifdef DEBUG_TCP_STREAM_EXTRA diff --git a/libretroshare/src/tcponudp/tcpstream.h b/libretroshare/src/tcponudp/tcpstream.h index 4463f3fec..0ae0d4a04 100644 --- a/libretroshare/src/tcponudp/tcpstream.h +++ b/libretroshare/src/tcponudp/tcpstream.h @@ -36,7 +36,7 @@ */ #include "tcppacket.h" -#include "udplayer.h" +#include "udpsorter.h" #define MAX_SEG 1500 #define TCP_MAX_SEQ UINT_MAX @@ -68,25 +68,23 @@ class dataBuffer #include -class TcpStream +class TcpStream: public UdpPeer { public: + /* Top-Level exposed */ - TcpStream(UdpLayer *lyr); + TcpStream(UdpSorter *lyr); /* user interface */ int status(std::ostream &out); -int connect(); +int connect(const struct sockaddr_in &raddr); +int listenfor(const struct sockaddr_in &raddr); bool isConnected(); -bool widle(); /* write idle */ -bool ridle(); /* read idle */ -uint32 wbytes(); -uint32 rbytes(); - - /* network interface (unreliable) */ -int receivePkt(void *dta, int size); -int sendPkt(void *dta, int *size); + /* get tcp information */ +bool getRemoteAddress(struct sockaddr_in &raddr); +uint8 TcpState(); +int TcpErrorState(); /* stream Interface */ int write(char *dta, int size); /* write -> pkt -> net */ @@ -100,12 +98,34 @@ int closeWrite(); /* non-standard, but for clean exit */ int close(); /* standard unix behaviour */ int tick(); /* check iface etc */ - /* internal */ + + /* Callback Funcion from UDP Layers */ +virtual void recvPkt(void *data, int size); /* overloaded */ + + + + /* Exposed Data Counting */ +bool widle(); /* write idle */ +bool ridle(); /* read idle */ +uint32 wbytes(); +uint32 rbytes(); + + private: + + /* Internal Functions - use the Mutex (not reentrant) */ + /* Internal Functions - that don't need mutex protection */ + +uint32 genSequenceNo(); +bool isOldSequence(uint32 tst, uint32 curr); + + RsMutex tcpMtx; + + /* Internal Functions - only called inside mutex protection */ int cleanup(); /* incoming data */ -int recv(); +int recv_check(); int handleIncoming(TcpPacket *pkt); int incoming_Closed(TcpPacket *pkt); int incoming_SynSent(TcpPacket *pkt); @@ -117,22 +137,26 @@ int incoming_TimedWait(TcpPacket *pkt); int incoming_Closing(TcpPacket *pkt); int incoming_CloseWait(TcpPacket *pkt); int incoming_LastAck(TcpPacket *pkt); - int check_InPkts(); int UpdateInWinSize(); +int int_read_pending(); /* outgoing data */ +int send(); int toSend(TcpPacket *pkt, bool retrans = true); void acknowledge(); -void calcWinSize(); -int send(); int retrans(); - int sendAck(); +void setRemoteAddress(const struct sockaddr_in &raddr); +int getTTL() { return ttl; } +void setTTL(int t) { ttl = t; } -uint32 genSequenceNo(); -bool isOldSequence(uint32 tst, uint32 curr); +/* data counting */ +uint32 int_wbytes(); +uint32 int_rbytes(); + + /* Internal Data - must have mutex to access! */ /* data (in -> pkts) && (pkts -> out) */ @@ -195,9 +219,15 @@ bool isOldSequence(uint32 tst, uint32 curr); uint32 congestWinSize; uint32 congestUpdate; + /* existing TTL for this stream (tweaked at startup) */ + int ttl; + + struct sockaddr_in peeraddr; + bool peerKnown; + + /* UdpSorter (has own Mutex!) */ + UdpSorter *udp; - /* UdpLayer */ - UdpLayer *udp; }; diff --git a/libretroshare/src/tcponudp/tou.cc b/libretroshare/src/tcponudp/tou.cc index 278c0460f..78c0eb28c 100644 --- a/libretroshare/src/tcponudp/tou.cc +++ b/libretroshare/src/tcponudp/tou.cc @@ -43,10 +43,7 @@ struct TcpOnUdp_t { int tou_fd; int lasterrno; - UdpLayer *udp; TcpStream *tcp; - bool know_eaddr; - struct sockaddr_in extaddr; bool idle; }; @@ -55,32 +52,66 @@ typedef struct TcpOnUdp_t TcpOnUdp; static std::vector tou_streams; static int tou_inited = 0; +static UdpSorter *udps = NULL; static int tou_tick_all(); -static int tou_init() + +/* tou_init - opens the udp port (universal bind) */ +int tou_init(const struct sockaddr *my_addr, socklen_t addrlen) { if (tou_inited) return 1; tou_streams.resize(kInitStreamTable); + udps = new UdpSorter( *((struct sockaddr_in *) my_addr)); + + /* check the bind succeeded */ + if (!(udps->okay())) + { + delete (udps); + udps = NULL; + return -1; + } + tou_inited = 1; return 1; } +/* tou_stunpeer supply tou with stun peers. */ +int tou_stunpeer(const struct sockaddr *my_addr, socklen_t addrlen, + const char *id) +{ + if (!tou_inited) + return -1; + + udps->addStunPeer(*(struct sockaddr_in *) my_addr, id); + return 0; +} + +int tou_extaddr(struct sockaddr *ext_addr, socklen_t *addrlen) +{ + if (!tou_inited) + return -1; + + return udps->externalAddr(*(struct sockaddr_in *) ext_addr); +} + /* open - which does nothing */ int tou_socket(int /*domain*/, int /*type*/, int /*protocol*/) { - tou_init(); + if (!tou_inited) + { + return -1; + } + for(unsigned int i = 1; i < tou_streams.size(); i++) { if (tou_streams[i] == NULL) { tou_streams[i] = new TcpOnUdp(); tou_streams[i] -> tou_fd = i; - tou_streams[i] -> know_eaddr = false; - tou_streams[i] -> udp = NULL; tou_streams[i] -> tcp = NULL; return i; } @@ -93,8 +124,6 @@ int tou_socket(int /*domain*/, int /*type*/, int /*protocol*/) if (tou == tou_streams[tou_streams.size() -1]) { tou -> tou_fd = tou_streams.size() -1; - tou -> know_eaddr = false; - tou -> udp = NULL; tou -> tcp = NULL; return tou->tou_fd; } @@ -109,10 +138,6 @@ int tou_socket(int /*domain*/, int /*type*/, int /*protocol*/) #endif } - - - - /* bind - opens the udp port */ int tou_bind(int sockfd, const struct sockaddr *my_addr, socklen_t addrlen) @@ -123,45 +148,9 @@ int tou_bind(int sockfd, const struct sockaddr *my_addr, } TcpOnUdp *tous = tou_streams[sockfd]; - if (tous->udp) - { - tous -> lasterrno = EADDRINUSE; - return -1; - } - - if (tous->tcp) - { - tous -> lasterrno = EADDRINUSE; - return -1; - } - - if (addrlen != sizeof(struct sockaddr_in)) - { - tous -> lasterrno = EADDRNOTAVAIL; - return -1; - } - - /* - * tous->udp = new UdpLayer( *((struct sockaddr_in *) my_addr)); - */ - // for testing - drop 5% of packets... */ - //tous->udp = new LossyUdpLayer( *((struct sockaddr_in *) my_addr), 0.05); - tous->udp = new UdpLayer( *((struct sockaddr_in *) my_addr)); - /* check the bind succeeded */ - if (!(tous->udp->okay())) - { - delete (tous->udp); - tous->udp = NULL; - tous -> lasterrno = EADDRINUSE; - return -1; - } - - tous->tcp = new TcpStream(tous->udp); - - tous->tcp->tick(); - tou_tick_all(); - - return 0; + /* this now always returns an error! */ + tous -> lasterrno = EADDRINUSE; + return -1; } /* records peers address, and sends syn pkt @@ -188,12 +177,15 @@ int tou_connect(int sockfd, const struct sockaddr *serv_addr, return -1; } - if (tous->tcp->state == 0) + /* create a TCP stream to connect with. */ + if (!tous->tcp) { - tous->udp->setRemoteAddr(*((struct sockaddr_in *) serv_addr)); + tous->tcp = new TcpStream(udps); + udps->addUdpPeer(tous->tcp, + *((const struct sockaddr_in *) serv_addr)); } - tous->tcp->connect(); + tous->tcp->connect(*(const struct sockaddr_in *) serv_addr); tous->tcp->tick(); tou_tick_all(); if (tous->tcp->isConnected()) @@ -205,6 +197,36 @@ int tou_connect(int sockfd, const struct sockaddr *serv_addr, return -1; } +int tou_listenfor(int sockfd, const struct sockaddr *serv_addr, + socklen_t addrlen) +{ + if (tou_streams[sockfd] == NULL) + { + return -1; + } + TcpOnUdp *tous = tou_streams[sockfd]; + + if (addrlen != sizeof(struct sockaddr_in)) + { + tous -> lasterrno = EINVAL; + return -1; + } + + /* create a TCP stream to connect with. */ + if (!tous->tcp) + { + tous->tcp = new TcpStream(udps); + udps->addUdpPeer(tous->tcp, + *((const struct sockaddr_in *) serv_addr)); + } + + tous->tcp->listenfor(*((struct sockaddr_in *) serv_addr)); + tous->tcp->tick(); + tou_tick_all(); + + return 0; +} + int tou_listen(int sockfd, int backlog) { tou_tick_all(); @@ -233,7 +255,7 @@ int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) if (tous->tcp->isConnected()) { // should get remote address - tous->udp->getRemoteAddr(*((struct sockaddr_in *) addr)); + tous->tcp->getRemoteAddress(*((struct sockaddr_in *) addr)); return sockfd; } @@ -241,154 +263,6 @@ int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen) return -1; } -int tou_listenfor(int sockfd, const struct sockaddr *serv_addr, - socklen_t addrlen) -{ - if (tou_streams[sockfd] == NULL) - { - return -1; - } - TcpOnUdp *tous = tou_streams[sockfd]; - - if (addrlen != sizeof(struct sockaddr_in)) - { - tous -> lasterrno = EINVAL; - return -1; - } - - if (tous->tcp->state == 0) - { - tous->udp->setRemoteAddr(*((struct sockaddr_in *) serv_addr)); - } - - tous->tcp->tick(); - tou_tick_all(); - - return 0; -} - -/* This is a udp socket - after all - * independent operation from all of the - * stream stuff. - */ -int tou_extudp(const struct sockaddr *ext, socklen_t tolen) -{ - /* request a udp to listen on, in a leachy kinda way */ - std::vector::iterator it; - for(it = tou_streams.begin(); it != tou_streams.end(); it++) - { - if ((*it) && ((*it)->udp)) - { - if ((*it)->know_eaddr) - { - *((struct sockaddr_in *) ext) = (*it)->extaddr; - return (*it)->tou_fd; - } - } - } - return -1; -} - -int tou_extaddr(int sockfd, const struct sockaddr *ext, socklen_t tolen) -{ - if (tou_streams[sockfd] == NULL) - { - return -1; - } - TcpOnUdp *tous = tou_streams[sockfd]; - tous -> know_eaddr = true; - tous -> extaddr = *((struct sockaddr_in *) ext); - return 0; -} - - -ssize_t tou_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen) -{ - if (tou_streams[sockfd] == NULL) - { - std::cerr << "tou_recvfrom() Invalid sockfd:" << sockfd; - std::cerr << std::endl; - return -1; - } - TcpOnUdp *tous = tou_streams[sockfd]; - - if (*fromlen != sizeof(struct sockaddr_in)) - { - std::cerr << "tou_recvfrom() Invalid addr size"; - std::cerr << std::endl; - tous -> lasterrno = EINVAL; - return -1; - } - - /* extra checks */ - if ((!tous->tcp) || (!tous->udp)) - { - std::cerr << "tou_recvfrom() Bad sockfd (!udp) || (!tcp) :" << sockfd; - std::cerr << std::endl; - return -1; - } - - if (tous->tcp->state == 0) - { - //std::cerr << "tou_recvfrom() State = 0 .... Allow??"; - //std::cerr << std::endl; - } - - - if (tous->udp->okay()) - { - int ret = tous->udp->recvRndPktfrom(buf,len,flags, from, fromlen); - if (ret < 0) - { - //std::cerr << "tou_recvfrom() Sock Ok, Try Again later"; - //std::cerr << std::endl; - tous -> lasterrno = EAGAIN; - return -1; - } - std::cerr << "tou_recvfrom() Got a Packet on: " << sockfd; - std::cerr << std::endl; - tous -> lasterrno = 0; - return ret; - } - std::cerr << "tou_recvfrom() Socket Not Okay:" << sockfd; - std::cerr << std::endl; - - tous -> lasterrno = EAGAIN; - return -1; -} - - -ssize_t tou_sendto(int sockfd, const void *buf, size_t len, int flags, const struct - sockaddr *to, socklen_t tolen) -{ - if (tou_streams[sockfd] == NULL) - { - return -1; - } - TcpOnUdp *tous = tou_streams[sockfd]; - - if (tolen != sizeof(struct sockaddr_in)) - { - tous -> lasterrno = EINVAL; - return -1; - } - - if (tous->tcp->state == 0) - { - } - - - if (tous->udp->okay()) - { - tous->udp->sendToProxy(*((struct sockaddr_in *) to), - buf, len); - return len; - } - - tous -> lasterrno = EAGAIN; - return -1; -} - int tou_connected(int sockfd) { @@ -401,7 +275,7 @@ int tou_connected(int sockfd) tous->tcp->tick(); tou_tick_all(); - return (tous->tcp->state == 4); + return (tous->tcp->TcpState() == 4); } @@ -422,7 +296,7 @@ ssize_t tou_read(int sockfd, void *buf, size_t count) int err = tous->tcp->read((char *) buf, count); if (err < 0) { - tous->lasterrno = tous->tcp->errorState; + tous->lasterrno = tous->tcp->TcpErrorState(); return -1; } return err; @@ -440,7 +314,7 @@ ssize_t tou_write(int sockfd, const void *buf, size_t count) int err = tous->tcp->write((char *) buf, count); if (err < 0) { - tous->lasterrno = tous->tcp->errorState; + tous->lasterrno = tous->tcp->TcpErrorState(); tous->tcp->tick(); tou_tick_all(); return -1; @@ -464,7 +338,7 @@ int tou_maxread(int sockfd) int ret = tous->tcp->read_pending(); if (ret < 0) { - tous->lasterrno = tous->tcp->errorState; + tous->lasterrno = tous->tcp->TcpErrorState(); return 0; // error detected next time. } return ret; @@ -483,7 +357,7 @@ int tou_maxwrite(int sockfd) int ret = tous->tcp->write_allowed(); if (ret < 0) { - tous->lasterrno = tous->tcp->errorState; + tous->lasterrno = tous->tcp->TcpErrorState(); return 0; // error detected next time? } return ret; @@ -504,18 +378,19 @@ int tou_close(int sockfd) /* shut it down */ tous->tcp->close(); - tous->udp->close(); delete tous->tcp; - delete tous->udp; delete tous; tou_streams[sockfd] = NULL; return 1; } - /* get an error number */ int tou_errno(int sockfd) { + if (!udps) + { + return ENOTSOCK; + } if (tou_streams[sockfd] == NULL) { return ENOTSOCK; @@ -535,12 +410,10 @@ int tou_clear_error(int sockfd) return 0; } - /* unfortuately the library needs to be ticked. (not running a thread) * you can put it in a thread! */ - /* * Some helper functions for stuff. * diff --git a/libretroshare/src/tcponudp/tou.h b/libretroshare/src/tcponudp/tou.h index 73ee86525..df5d372d6 100644 --- a/libretroshare/src/tcponudp/tou.h +++ b/libretroshare/src/tcponudp/tou.h @@ -51,6 +51,47 @@ extern "C" { #endif + /* The modification to a single UDP socket means + * that the structure of the TOU interface must be changed. + * + * Init: + * (1) choose our local address. (a universal bind) + * int tou_init(const struct sockaddr *my_addr); + * (2) query if we have determined our external address. + * int tou_extaddr(struct sockaddr *ext_addr, socklen_t *addrlen); + * (3) offer more stunpeers, for external address determination. + * int tou_stunpeer(const struct sockaddr *ext_addr, socklen_t addrlen, const char *id); + * (4) repeat (2)+(3) until a valid extaddr is returned. + * + */ + +int tou_init(const struct sockaddr *my_addr, socklen_t addrlen); +int tou_extaddr(struct sockaddr *ext_addr, socklen_t *addrlen); +int tou_stunpeer(const struct sockaddr *ext_addr, socklen_t addrlen, const char *id); + + /* Connections are as similar to UNIX as possible + * (1) create a socket: tou_socket() this reserves a socket id. + * (2) connect: active: tou_connect() or passive: tou_listenfor(). + * (3) use as a normal socket. + * + * tou_bind() is not valid. tou_init performs this role. + * tou_listen() is not valid. (must listen for a specific address) use tou_listenfor() instead. + * tou_accept() can still be used. + */ + + /* creation/connections */ +int tou_socket(int domain, int type, int protocol); +int tou_bind(int sockfd, const struct sockaddr *my_addr, socklen_t addrlen); /* null op now */ +int tou_listen(int sockfd, int backlog); /* null op now */ +int tou_connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen); +int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); + +/* non-standard bonuses */ +int tou_connected(int sockfd); +int tou_listenfor(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen); + + + /* UNIX interface: minimum for the SSL BIO interface */ ssize_t tou_read(int sockfd, void *buf, size_t count); ssize_t tou_write(int sockfd, const void *buf, size_t count); @@ -65,28 +106,6 @@ int tou_maxread(int sockfd); int tou_maxwrite(int sockfd); - /* creation/connections */ -int tou_socket(int domain, int type, int protocol); -int tou_bind(int sockfd, const struct sockaddr *my_addr, socklen_t addrlen); -int tou_connect(int sockfd, const struct sockaddr *serv_addr, socklen_t addrlen); -int tou_listen(int sockfd, int backlog); -int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); - - /* udp interface */ - /* request an external udp port to use - returns sockfd */ -int tou_extudp(const struct sockaddr *ext, socklen_t tolen); -int tou_extaddr(int sockfd, const struct sockaddr *to, socklen_t tolen); - -ssize_t tou_recvfrom(int sockfd, void *buf, size_t len, int flags, struct sockaddr *from, socklen_t *fromlen); -ssize_t tou_sendto(int s, const void *buf, size_t len, int flags, const struct - sockaddr *to, socklen_t tolen); - - -/* non-standard bonuses */ -int tou_connected(int sockfd); -int tou_listenfor(int sockfd, const struct sockaddr *serv_addr, - socklen_t addrlen); - #ifdef __cplusplus } #endif diff --git a/libretroshare/src/tcponudp/udp_server.cc b/libretroshare/src/tcponudp/udp_server.cc index 66d706312..05fb141fa 100644 --- a/libretroshare/src/tcponudp/udp_server.cc +++ b/libretroshare/src/tcponudp/udp_server.cc @@ -115,33 +115,36 @@ int main(int argc, char **argv) std::cerr << "Local Address: " << laddr << std::endl; std::cerr << "Remote Address: " << raddr << std::endl; - //LossyUdpLayer udpl(laddr, 0.01); - UdpLayer udpl(laddr); - if (!udpl.openSocket()) + UdpSorter udps(laddr); + if (!udps.okay()) { - std::cerr << "Cannot Open Local Address: " << laddr << std::endl; + std::cerr << "UdpSorter not Okay (Cannot Open Local Address): " << laddr << std::endl; exit(1); } - udpl.setRemoteAddr(raddr); - TcpStream tcp(&udpl); + TcpStream tcp(&udps); + udps.addUdpPeer(&tcp, raddr); if (toConnect) { - tcp.connect(); + tcp.connect(raddr); + } + else + { + tcp.listenfor(raddr); } while(!tcp.isConnected()) { sleep(1); std::cerr << "Waiting for TCP to Connect!" << std::endl; - udpl.status(std::cerr); + udps.status(std::cerr); tcp.status(std::cerr); tcp.tick(); } std::cerr << "TCP Connected***************************" << std::endl; - udpl.status(std::cerr); + udps.status(std::cerr); tcp.status(std::cerr); std::cerr << "TCP Connected***************************" << std::endl; @@ -161,7 +164,7 @@ int main(int argc, char **argv) while(!done) { //sleep(1); - usleep(10000); + usleep(100000); //usleep(1000); if (blockread != true) { @@ -196,8 +199,8 @@ int main(int argc, char **argv) while(!tcp.widle()) { - sleep(1); - //usleep(10000); + //sleep(1); + usleep(100000); //usleep(1000); tcp.tick(); if (count++ % 10 == 0) @@ -220,7 +223,7 @@ int main(int argc, char **argv) while(1) { //sleep(1); - usleep(10000); + usleep(100000); //usleep(1000); //int writesize = bufsize; int ret; @@ -258,7 +261,9 @@ int main(int argc, char **argv) while((stayOpen) || (!tcp.ridle())) { tcp.tick(); - sleep(1); + //sleep(1); + usleep(100000); + //usleep(1000); if (count++ % 10 == 0) { std::cerr << "Waiting for Idle()" << std::endl; diff --git a/libretroshare/src/tcponudp/udplayer.cc b/libretroshare/src/tcponudp/udplayer.cc index c96ae09c5..0357c8b74 100644 --- a/libretroshare/src/tcponudp/udplayer.cc +++ b/libretroshare/src/tcponudp/udplayer.cc @@ -45,8 +45,8 @@ */ /* - * #define DEBUG_UDP_LAYER 1 - */ + */ #define DEBUG_UDP_LAYER 1 + /**/ static const int UDP_DEF_TTL = 64; @@ -76,7 +76,7 @@ class udpPacket }; -std::ostream &operator<<(std::ostream &out, struct sockaddr_in &addr) +std::ostream &operator<<(std::ostream &out, const struct sockaddr_in &addr) { out << "[" << inet_ntoa(addr.sin_addr) << ":"; out << htons(addr.sin_port) << "]"; @@ -84,7 +84,7 @@ std::ostream &operator<<(std::ostream &out, struct sockaddr_in &addr) } -bool operator==(struct sockaddr_in &addr, struct sockaddr_in &addr2) +bool operator==(const struct sockaddr_in &addr, const struct sockaddr_in &addr2) { if (addr.sin_family != addr2.sin_family) return false; @@ -95,6 +95,18 @@ bool operator==(struct sockaddr_in &addr, struct sockaddr_in &addr2) return true; } + +bool operator<(const struct sockaddr_in &addr, const struct sockaddr_in &addr2) +{ + if (addr.sin_family != addr2.sin_family) + return (addr.sin_family < addr2.sin_family); + if (addr.sin_addr.s_addr != addr2.sin_addr.s_addr) + return (addr.sin_addr.s_addr < addr2.sin_addr.s_addr); + if (addr.sin_port != addr2.sin_port) + return (addr.sin_port < addr2.sin_port); + return false; +} + std::string printPkt(void *d, int size) { std::ostringstream out; @@ -148,8 +160,8 @@ std::string printPktOffset(unsigned int offset, void *d, unsigned int size) -UdpLayer::UdpLayer(struct sockaddr_in &local) - :laddr(local), raddrKnown(false), errorState(0), ttl(UDP_DEF_TTL) +UdpLayer::UdpLayer(UdpReceiver *udpr, struct sockaddr_in &local) + :recv(udpr), laddr(local), errorState(0), ttl(UDP_DEF_TTL) { openSocket(); return; @@ -159,14 +171,6 @@ int UdpLayer::status(std::ostream &out) { out << "UdpLayer::status()" << std::endl; out << "localaddr: " << laddr << std::endl; - if (raddrKnown) - { - out << "remoteaddr: " << raddr << std::endl; - } - else - { - out << "remoteaddr unKnown!" << std::endl; - } out << "sockfd: " << sockfd << std::endl; out << std::endl; return 1; @@ -175,79 +179,96 @@ int UdpLayer::status(std::ostream &out) int UdpLayer::close() { /* close socket if open */ + sockMtx.lock(); /********** LOCK MUTEX *********/ + if (sockfd > 0) { tounet_close(sockfd); } + + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ return 1; } +void UdpLayer::run() +{ + return recv_loop(); +} /* higher level interface */ -int UdpLayer::readPkt(void *data, int *size) +void UdpLayer::recv_loop() { - int nsize = *size; - struct sockaddr_in from; - if (0 >= receiveUdpPacket(data, &nsize, from)) + int maxsize = 16000; + void *inbuf = malloc(maxsize); + + int status; + struct timeval timeout; + + while(1) { + /* select on the socket TODO */ + fd_set rset; + for(;;) { + FD_ZERO(&rset); + FD_SET(sockfd, &rset); + timeout.tv_sec = 0; + timeout.tv_usec = 500000; /* 500 ms timeout */ + status = select(sockfd+1, &rset, NULL, NULL, &timeout); + if (status > 0) + { + break; /* data available, go read it */ + } + else if (status < 0) + { + std::cerr << "Error: " << tounet_errno() << std::endl; + } + }; + + int nsize = maxsize; + struct sockaddr_in from; + if (0 < receiveUdpPacket(inbuf, &nsize, from)) + { #ifdef DEBUG_UDP_LAYER - //std::cerr << "UdpLayer::readPkt() not ready" << from; - //std::cerr << std::endl; + std::cerr << "UdpLayer::readPkt() from : " << from << std::endl; + std::cerr << printPkt(inbuf, nsize); #endif - return -1; + // send to reciever. + recv -> recvPkt(inbuf, nsize, from); + } + else + { +#ifdef DEBUG_UDP_LAYER + std::cerr << "UdpLayer::readPkt() not ready" << from; + std::cerr << std::endl; +#endif + } } - -#ifdef DEBUG_UDP_LAYER - //std::cerr << "UdpLayer::readPkt() from : " << from << std::endl; - //std::cerr << printPkt(data, nsize); -#endif - - if ((raddrKnown) && (from == raddr)) - { -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::readPkt() from RemoteAddr: " << from; - std::cerr << std::endl; -#endif - *size = nsize; - return nsize; - } - -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::readPkt() from unknown remote addr: " << from; - std::cerr << std::endl; -#endif - std::cerr << "UdpLayer::readPkt() storing Random packet from: " << from; - std::cerr << std::endl; - randomPkts.push_back(new udpPacket(&from,data, nsize)); - return -1; + return; } -int UdpLayer::sendPkt(void *data, int size) -{ - if (raddrKnown) - { -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::sendPkt() to: " << raddr << std::endl; - //std::cerr << printPkt(data, size); -#endif - sendUdpPacket(data, size, raddr); - return size; - } - else - { -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::sendPacket() unknown remote addr!"; - std::cerr << std::endl; -#endif - return -1; - } - return 1; -} +int UdpLayer::sendPkt(void *data, int size, sockaddr_in &to, int ttl) +{ + /* if ttl is different -> set it */ + if (ttl != getTTL()) + { + setTTL(ttl); + } + + /* and send! */ +#ifdef DEBUG_UDP_LAYER + std::cerr << "UdpLayer::sendPkt() to: " << to << std::endl; + std::cerr << printPkt(data, size); +#endif + sendUdpPacket(data, size, to); + return size; +} /* setup connections */ int UdpLayer::openSocket() { + sockMtx.lock(); /********** LOCK MUTEX *********/ + /* make a socket */ sockfd = tounet_socket(PF_INET, SOCK_DGRAM, 0); #ifdef DEBUG_UDP_LAYER @@ -262,6 +283,8 @@ int UdpLayer::openSocket() #endif errorState = EADDRINUSE; //exit(1); + + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ return -1; } @@ -272,24 +295,32 @@ int UdpLayer::openSocket() #endif } + errorState = 0; + #ifdef DEBUG_UDP_LAYER std::cerr << "Socket Bound to : " << laddr << std::endl; #endif + + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ + #ifdef DEBUG_UDP_LAYER std::cerr << "Setting TTL to " << UDP_DEF_TTL << std::endl; #endif setTTL(UDP_DEF_TTL); - errorState = 0; return 1; } int UdpLayer::setTTL(int t) { + sockMtx.lock(); /********** LOCK MUTEX *********/ + int err = tounet_setsockopt(sockfd, IPPROTO_IP, IP_TTL, &t, sizeof(int)); ttl = t; + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ + #ifdef DEBUG_UDP_LAYER std::cerr << "UdpLayer::setTTL(" << t << ") returned: " << err; std::cerr << std::endl; @@ -300,42 +331,26 @@ int UdpLayer::setTTL(int t) int UdpLayer::getTTL() { - return ttl; + sockMtx.lock(); /********** LOCK MUTEX *********/ + + int t = ttl; + + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return t; } - - -int UdpLayer::sendToProxy(struct sockaddr_in &proxy, const void *data, int size) -{ - sendUdpPacket(data, size, proxy); - return 1; -} - -int UdpLayer::setRemoteAddr(struct sockaddr_in &remote) -{ - raddr = remote; - raddrKnown = true; - return 1; -} - - -int UdpLayer::getRemoteAddr(struct sockaddr_in &remote) -{ - if (raddrKnown) - { - remote = raddr; - return 1; - } - return 0; -} - /* monitoring / updates */ int UdpLayer::okay() { + sockMtx.lock(); /********** LOCK MUTEX *********/ + bool nonFatalError = ((errorState == 0) || (errorState == EAGAIN) || (errorState == EINPROGRESS)); + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ + #ifdef DEBUG_UDP_LAYER if (!nonFatalError) { @@ -354,91 +369,23 @@ int UdpLayer::tick() #endif return 1; } -/******************* Internals *************************************/ - - -ssize_t UdpLayer::recvRndPktfrom(void *buf, size_t len, int flags, - struct sockaddr *from, socklen_t *fromlen) -{ -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::recvRndPktfrom()" << std::endl; -#endif - - if (*fromlen != sizeof(struct sockaddr_in)) - { - -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::recvRndPktfrom() bad address length" << std::endl; -#endif - return -1; - } - - /* if raddr not known -> then we're not connected - * at a higher level and therefore our queue - * will not be filled (no ticking).... - * so feel free the get data. - */ - - if (randomPkts.size() == 0) - { - if (!raddrKnown) - { -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::recvRndPktfrom() Checking Directly" << std::endl; -#endif - int size = len; - int ret = receiveUdpPacket(buf, &size, *((struct sockaddr_in *) from)); - if (ret > 0) - { -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::recvRndPktfrom() Got Pkt directly" << std::endl; - std::cerr << "Pkt from:" << inet_ntoa(((struct sockaddr_in *) from)->sin_addr); - std::cerr << ":" << ntohs(((struct sockaddr_in *) from)->sin_port) << std::endl; -#endif - return ret; - } - } - -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::recvRndPktfrom() Nothing in the Queue" << std::endl; -#endif - return -1; - } - - udpPacket *pkt = randomPkts.front(); - randomPkts.pop_front(); - - *((struct sockaddr_in *) from) = pkt->raddr; - unsigned int size = pkt->len; - if (len < size) - { - size = len; - } - - memcpy(buf, pkt->data, size); - *((struct sockaddr_in *) from) = pkt->raddr; - -#ifdef DEBUG_UDP_LAYER - std::cerr << "UdpLayer::recvRndPktfrom() returning stored Pkt" << std::endl; - std::cerr << "Pkt from:" << inet_ntoa(pkt->raddr.sin_addr); - std::cerr << ":" << ntohs(pkt->raddr.sin_port) << std::endl; - std::cerr << "Length: " << pkt->len << std::endl; -#endif - - delete pkt; - return size; -} /******************* Internals *************************************/ - int UdpLayer::receiveUdpPacket(void *data, int *size, struct sockaddr_in &from) { struct sockaddr_in fromaddr; socklen_t fromsize = sizeof(fromaddr); int insize = *size; - if (0<(insize=tounet_recvfrom(sockfd,data,insize,0, - (struct sockaddr*)&fromaddr,&fromsize))) + + sockMtx.lock(); /********** LOCK MUTEX *********/ + + insize = tounet_recvfrom(sockfd,data,insize,0, + (struct sockaddr*)&fromaddr,&fromsize); + + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ + + if (0 < insize) { #ifdef DEBUG_UDP_LAYER std::cerr << "receiveUdpPacket() from: " << fromaddr; @@ -461,9 +408,13 @@ int UdpLayer::sendUdpPacket(const void *data, int size, struct sockaddr_in &to) #endif struct sockaddr_in toaddr = to; + sockMtx.lock(); /********** LOCK MUTEX *********/ + tounet_sendto(sockfd, data, size, 0, (struct sockaddr *) &(toaddr), sizeof(toaddr)); + + sockMtx.unlock(); /******** UNLOCK MUTEX *********/ return 1; } diff --git a/libretroshare/src/tcponudp/udplayer.h b/libretroshare/src/tcponudp/udplayer.h index 338ba5342..02333f2b3 100644 --- a/libretroshare/src/tcponudp/udplayer.h +++ b/libretroshare/src/tcponudp/udplayer.h @@ -35,6 +35,9 @@ #include */ + +#include "util/rsthreads.h" + /* universal networking functions */ #include "tou_net.h" @@ -42,43 +45,44 @@ #include #include -std::ostream &operator<<(std::ostream &out, struct sockaddr_in &addr); -bool operator==(struct sockaddr_in &addr, struct sockaddr_in &addr2); +std::ostream &operator<<(std::ostream &out, const struct sockaddr_in &addr); +bool operator==(const struct sockaddr_in &addr, const struct sockaddr_in &addr2); +bool operator<(const struct sockaddr_in &addr, const struct sockaddr_in &addr2); + std::string printPkt(void *d, int size); std::string printPktOffset(unsigned int offset, void *d, unsigned int size); -/* So the UdpLayer ..... has a couple of roles - * - * Firstly Send Proxy Packet() (for address determination). - * all the rest of this functionality is handled elsewhere. - * - * Secondly support TcpStreamer.... +/* UdpLayer ..... is the bottom layer which + * just sends and receives Udp packets. */ -class udpPacket; +class UdpReceiver +{ + public: +virtual void recvPkt(void *data, int size, struct sockaddr_in &from) = 0; +}; -class UdpLayer +class UdpLayer: public RsThread { public: - UdpLayer(struct sockaddr_in &local); + UdpLayer(UdpReceiver *recv, struct sockaddr_in &local); virtual ~UdpLayer() { return; } int status(std::ostream &out); /* setup connections */ int openSocket(); - int setTTL(int t); - int getTTL(); - int sendToProxy(struct sockaddr_in &proxy, const void *data, int size); - int setRemoteAddr(struct sockaddr_in &remote); - int getRemoteAddr(struct sockaddr_in &remote); + /* RsThread functions */ +virtual void run(); /* called once the thread is started */ + +void recv_loop(); /* uses callback to UdpReceiver */ /* Higher Level Interface */ - int readPkt(void *data, int *size); - int sendPkt(void *data, int size); + //int readPkt(void *data, int *size, struct sockaddr_in &from); + int sendPkt(void *data, int size, struct sockaddr_in &to, int ttl); /* monitoring / updates */ int okay(); @@ -86,12 +90,6 @@ int status(std::ostream &out); int close(); - /* unix like interface for recving packets not part - * of the tcp stream - */ -ssize_t recvRndPktfrom(void *buf, size_t len, int flags, - struct sockaddr *from, socklen_t *fromlen); - /* data */ /* internals */ protected: @@ -99,25 +97,21 @@ ssize_t recvRndPktfrom(void *buf, size_t len, int flags, virtual int receiveUdpPacket(void *data, int *size, struct sockaddr_in &from); virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to); + int setTTL(int t); + int getTTL(); + /* low level */ - /* - * int rwSocket(); - */ private: + UdpReceiver *recv; - struct sockaddr_in paddr; /* proxy addr */ - struct sockaddr_in raddr; /* remote addr */ struct sockaddr_in laddr; /* local addr */ - bool raddrKnown; int errorState; int sockfd; - int ttl; - std::deque randomPkts; - + RsMutex sockMtx; }; #include @@ -126,8 +120,8 @@ class LossyUdpLayer: public UdpLayer { public: - LossyUdpLayer(struct sockaddr_in &local, double frac) - :UdpLayer(local), lossFraction(frac) + LossyUdpLayer(UdpReceiver *udpr, struct sockaddr_in &local, double frac) + :UdpLayer(udpr, local), lossFraction(frac) { return; } diff --git a/libretroshare/src/tcponudp/udpsock_test.cc b/libretroshare/src/tcponudp/udpsock_test.cc new file mode 100644 index 000000000..fe87e4cf9 --- /dev/null +++ b/libretroshare/src/tcponudp/udpsock_test.cc @@ -0,0 +1,91 @@ +/* + * libretroshare/src/tcponudp: udpsock_test.cc + * + * TCP-on-UDP (tou) network interface for RetroShare. + * + * Copyright 2007-2008 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "udptestfn.h" +#include "udplayer.h" + +#define MAX_PEERS 16 + +int main(int argc, char **argv) +{ + /* get local and remote addresses */ + struct sockaddr_in local; + struct sockaddr_in peers[MAX_PEERS]; + int numpeers = 0; + int i,j; + + local.sin_family = AF_INET; + inet_aton("127.0.0.1", &(local.sin_addr)); + local.sin_port = htons(8767); + + for(i = 0; i < MAX_PEERS; i++) + { + peers[i].sin_family = AF_INET; + inet_aton("127.0.0.1", &(peers[i].sin_addr)); + peers[i].sin_port = htons(8768); + } + + if (argc < 3) + { + std::cerr << "Usage: " << argv[0] << " [ [ +#include +#include + +static const int STUN_TTL = 64; + +/* + * #define DEBUG_UDP_SORTER 1 + */ + +#define DEBUG_UDP_SORTER 1 + +UdpSorter::UdpSorter(struct sockaddr_in &local) + : udpLayer(NULL), laddr(local) +{ + openSocket(); + return; +} + + +/* higher level interface */ +void UdpSorter::recvPkt(void *data, int size, struct sockaddr_in &from) +{ + /* print packet information */ +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::recvPkt(" << size << ") from: " << from; + std::cerr << std::endl; +#endif + + sortMtx.lock(); /********** LOCK MUTEX *********/ + + /* look for a peer */ + std::map::iterator it; + it = streams.find(from); + + /* check for STUN packet */ + if (isStunPacket(data, size)) + { + std::cerr << "UdpSorter::recvPkt() is Stun Packet"; + std::cerr << std::endl; + + /* respond */ + handleStunPkt(data, size, from); + } + else if (it == streams.end()) + { + /* peer unknown */ + std::cerr << "UdpSorter::recvPkt() Peer Unknown!"; + std::cerr << std::endl; + } + else + { + /* forward to them */ + std::cerr << "UdpSorter::recvPkt() Sending to UdpPeer: "; + std::cerr << it->first; + std::cerr << std::endl; + (it->second)->recvPkt(data, size); + } + + sortMtx.unlock(); /******** UNLOCK MUTEX *********/ + /* done */ +} + + +int UdpSorter::sendPkt(void *data, int size, struct sockaddr_in &to, int ttl) +{ + /* print packet information */ +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::sendPkt(" << size << ") ttl: " << ttl; + std::cerr << " to: " << to; + std::cerr << std::endl; +#endif + + /* send to udpLayer */ + return udpLayer->sendPkt(data, size, to, ttl); +} + +int UdpSorter::status(std::ostream &out) +{ + sortMtx.lock(); /********** LOCK MUTEX *********/ + + out << "UdpSorter::status()" << std::endl; + out << "localaddr: " << laddr << std::endl; + out << "UdpSorter::peers:" << std::endl; + std::map::iterator it; + for(it = streams.begin(); it != streams.end(); it++) + { + out << "\t" << it->first << std::endl; + } + out << std::endl; + + sortMtx.unlock(); /******** UNLOCK MUTEX *********/ + + udpLayer->status(out); + + return 1; +} + +/* setup connections */ +int UdpSorter::openSocket() +{ + udpLayer = new UdpLayer(this, laddr); + udpLayer->start(); + + return 1; +} + +/* monitoring / updates */ +int UdpSorter::okay() +{ + return udpLayer->okay(); +} + +int UdpSorter::tick() +{ +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::tick()" << std::endl; +#endif + return 1; +} + + +int UdpSorter::close() +{ + /* TODO */ + return 1; +} + + + /* add a TCPonUDP stream */ +int UdpSorter::addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr) +{ + sortMtx.lock(); /********** LOCK MUTEX *********/ + + + /* check for duplicate */ + std::map::iterator it; + it = streams.find(raddr); + bool ok = (it == streams.end()); + if (!ok) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::addUdpPeer() Peer already exists!" << std::endl; + std::cerr << "UdpSorter::addUdpPeer() ERROR" << std::endl; +#endif + } + else + { + streams[raddr] = peer; + } + + sortMtx.unlock(); /******** UNLOCK MUTEX *********/ + return ok; +} + + +/******************************* STUN Handling ********************************/ + + /* respond */ +bool UdpSorter::handleStunPkt(void *data, int size, struct sockaddr_in &from) +{ + if (size == 20) /* request */ + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::handleStunPkt() got Request"; + std::cerr << std::endl; +#endif + + /* generate a response */ + int len; + void *pkt = generate_stun_reply(&from, &len); + if (!pkt) + return false; + + int sentlen = sendPkt(pkt, len, from, STUN_TTL); + free(pkt); + +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::handleStunPkt() sent Response size:" << sentlen; + std::cerr << std::endl; +#endif + + return (len == sentlen); + } + else if (size == 28) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::handleStunPkt() got Response"; + std::cerr << std::endl; +#endif + /* got response */ + struct sockaddr_in eAddr; + bool good = response(data, size, eAddr); + if (good) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::handleStunPkt() got Ext Addr: "; + std::cerr << inet_ntoa(eAddr.sin_addr) << ":" << ntohs(eAddr.sin_port); + std::cerr << std::endl; +#endif + eaddrKnown = true; + eaddr = eAddr; + return true; + } + } + +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::handleStunPkt() Bad Packet"; + std::cerr << std::endl; +#endif + return false; +} + + +bool UdpSorter::addStunPeer(const struct sockaddr_in &remote, const char *peerid) +{ + /* add to the list */ +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::addStunPeer()"; + std::cerr << std::endl; + + std::cerr << "UdpSorter::addStunPeer() - just stun it!"; + std::cerr << std::endl; +#endif + + doStun(remote); + return false; +} + +bool UdpSorter::externalAddr(struct sockaddr_in &external) +{ + if (eaddrKnown) + { + external = eaddr; + return true; + } + return false; +} + + +int UdpSorter::doStun(struct sockaddr_in stun_addr) +{ +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::doStun()"; + std::cerr << std::endl; +#endif + + /* send out a stun packet -> save in the local variable */ + if (!okay()) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::doStun() Not Active"; + std::cerr << std::endl; +#endif + } + +#define MAX_STUN_SIZE 64 + char stundata[MAX_STUN_SIZE]; + int tmplen = MAX_STUN_SIZE; + bool done = generate_stun_pkt(stundata, &tmplen); + if (!done) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::doStun() Failed"; + std::cerr << std::endl; +#endif + //pqioutput(PQL_ALERT, pqistunzone, "pqistunner::stun() Failed!"); + return 0; + } + + /* send it off */ + int sentlen = sendPkt(stundata, tmplen, stun_addr, STUN_TTL); + +#ifdef DEBUG_UDP_SORTER + std::ostringstream out; + out << "UdpSorter::doStun() Sent Stun Packet(" << sentlen << ") from:"; + out << inet_ntoa(laddr.sin_addr) << ":" << ntohs(laddr.sin_port); + out << " to:"; + out << inet_ntoa(stun_addr.sin_addr) << ":" << ntohs(stun_addr.sin_port); + + std::cerr << out.str() << std::endl; + + //pqioutput(PQL_ALERT, pqistunzone, out.str()); +#endif + + return 1; +} + +bool UdpSorter::response(void *stun_pkt, int size, struct sockaddr_in &addr) +{ + /* check what type it is */ + if (size < 28) + { + return false; + } + + if (((uint16_t *) stun_pkt)[0] != 0x0101) + { + /* not a response */ + return false; + } + + /* iterate through the packet */ + /* for now assume the address follows the header directly */ + /* all stay in netbyteorder! */ + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ((uint32_t *) stun_pkt)[6]; + addr.sin_port = ((uint16_t *) stun_pkt)[11]; + + +#ifdef DEBUG_UDP_SORTER + std::ostringstream out; + out << "UdpSorter::response() Recvd a Stun Response, ext_addr: "; + out << inet_ntoa(addr.sin_addr) << ":" << ntohs(addr.sin_port); + std::cerr << out.str() << std::endl; +#endif + + return true; + +} + +bool UdpSorter::generate_stun_pkt(void *stun_pkt, int *len) +{ + if (*len < 20) + { + return false; + } + + /* just the header */ + ((uint16_t *) stun_pkt)[0] = 0x0001; + ((uint16_t *) stun_pkt)[1] = 20; /* only header */ + /* transaction id - should be random */ + ((uint32_t *) stun_pkt)[1] = 0x0020; + ((uint32_t *) stun_pkt)[2] = 0x0121; + ((uint32_t *) stun_pkt)[3] = 0x0111; + ((uint32_t *) stun_pkt)[4] = 0x1010; + *len = 20; + return true; +} + + +void *UdpSorter::generate_stun_reply(struct sockaddr_in *stun_addr, int *len) +{ + /* just the header */ + void *stun_pkt = malloc(28); + ((uint16_t *) stun_pkt)[0] = 0x0101; + ((uint16_t *) stun_pkt)[1] = 28; /* only header + 8 byte addr */ + /* transaction id - should be random */ + ((uint32_t *) stun_pkt)[1] = 0x0020; + ((uint32_t *) stun_pkt)[2] = 0x0121; + ((uint32_t *) stun_pkt)[3] = 0x0111; + ((uint32_t *) stun_pkt)[4] = 0x1010; + /* now add address + * 0 1 2 3 + * + * + */ + + ((uint32_t *) stun_pkt)[6] = stun_addr->sin_addr.s_addr; + ((uint16_t *) stun_pkt)[11] = stun_addr->sin_port; + + *len = 28; + return stun_pkt; +} + +bool UdpSorter::isStunPacket(void *data, int size) +{ +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::isStunPacket() ?"; + std::cerr << std::endl; +#endif + + if (size < 20) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::isStunPacket() (size < 20) -> false"; + std::cerr << std::endl; +#endif + return false; + } + + /* match size field */ + uint16_t pktsize = ((uint16_t *) data)[1]; + if (size != pktsize) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::isStunPacket() (size != pktsize) -> false"; + std::cerr << std::endl; +#endif + return false; + } + + if ((size == 20) && (0x0001 == ((uint16_t *) data)[0])) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::isStunPacket() (size=20 & data[0]=0x0001) -> true"; + std::cerr << std::endl; +#endif + /* request */ + return true; + } + + if ((size == 28) && (0x0101 == ((uint16_t *) data)[0])) + { +#ifdef DEBUG_UDP_SORTER + std::cerr << "UdpSorter::isStunPacket() (size=28 & data[0]=0x0101) -> true"; + std::cerr << std::endl; +#endif + /* response */ + return true; + } + return false; +} + diff --git a/libretroshare/src/tcponudp/udpsorter.h b/libretroshare/src/tcponudp/udpsorter.h new file mode 100644 index 000000000..d2c5f680c --- /dev/null +++ b/libretroshare/src/tcponudp/udpsorter.h @@ -0,0 +1,102 @@ +/* + * libretroshare/src/tcponudp: udpsorter.h + * + * TCP-on-UDP (tou) network interface for RetroShare. + * + * Copyright 2007-2008 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + + +#ifndef TOU_UDP_SORTER_H +#define TOU_UDP_SORTER_H + +/* universal networking functions */ +#include "tou_net.h" + +#include +#include + +#include "udplayer.h" +/* UdpSorter ..... filters the UDP packets. + */ + +class UdpPeer +{ + public: +virtual void recvPkt(void *data, int size) = 0; +}; + +class UdpSorter: public UdpReceiver +{ + public: + + UdpSorter(struct sockaddr_in &local); +virtual ~UdpSorter() { return; } + + /* add a TCPonUDP stream */ +int addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr); +bool addStunPeer(const struct sockaddr_in &remote, const char *peerid); +bool externalAddr(struct sockaddr_in &remote); + + /* Packet IO */ + /* pass-through send packets */ + int sendPkt(void *data, int size, struct sockaddr_in &to, int ttl); + /* callback for recved data (overloaded from UdpReceiver) */ +virtual void recvPkt(void *data, int size, struct sockaddr_in &from); + +int status(std::ostream &out); + + /* setup connections */ + int openSocket(); + + /* monitoring / updates */ + int okay(); + int tick(); + + int close(); + + private: + + /* STUN handling */ +bool isStunPacket(void *data, int size); +bool handleStunPkt(void *data, int size, struct sockaddr_in &from); + +int doStun(struct sockaddr_in stun_addr); +bool response(void *stun_pkt, int size, struct sockaddr_in &addr); + +void *generate_stun_reply(struct sockaddr_in *stun_addr, int *len); +bool generate_stun_pkt(void *stun_pkt, int *len); + + + + UdpLayer *udpLayer; + + RsMutex sortMtx; /* for all class data (below) */ + + struct sockaddr_in laddr; /* local addr */ + struct sockaddr_in eaddr; /* external addr */ + bool eaddrKnown; + + std::list stunPeers; /* potentials */ + + std::map streams; +}; + +#endif diff --git a/libretroshare/src/tcponudp/udptestfn.cc b/libretroshare/src/tcponudp/udptestfn.cc new file mode 100644 index 000000000..00608216d --- /dev/null +++ b/libretroshare/src/tcponudp/udptestfn.cc @@ -0,0 +1,48 @@ +/* + * libretroshare/src/tcponudp: udptestfn.cc + * + * TCP-on-UDP (tou) network interface for RetroShare. + * + * Copyright 2007-2008 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "udptestfn.h" + +void UdpRecvTest::recvPkt(void *data, int size, struct sockaddr_in &from) +{ + /* print packet information */ + std::cerr << "UdpRecvTest::recvPkt(" << size << ") from: " << from; + std::cerr << std::endl; +} + +UdpPeerTest::UdpPeerTest(struct sockaddr_in &addr) + :raddr(addr) +{ + return; +} + +void UdpPeerTest::recvPkt(void *data, int size) +{ + /* print packet information */ + std::cerr << "UdpPeerTest::recvPkt(" << size << ") from: " << raddr; + std::cerr << std::endl; +} + + diff --git a/libretroshare/src/tcponudp/udptestfn.h b/libretroshare/src/tcponudp/udptestfn.h new file mode 100644 index 000000000..feb997dde --- /dev/null +++ b/libretroshare/src/tcponudp/udptestfn.h @@ -0,0 +1,49 @@ +/* + * libretroshare/src/tcponudp: udptestfn.h + * + * TCP-on-UDP (tou) network interface for RetroShare. + * + * Copyright 2007-2008 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "udplayer.h" +#include "udpsorter.h" + +#ifndef TOU_UDP_TEST_FN_H +#define TOU_UDP_TEST_FN_H + +class UdpRecvTest: public UdpReceiver +{ + public: +virtual void recvPkt(void *data, int size, struct sockaddr_in &from); +}; + + +class UdpPeerTest: public UdpPeer +{ + public: + UdpPeerTest(struct sockaddr_in &addr); +virtual void recvPkt(void *data, int size); + + struct sockaddr_in raddr; +}; + + +#endif