diff --git a/libretroshare/src/tcponudp/tcpstream.cc b/libretroshare/src/tcponudp/tcpstream.cc index a46919429..a5aef2f9a 100644 --- a/libretroshare/src/tcponudp/tcpstream.cc +++ b/libretroshare/src/tcponudp/tcpstream.cc @@ -29,6 +29,7 @@ #include "tcpstream.h" #include +#include #include #include #include @@ -46,9 +47,14 @@ const int rstcpstreamzone = 28455; /* - * #define DEBUG_TCP_STREAM 1 + * #define DEBUG_TCP_STREAM 1 + * #define DEBUG_TCP_STREAM_RETRANS 1 + * #define DEBUG_TCP_STREAM_CLOSE 1 */ +//#define DEBUG_TCP_STREAM_RETRANS 1 +//#define DEBUG_TCP_STREAM_CLOSE 1 + /* *#define DEBUG_TCP_STREAM_EXTRA 1 */ @@ -63,13 +69,15 @@ int setupBinaryCheck(std::string fname); #endif static const uint32 kMaxQueueSize = 300; // Was 100, which means max packet size of 100k (smaller than max packet size). -static const uint32 kMaxPktRetransmit = 20; +static const uint32 kMaxPktRetransmit = 10; static const uint32 kMaxSynPktRetransmit = 100; // 100 => 200secs = over 3 minutes startup static const int TCP_STD_TTL = 64; static const int TCP_DEFAULT_FIREWALL_TTL = 4; static const double RTT_ALPHA = 0.875; +int dumpPacket(std::ostream &out, unsigned char *pkt, uint32_t size); + // platform independent fractional timestamp. static double getCurrentTS(); @@ -82,7 +90,9 @@ TcpStream::TcpStream(UdpSubReceiver *lyr) inAckno(0), inWinSize(0), maxWinSize(TCP_MAX_WIN), keepAliveTimeout(TCP_ALIVE_TIMEOUT), + retransTimerOn(false), retransTimeout(TCP_RETRANS_TIMEOUT), + retransTimerTs(0), lastWriteTF(0),lastReadTF(0), wcount(0), rcount(0), errorState(0), @@ -300,10 +310,21 @@ bool TcpStream::isConnected() return isConn; } -int TcpStream::status(std::ostream &out) +int TcpStream::status(std::ostream &out) { tcpMtx.lock(); /********** LOCK MUTEX *********/ + int s = status_locked(out); + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return s; +} + + +int TcpStream::status_locked(std::ostream &out) +{ + int tmpstate = state; // can leave the timestamp here as time()... rough but okay. @@ -328,8 +349,6 @@ int TcpStream::status(std::ostream &out) out << std::endl; out << std::endl; - tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ - return tmpstate; } @@ -993,6 +1012,13 @@ int TcpStream::recv_check() * for max efficiency */ +#ifdef DEBUG_TCP_STREAM_CLOSE + std::cerr << "TcpStream::recv_check() state = CLOSED (NoPktTimeout)"; + std::cerr << std::endl; + dumpstate_locked(std::cerr); +#endif + + rslog(RSL_WARNING, rstcpstreamzone, "TcpStream::state => TCP_CLOSED (kNoPktTimeout)"); outStreamActive = false; @@ -1165,6 +1191,13 @@ int TcpStream::handleIncoming(TcpPacket *pkt) * timeout of this state. * */ + +#ifdef DEBUG_TCP_STREAM_CLOSE + std::cerr << "TcpStream::handleIncoming() state = CLOSED (TimedWait)"; + std::cerr << std::endl; + dumpstate_locked(std::cerr); +#endif + state = TCP_CLOSED; // return incoming_TimedWait(pkt); { @@ -1746,12 +1779,14 @@ int TcpStream::check_InPkts() } else if (state == TCP_LAST_ACK) { - state = TCP_CLOSED; -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::state = TCP_CLOSED"; +#ifdef DEBUG_TCP_STREAM_CLOSE + std::cerr << "TcpStream::state = TCP_CLOSED (LastAck)"; std::cerr << std::endl; + dumpstate_locked(std::cerr); #endif + state = TCP_CLOSED; + rslog(RSL_WARNING, rstcpstreamzone, "TcpStream::state => TCP_CLOSED (LAST_ACK, recvd ACK)"); cleanup(); @@ -1896,6 +1931,9 @@ int TcpStream::toSend(TcpPacket *pkt, bool retrans) /* restart timers */ pkt -> ts = cts; pkt -> retrans = 0; + + startRetransmitTimer(); + #ifdef DEBUG_TCP_STREAM std::cerr << "TcpStream::toSend() Adding to outPkt --> Seqno: "; std::cerr << pkt->seqno << " size: " << pkt->datasize; @@ -1912,12 +1950,90 @@ int TcpStream::toSend(TcpPacket *pkt, bool retrans) } +/* single retransmit timer. + * + */ +void TcpStream::startRetransmitTimer() +{ + if (retransTimerOn) + { + return; + } + + retransTimerTs = getCurrentTS(); + retransTimerOn = true; + +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::startRetransmitTimer() peer: " << peeraddr; + std::cerr << " retransTimeout: " << retransTimeout; + std::cerr << " retransTimerTs: " << std::setprecision(12) < TCP_RETRANS_MAX_TIMEOUT) + { + retransTimeout = TCP_RETRANS_MAX_TIMEOUT; + } + +#ifdef DEBUG_TCP_STREAM_RETRANS + std::cerr << "TcpStream::incRetransmitTimer() peer: " << peeraddr; + std::cerr << " retransTimeout: " << std::setprecision(12) << retransTimeout; + std::cerr << std::endl; +#endif + +} + + + + int TcpStream::retrans() { int outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE; char tmpOutPkt[outPktSize]; - bool updateCongestion = true; if (!peerKnown) { @@ -1928,185 +2044,165 @@ int TcpStream::retrans() #endif return 0; } - - /* now retrans */ - double cts = getCurrentTS(); - std::list::iterator it; - for(it = outPkt.begin(); (it != outPkt.end()); it++) + + if (!retransTimerOn) { - outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE; - TcpPacket *pkt = (*it); - if (cts - pkt->ts > retransTimeout) - { - - /* retransmission -> adjust the congestWinSize and congestThreshold - * but only once per cycle - */ - if (updateCongestion) - { - congestThreshold = congestWinSize / 2; - congestWinSize = MAX_SEG; - congestUpdate = outAcked + congestWinSize; // point when we can up the winSize. - updateCongestion = false; - -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::retrans() Adjusting Congestion Parameters: "; - std::cerr << std::endl; - std::cerr << "\tcongestWinSize: " << congestWinSize; - std::cerr << " congestThreshold: " << congestThreshold; - std::cerr << " congestUpdate: " << congestUpdate; - std::cerr << std::endl; -#endif - - } - - /* before we can retranmit, - * we need to check that its within the congestWinSize - * -> actually only checking that the start (seqno) is within window! - */ - - - if (isOldSequence(outAcked + congestWinSize, pkt->seqno)) - { - /* cannot send .... */ -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::retrans() Retranmission Delayed by CongestionWindow"; - std::cerr << std::endl; - - std::cerr << "\toutAcked: " << outAcked; - std::cerr << " CongestWinSize:" << congestWinSize; - std::cerr << std::endl; - - std::cerr << "\tAttempted Packet: Seqno: "; - std::cerr << pkt->seqno << " size: " << pkt->datasize; - std::cerr << " retrans: " << (int) pkt->retrans; - std::cerr << " timeout: " << retransTimeout; - std::cerr << std::endl; -#endif - /* as packets in order, can drop out of the fn now */ - return 0; - } - - -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::retrans() Seqno: "; - std::cerr << pkt->seqno << " size: " << pkt->datasize; - std::cerr << " retrans: " << (int) pkt->retrans; - std::cerr << " timeout: " << retransTimeout; - std::cerr << std::endl; -#endif - - /* update ackno and winsize */ - if (!(pkt->hasSyn())) - { - pkt->setAck(inAckno); - lastSentAck = pkt -> ackno; - } - - pkt->winsize = inWinSize; - lastSentWinSize = pkt -> winsize; - - keepAliveTimer = cts; - - (*it) -> writePacket(tmpOutPkt, outPktSize); - -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::retrans() ReSending Pkt" << std::endl; - std::cerr << "TcpStream::retrans Seqno: "; - std::cerr << (*it)->seqno << " size: " << (*it)->datasize; - std::cerr << " Ackno: "; - std::cerr << (*it)->ackno << " winsize: " << (*it)->winsize; - std::cerr << std::endl; - //std::cerr << printPkt(tmpOutPkt, outPktSize) << std::endl; -#endif - /* if its a syn packet ** thats been - * transmitting for a while, maybe - * we should increase the ttl. - */ - - if ((pkt->hasSyn()) && (getTTL() < TCP_STD_TTL)) - { - /* calculate a new TTL */ - if (mTTL_end > cts) - { - setTTL(TCP_DEFAULT_FIREWALL_TTL); - } - else - { - setTTL(getTTL() + 1); - } - - std::string out; - rs_sprintf(out, "TcpStream::retrans() Startup SYNs retrans count: %u New TTL: %d", pkt->retrans, getTTL()); - - rslog(RSL_WARNING, rstcpstreamzone, out); - -#ifdef DEBUG_TCP_STREAM - std::cerr << out.str() << std::endl; -#endif - - - } - - /* catch excessive retransmits - * - Allow Syn case more.... - * - if not SYN or TTL has reached STD then timeout quickly. - - * OLD 2nd Logic (below) has been replaced with lower logic. - * (((!pkt->hasSyn()) || (TCP_STD_TTL == getTTL())) - * && (pkt->retrans > kMaxPktRetransmit))) - * Problem was that the retransmit of Syn packet had STD_TTL, and was triggering Close (and SeqNo change). - * It seemed to work anyway.... But might cause coonnection failures. Will reduce the MaxSyn Retransmit - * so something more reasonable as well. - * ((!pkt->hasSyn()) && (pkt->retrans > kMaxPktRetransmit))) - */ - - if ((pkt->hasSyn() && (pkt->retrans > kMaxSynPktRetransmit)) || - ((!pkt->hasSyn()) && (pkt->retrans > kMaxPktRetransmit))) - - { - /* too many attempts close stream */ -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::retrans() Too many Retransmission Attempts ("; - std::cerr << (int) pkt->retrans << ") for Pkt" << std::endl; - std::cerr << "TcpStream::retrans() Closing Socket Connection"; - std::cerr << std::endl; -#endif - - - rslog(RSL_WARNING,rstcpstreamzone,"TcpStream::state => TCP_CLOSED (Too Many Retransmits)"); - - outStreamActive = false; - inStreamActive = false; - state = TCP_CLOSED; - cleanup(); - return 0; - } - - - udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl); - - /* restart timers */ - (*it) -> ts = cts; - (*it) -> retrans++; - - /* finally - double the retransTimeout ... (Karn's Algorithm) - * this ensures we don't retransmit all the packets that - * following a dropped packet! - * - * but if we have lots of dropped this ain't going to help much! - * - * not doubling retransTimeout, that is can go manic and result - * in excessive timeouts, and no data flow. - */ - retransTimeout = 2.0 * (rtt_est + 4.0 * rtt_dev); -#ifdef DEBUG_TCP_STREAM - std::cerr << "TcpStream::retrans() Doubling std retranTimeout to:"; - std::cerr << retransTimeout; - std::cerr << std::endl; -#endif - } + return 0; } + + double cts = getCurrentTS(); + if (cts - retransTimerTs < retransTimeout) + { + return 0; + } + + if (outPkt.begin() == outPkt.end()) + { + resetRetransmitTimer(); + return 0; + } + + TcpPacket *pkt = outPkt.front(); + + if (!pkt) + { + /* error */ + return 0; + } + + /* retransmission -> adjust the congestWinSize and congestThreshold + */ + + congestThreshold = congestWinSize / 2; + congestWinSize = MAX_SEG; + congestUpdate = outAcked + congestWinSize; // point when we can up the winSize. + +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::retrans() Adjusting Congestion Parameters: "; + std::cerr << std::endl; + std::cerr << "\tcongestWinSize: " << congestWinSize; + std::cerr << " congestThreshold: " << congestThreshold; + std::cerr << " congestUpdate: " << congestUpdate; + std::cerr << std::endl; +#endif + + /* update ackno and winsize */ + if (!(pkt->hasSyn())) + { + pkt->setAck(inAckno); + lastSentAck = pkt -> ackno; + } + + pkt->winsize = inWinSize; + lastSentWinSize = pkt -> winsize; + + keepAliveTimer = cts; + + pkt->writePacket(tmpOutPkt, outPktSize); + +#ifdef DEBUG_TCP_STREAM_RETRANS + std::cerr << "TcpStream::retrans()"; + std::cerr << " peer: " << peeraddr; + std::cerr << " hasSyn: " << pkt->hasSyn(); + std::cerr << " Seqno: "; + std::cerr << pkt->seqno << " size: " << pkt->datasize; + std::cerr << " Ackno: "; + std::cerr << pkt->ackno << " winsize: " << pkt->winsize; + std::cerr << " retrans: " << (int) pkt->retrans; + std::cerr << " timeout: " << std::setprecision(12) << retransTimeout; + std::cerr << std::endl; + + //std::cerr << printPkt(tmpOutPkt, outPktSize) << std::endl; +#endif + + /* if its a syn packet ** thats been + * transmitting for a while, maybe + * we should increase the ttl. + */ + + if ((pkt->hasSyn()) && (getTTL() < TCP_STD_TTL)) + { + /* calculate a new TTL */ + if (mTTL_end > cts) + { + setTTL(TCP_DEFAULT_FIREWALL_TTL); + } + else + { + setTTL(getTTL() + 1); + } + + std::string out; + rs_sprintf(out, "TcpStream::retrans() Startup SYNs retrans count: %u New TTL: %d", pkt->retrans, getTTL()); + + rslog(RSL_WARNING, rstcpstreamzone, out); + +#ifdef DEBUG_TCP_STREAM + std::cerr << out.str() << std::endl; +#endif + } + + /* catch excessive retransmits + * - Allow Syn case more.... + * - if not SYN or TTL has reached STD then timeout quickly. + + * OLD 2nd Logic (below) has been replaced with lower logic. + * (((!pkt->hasSyn()) || (TCP_STD_TTL == getTTL())) + * && (pkt->retrans > kMaxPktRetransmit))) + * Problem was that the retransmit of Syn packet had STD_TTL, and was triggering Close (and SeqNo change). + * It seemed to work anyway.... But might cause coonnection failures. Will reduce the MaxSyn Retransmit + * so something more reasonable as well. + * ((!pkt->hasSyn()) && (pkt->retrans > kMaxPktRetransmit))) + */ + + if ((pkt->hasSyn() && (pkt->retrans > kMaxSynPktRetransmit)) || + ((!pkt->hasSyn()) && (pkt->retrans > kMaxPktRetransmit))) + + { + + /* too many attempts close stream */ +#ifdef DEBUG_TCP_STREAM_CLOSE + std::cerr << "TcpStream::retrans() Too many Retransmission Attempts ("; + std::cerr << (int) pkt->retrans << ") for Peer: " << peeraddr << std::endl; + std::cerr << "TcpStream::retrans() Closing Socket Connection"; + std::cerr << std::endl; + + //dumpPacket(std::cerr, (unsigned char *) tmpOutPkt, outPktSize); + dumpstate_locked(std::cerr); +#endif + + rslog(RSL_WARNING,rstcpstreamzone,"TcpStream::state => TCP_CLOSED (Too Many Retransmits)"); + + outStreamActive = false; + inStreamActive = false; + state = TCP_CLOSED; + cleanup(); + return 0; + } + + + udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl); + + /* restart timers */ + pkt->ts = cts; + pkt->retrans++; + + /* + * finally - double the retransTimeout ... (Karn's Algorithm) + * except if we are starting a connection... i.e. hasSyn() + */ + + if (!pkt->hasSyn()) + { + incRetransmitTimeout(); + restartRetransmitTimer(); + } + else + { + resetRetransmitTimer(); + startRetransmitTimer(); + } + return 1; } @@ -2118,13 +2214,14 @@ void TcpStream::acknowledge() std::list::iterator it; double cts = getCurrentTS(); bool updateRTT = true; + bool clearedPkts = false; for(it = outPkt.begin(); (it != outPkt.end()) && (isOldSequence((*it)->seqno, outAcked)); it = outPkt.erase(it)) { TcpPacket *pkt = (*it); - + clearedPkts = true; /* adjust the congestWinSize and congestThreshold * congestUpdate <= outAcked @@ -2236,11 +2333,30 @@ void TcpStream::acknowledge() * This will effectively trigger the retransmission of the next dropped packet. */ - if (!updateRTT) - { - retransTimeout = rtt_est + 4.0 * rtt_dev; - } + /* + * if have acked all data - resetRetransTimer() + */ + if (it == outPkt.end()) + { + +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::acknowledge() peer: " << peeraddr; + std::cerr << " Backlog cleared, resetRetransmitTimer"; + std::cerr << std::endl; +#endif + resetRetransmitTimer(); + } + else if (clearedPkts) + { +#ifdef DEBUG_TCP_STREAM + std::cerr << "TcpStream::acknowledge() peer: " << peeraddr; + std::cerr << " Cleared some packets -> resetRetransmitTimer + start"; + std::cerr << std::endl; +#endif + resetRetransmitTimer(); + startRetransmitTimer(); + } return; } @@ -2565,3 +2681,107 @@ int checkData(uint8 *data, int size, int idx) #endif +/***** Dump state of TCP Stream - to workout why it was closed ****/ +int TcpStream::dumpstate_locked(std::ostream &out) +{ + out << "TcpStream::dumpstate()"; + out << "======================================================="; + + out << std::endl; + out << "state: " << (int) state; + out << " errorState: " << (int) errorState; + out << std::endl; + + out << "(Streams) inStreamActive: " << inStreamActive; + out << " outStreamActive: " << outStreamActive; + out << std::endl; + + out << "(Timeouts) maxWinSize: " << maxWinSize; + out << " keepAliveTimeout: " << keepAliveTimeout; + out << " retransTimeout: " << retransTimeout; + out << std::endl; + + out << "(Timers) keepAliveTimer: " << std::setprecision(12) << keepAliveTimer; + out << " lastIncomingPkt: " << std::setprecision(12) << lastIncomingPkt; + out << std::endl; + + out << "(Tracking) lastSendAck: " << lastSentAck; + out << " lastSendWinSize: " << lastSentWinSize; + out << std::endl; + + out << "(Init) initOutSeqno: " << initOurSeqno; + out << " initPeerSeqno: " << initPeerSeqno; + out << std::endl; + + out << "(r/w) lastWriteTF: " << lastWriteTF; + out << " lastReadTF: " << lastReadTF; + out << " wcount: " << wcount; + out << " rcount: " << rcount; + out << std::endl; + + out << "(rtt) rtt_est: " << rtt_est; + out << " rtt_dev: " << rtt_dev; + out << std::endl; + + out << "(congestion) congestThreshold: " << congestThreshold; + out << " congestWinSize: " << congestWinSize; + out << " congestUpdate: " << congestUpdate; + out << std::endl; + + out << "(TTL) mTTL_period: " << mTTL_period; + out << " mTTL_start: " << std::setprecision(12) << mTTL_start; + out << " mTTL_end: " << std::setprecision(12) << mTTL_end; + out << std::endl; + + out << "(Peer) peerKnown: " << peerKnown; + out << " peerAddr: " << peeraddr; + out << std::endl; + + out << "-------------------------------------------------------"; + out << std::endl; + status_locked(out); + out << "======================================================="; + out << std::endl; + + return 1; +} + + +int TcpStream::dumpstate(std::ostream &out) +{ + tcpMtx.lock(); /********** LOCK MUTEX *********/ + + dumpstate_locked(out); + + tcpMtx.unlock(); /******** UNLOCK MUTEX *********/ + + return 1; +} + + +int dumpPacket(std::ostream &out, unsigned char *pkt, uint32_t size) +{ + uint32_t i; + + out << "dumpPacket() Size: " << size; + out << std::endl; + out << "------------------------------------------------------"; + for(i = 0; i < size; i++) + { + if (i % 16 == 0) + { + out << std::endl; + } + out << std::hex << std::setfill('0') << std::setw(2) << (int) pkt[i] << ":"; + } + if ((i - 1) % 16 != 0) + { + out << std::endl; + } + out << "------------------------------------------------------"; + out << std::dec << std::endl; + + return 1; +} + + diff --git a/libretroshare/src/tcponudp/tcpstream.h b/libretroshare/src/tcponudp/tcpstream.h index 1b9c841e5..eb79620cd 100644 --- a/libretroshare/src/tcponudp/tcpstream.h +++ b/libretroshare/src/tcponudp/tcpstream.h @@ -49,6 +49,7 @@ #define TCP_MAX_WIN 65500 #define TCP_ALIVE_TIMEOUT 15 /* 15 sec ... < 20 sec UDP state limit on some firewalls */ #define TCP_RETRANS_TIMEOUT 1 /* 1 sec (Initial value) */ +#define TCP_RETRANS_MAX_TIMEOUT 15 /* 15 secs */ #define kNoPktTimeout 60 /* 1 min */ @@ -117,6 +118,9 @@ bool ridle(); /* read idle */ uint32 wbytes(); uint32 rbytes(); + /* Exposed for debugging */ +int dumpstate(std::ostream &out); + private: /* Internal Functions - use the Mutex (not reentrant) */ @@ -129,6 +133,9 @@ bool isOldSequence(uint32 tst, uint32 curr); /* Internal Functions - only called inside mutex protection */ +int dumpstate_locked(std::ostream &out); +int status_locked(std::ostream &out); + int cleanup(); /* incoming data */ @@ -159,6 +166,14 @@ void setRemoteAddress(const struct sockaddr_in &raddr); int getTTL() { return ttl; } void setTTL(int t) { ttl = t; } +/* retransmission */ +void startRetransmitTimer(); +void restartRetransmitTimer(); +void stopRetransmitTimer(); +void resetRetransmitTimer(); +void incRetransmitTimeout(); + + /* data counting */ uint32 int_wbytes(); uint32 int_rbytes(); @@ -195,12 +210,15 @@ uint32 int_rbytes(); uint32 inAckno; /* next expected */ uint32 inWinSize; /* allowing other to send */ - uint32 rrt; /* some (initially) consts */ uint32 maxWinSize; uint32 keepAliveTimeout; + + /* retransmit */ + bool retransTimerOn; double retransTimeout; + double retransTimerTs; /* some timers */ double keepAliveTimer;