Rework of tcponudp/tcpstream should make UDP connections much more stable.

- Fixed retransmit algorithm. Much more closely matched to TCP standard: http://tools.ietf.org/html/rfc2988
	- This increases retransmit timeouts, and reduces the number of packets resent.
	- Added better debugging for retrans/close as separate #defines.
	- Further testing is required ;)



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5252 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-06-24 02:06:24 +00:00
parent 6e536ae67e
commit bc7b42d81f
2 changed files with 430 additions and 192 deletions

View File

@ -29,6 +29,7 @@
#include "tcpstream.h"
#include <iostream>
#include <iomanip>
#include <assert.h>
#include <errno.h>
#include <math.h>
@ -47,8 +48,13 @@ const int rstcpstreamzone = 28455;
/*
* #define DEBUG_TCP_STREAM 1
* #define DEBUG_TCP_STREAM_RETRANS 1
* #define DEBUG_TCP_STREAM_CLOSE 1
*/
//#define DEBUG_TCP_STREAM_RETRANS 1
//#define DEBUG_TCP_STREAM_CLOSE 1
/*
*#define DEBUG_TCP_STREAM_EXTRA 1
*/
@ -63,13 +69,15 @@ int setupBinaryCheck(std::string fname);
#endif
static const uint32 kMaxQueueSize = 300; // Was 100, which means max packet size of 100k (smaller than max packet size).
static const uint32 kMaxPktRetransmit = 20;
static const uint32 kMaxPktRetransmit = 10;
static const uint32 kMaxSynPktRetransmit = 100; // 100 => 200secs = over 3 minutes startup
static const int TCP_STD_TTL = 64;
static const int TCP_DEFAULT_FIREWALL_TTL = 4;
static const double RTT_ALPHA = 0.875;
int dumpPacket(std::ostream &out, unsigned char *pkt, uint32_t size);
// platform independent fractional timestamp.
static double getCurrentTS();
@ -82,7 +90,9 @@ TcpStream::TcpStream(UdpSubReceiver *lyr)
inAckno(0), inWinSize(0),
maxWinSize(TCP_MAX_WIN),
keepAliveTimeout(TCP_ALIVE_TIMEOUT),
retransTimerOn(false),
retransTimeout(TCP_RETRANS_TIMEOUT),
retransTimerTs(0),
lastWriteTF(0),lastReadTF(0),
wcount(0), rcount(0),
errorState(0),
@ -304,6 +314,17 @@ int TcpStream::status(std::ostream &out)
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
int s = status_locked(out);
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return s;
}
int TcpStream::status_locked(std::ostream &out)
{
int tmpstate = state;
// can leave the timestamp here as time()... rough but okay.
@ -328,8 +349,6 @@ int TcpStream::status(std::ostream &out)
out << std::endl;
out << std::endl;
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return tmpstate;
}
@ -993,6 +1012,13 @@ int TcpStream::recv_check()
* for max efficiency
*/
#ifdef DEBUG_TCP_STREAM_CLOSE
std::cerr << "TcpStream::recv_check() state = CLOSED (NoPktTimeout)";
std::cerr << std::endl;
dumpstate_locked(std::cerr);
#endif
rslog(RSL_WARNING, rstcpstreamzone, "TcpStream::state => TCP_CLOSED (kNoPktTimeout)");
outStreamActive = false;
@ -1165,6 +1191,13 @@ int TcpStream::handleIncoming(TcpPacket *pkt)
* timeout of this state.
*
*/
#ifdef DEBUG_TCP_STREAM_CLOSE
std::cerr << "TcpStream::handleIncoming() state = CLOSED (TimedWait)";
std::cerr << std::endl;
dumpstate_locked(std::cerr);
#endif
state = TCP_CLOSED;
// return incoming_TimedWait(pkt);
{
@ -1746,12 +1779,14 @@ int TcpStream::check_InPkts()
}
else if (state == TCP_LAST_ACK)
{
state = TCP_CLOSED;
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::state = TCP_CLOSED";
#ifdef DEBUG_TCP_STREAM_CLOSE
std::cerr << "TcpStream::state = TCP_CLOSED (LastAck)";
std::cerr << std::endl;
dumpstate_locked(std::cerr);
#endif
state = TCP_CLOSED;
rslog(RSL_WARNING, rstcpstreamzone, "TcpStream::state => TCP_CLOSED (LAST_ACK, recvd ACK)");
cleanup();
@ -1896,6 +1931,9 @@ int TcpStream::toSend(TcpPacket *pkt, bool retrans)
/* restart timers */
pkt -> ts = cts;
pkt -> retrans = 0;
startRetransmitTimer();
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::toSend() Adding to outPkt --> Seqno: ";
std::cerr << pkt->seqno << " size: " << pkt->datasize;
@ -1912,12 +1950,90 @@ int TcpStream::toSend(TcpPacket *pkt, bool retrans)
}
/* single retransmit timer.
*
*/
void TcpStream::startRetransmitTimer()
{
if (retransTimerOn)
{
return;
}
retransTimerTs = getCurrentTS();
retransTimerOn = true;
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::startRetransmitTimer() peer: " << peeraddr;
std::cerr << " retransTimeout: " << retransTimeout;
std::cerr << " retransTimerTs: " << std::setprecision(12) <<retransTimerTs;
std::cerr << std::endl;
#endif
}
void TcpStream::restartRetransmitTimer()
{
stopRetransmitTimer();
startRetransmitTimer();
}
void TcpStream::stopRetransmitTimer()
{
if (!retransTimerOn)
{
return;
}
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::stopRetransmitTimer() peer: " << peeraddr;
std::cerr << std::endl;
#endif
retransTimerOn = false;
}
void TcpStream::resetRetransmitTimer()
{
retransTimerOn = false;
retransTimeout = 2.0 * (rtt_est + 4.0 * rtt_dev);
// happens too often for RETRANS debugging.
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::resetRetransmitTimer() peer: " << peeraddr;
std::cerr << " retransTimeout: " << std::setprecision(12) << retransTimeout;
std::cerr << std::endl;
#endif
}
void TcpStream::incRetransmitTimeout()
{
retransTimeout = 2 * retransTimeout;
if (retransTimeout > TCP_RETRANS_MAX_TIMEOUT)
{
retransTimeout = TCP_RETRANS_MAX_TIMEOUT;
}
#ifdef DEBUG_TCP_STREAM_RETRANS
std::cerr << "TcpStream::incRetransmitTimer() peer: " << peeraddr;
std::cerr << " retransTimeout: " << std::setprecision(12) << retransTimeout;
std::cerr << std::endl;
#endif
}
int TcpStream::retrans()
{
int outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE;
char tmpOutPkt[outPktSize];
bool updateCongestion = true;
if (!peerKnown)
{
@ -1929,25 +2045,37 @@ int TcpStream::retrans()
return 0;
}
/* now retrans */
if (!retransTimerOn)
{
return 0;
}
double cts = getCurrentTS();
std::list<TcpPacket *>::iterator it;
for(it = outPkt.begin(); (it != outPkt.end()); it++)
if (cts - retransTimerTs < retransTimeout)
{
outPktSize = MAX_SEG + TCP_PSEUDO_HDR_SIZE;
TcpPacket *pkt = (*it);
if (cts - pkt->ts > retransTimeout)
return 0;
}
if (outPkt.begin() == outPkt.end())
{
resetRetransmitTimer();
return 0;
}
TcpPacket *pkt = outPkt.front();
if (!pkt)
{
/* error */
return 0;
}
/* retransmission -> adjust the congestWinSize and congestThreshold
* but only once per cycle
*/
if (updateCongestion)
{
congestThreshold = congestWinSize / 2;
congestWinSize = MAX_SEG;
congestUpdate = outAcked + congestWinSize; // point when we can up the winSize.
updateCongestion = false;
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::retrans() Adjusting Congestion Parameters: ";
@ -1958,44 +2086,6 @@ int TcpStream::retrans()
std::cerr << std::endl;
#endif
}
/* before we can retranmit,
* we need to check that its within the congestWinSize
* -> actually only checking that the start (seqno) is within window!
*/
if (isOldSequence(outAcked + congestWinSize, pkt->seqno))
{
/* cannot send .... */
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::retrans() Retranmission Delayed by CongestionWindow";
std::cerr << std::endl;
std::cerr << "\toutAcked: " << outAcked;
std::cerr << " CongestWinSize:" << congestWinSize;
std::cerr << std::endl;
std::cerr << "\tAttempted Packet: Seqno: ";
std::cerr << pkt->seqno << " size: " << pkt->datasize;
std::cerr << " retrans: " << (int) pkt->retrans;
std::cerr << " timeout: " << retransTimeout;
std::cerr << std::endl;
#endif
/* as packets in order, can drop out of the fn now */
return 0;
}
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::retrans() Seqno: ";
std::cerr << pkt->seqno << " size: " << pkt->datasize;
std::cerr << " retrans: " << (int) pkt->retrans;
std::cerr << " timeout: " << retransTimeout;
std::cerr << std::endl;
#endif
/* update ackno and winsize */
if (!(pkt->hasSyn()))
{
@ -2008,17 +2098,23 @@ int TcpStream::retrans()
keepAliveTimer = cts;
(*it) -> writePacket(tmpOutPkt, outPktSize);
pkt->writePacket(tmpOutPkt, outPktSize);
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::retrans() ReSending Pkt" << std::endl;
std::cerr << "TcpStream::retrans Seqno: ";
std::cerr << (*it)->seqno << " size: " << (*it)->datasize;
#ifdef DEBUG_TCP_STREAM_RETRANS
std::cerr << "TcpStream::retrans()";
std::cerr << " peer: " << peeraddr;
std::cerr << " hasSyn: " << pkt->hasSyn();
std::cerr << " Seqno: ";
std::cerr << pkt->seqno << " size: " << pkt->datasize;
std::cerr << " Ackno: ";
std::cerr << (*it)->ackno << " winsize: " << (*it)->winsize;
std::cerr << pkt->ackno << " winsize: " << pkt->winsize;
std::cerr << " retrans: " << (int) pkt->retrans;
std::cerr << " timeout: " << std::setprecision(12) << retransTimeout;
std::cerr << std::endl;
//std::cerr << printPkt(tmpOutPkt, outPktSize) << std::endl;
#endif
/* if its a syn packet ** thats been
* transmitting for a while, maybe
* we should increase the ttl.
@ -2044,8 +2140,6 @@ int TcpStream::retrans()
#ifdef DEBUG_TCP_STREAM
std::cerr << out.str() << std::endl;
#endif
}
/* catch excessive retransmits
@ -2065,14 +2159,17 @@ int TcpStream::retrans()
((!pkt->hasSyn()) && (pkt->retrans > kMaxPktRetransmit)))
{
/* too many attempts close stream */
#ifdef DEBUG_TCP_STREAM
#ifdef DEBUG_TCP_STREAM_CLOSE
std::cerr << "TcpStream::retrans() Too many Retransmission Attempts (";
std::cerr << (int) pkt->retrans << ") for Pkt" << std::endl;
std::cerr << (int) pkt->retrans << ") for Peer: " << peeraddr << std::endl;
std::cerr << "TcpStream::retrans() Closing Socket Connection";
std::cerr << std::endl;
#endif
//dumpPacket(std::cerr, (unsigned char *) tmpOutPkt, outPktSize);
dumpstate_locked(std::cerr);
#endif
rslog(RSL_WARNING,rstcpstreamzone,"TcpStream::state => TCP_CLOSED (Too Many Retransmits)");
@ -2087,26 +2184,25 @@ int TcpStream::retrans()
udp -> sendPkt(tmpOutPkt, outPktSize, peeraddr, ttl);
/* restart timers */
(*it) -> ts = cts;
(*it) -> retrans++;
pkt->ts = cts;
pkt->retrans++;
/* finally - double the retransTimeout ... (Karn's Algorithm)
* this ensures we don't retransmit all the packets that
* following a dropped packet!
*
* but if we have lots of dropped this ain't going to help much!
*
* not doubling retransTimeout, that is can go manic and result
* in excessive timeouts, and no data flow.
/*
* finally - double the retransTimeout ... (Karn's Algorithm)
* except if we are starting a connection... i.e. hasSyn()
*/
retransTimeout = 2.0 * (rtt_est + 4.0 * rtt_dev);
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::retrans() Doubling std retranTimeout to:";
std::cerr << retransTimeout;
std::cerr << std::endl;
#endif
if (!pkt->hasSyn())
{
incRetransmitTimeout();
restartRetransmitTimer();
}
else
{
resetRetransmitTimer();
startRetransmitTimer();
}
return 1;
}
@ -2118,13 +2214,14 @@ void TcpStream::acknowledge()
std::list<TcpPacket *>::iterator it;
double cts = getCurrentTS();
bool updateRTT = true;
bool clearedPkts = false;
for(it = outPkt.begin(); (it != outPkt.end()) &&
(isOldSequence((*it)->seqno, outAcked));
it = outPkt.erase(it))
{
TcpPacket *pkt = (*it);
clearedPkts = true;
/* adjust the congestWinSize and congestThreshold
* congestUpdate <= outAcked
@ -2236,11 +2333,30 @@ void TcpStream::acknowledge()
* This will effectively trigger the retransmission of the next dropped packet.
*/
if (!updateRTT)
{
retransTimeout = rtt_est + 4.0 * rtt_dev;
}
/*
* if have acked all data - resetRetransTimer()
*/
if (it == outPkt.end())
{
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::acknowledge() peer: " << peeraddr;
std::cerr << " Backlog cleared, resetRetransmitTimer";
std::cerr << std::endl;
#endif
resetRetransmitTimer();
}
else if (clearedPkts)
{
#ifdef DEBUG_TCP_STREAM
std::cerr << "TcpStream::acknowledge() peer: " << peeraddr;
std::cerr << " Cleared some packets -> resetRetransmitTimer + start";
std::cerr << std::endl;
#endif
resetRetransmitTimer();
startRetransmitTimer();
}
return;
}
@ -2565,3 +2681,107 @@ int checkData(uint8 *data, int size, int idx)
#endif
/***** Dump state of TCP Stream - to workout why it was closed ****/
int TcpStream::dumpstate_locked(std::ostream &out)
{
out << "TcpStream::dumpstate()";
out << "=======================================================";
out << std::endl;
out << "state: " << (int) state;
out << " errorState: " << (int) errorState;
out << std::endl;
out << "(Streams) inStreamActive: " << inStreamActive;
out << " outStreamActive: " << outStreamActive;
out << std::endl;
out << "(Timeouts) maxWinSize: " << maxWinSize;
out << " keepAliveTimeout: " << keepAliveTimeout;
out << " retransTimeout: " << retransTimeout;
out << std::endl;
out << "(Timers) keepAliveTimer: " << std::setprecision(12) << keepAliveTimer;
out << " lastIncomingPkt: " << std::setprecision(12) << lastIncomingPkt;
out << std::endl;
out << "(Tracking) lastSendAck: " << lastSentAck;
out << " lastSendWinSize: " << lastSentWinSize;
out << std::endl;
out << "(Init) initOutSeqno: " << initOurSeqno;
out << " initPeerSeqno: " << initPeerSeqno;
out << std::endl;
out << "(r/w) lastWriteTF: " << lastWriteTF;
out << " lastReadTF: " << lastReadTF;
out << " wcount: " << wcount;
out << " rcount: " << rcount;
out << std::endl;
out << "(rtt) rtt_est: " << rtt_est;
out << " rtt_dev: " << rtt_dev;
out << std::endl;
out << "(congestion) congestThreshold: " << congestThreshold;
out << " congestWinSize: " << congestWinSize;
out << " congestUpdate: " << congestUpdate;
out << std::endl;
out << "(TTL) mTTL_period: " << mTTL_period;
out << " mTTL_start: " << std::setprecision(12) << mTTL_start;
out << " mTTL_end: " << std::setprecision(12) << mTTL_end;
out << std::endl;
out << "(Peer) peerKnown: " << peerKnown;
out << " peerAddr: " << peeraddr;
out << std::endl;
out << "-------------------------------------------------------";
out << std::endl;
status_locked(out);
out << "=======================================================";
out << std::endl;
return 1;
}
int TcpStream::dumpstate(std::ostream &out)
{
tcpMtx.lock(); /********** LOCK MUTEX *********/
dumpstate_locked(out);
tcpMtx.unlock(); /******** UNLOCK MUTEX *********/
return 1;
}
int dumpPacket(std::ostream &out, unsigned char *pkt, uint32_t size)
{
uint32_t i;
out << "dumpPacket() Size: " << size;
out << std::endl;
out << "------------------------------------------------------";
for(i = 0; i < size; i++)
{
if (i % 16 == 0)
{
out << std::endl;
}
out << std::hex << std::setfill('0') << std::setw(2) << (int) pkt[i] << ":";
}
if ((i - 1) % 16 != 0)
{
out << std::endl;
}
out << "------------------------------------------------------";
out << std::dec << std::endl;
return 1;
}

View File

@ -49,6 +49,7 @@
#define TCP_MAX_WIN 65500
#define TCP_ALIVE_TIMEOUT 15 /* 15 sec ... < 20 sec UDP state limit on some firewalls */
#define TCP_RETRANS_TIMEOUT 1 /* 1 sec (Initial value) */
#define TCP_RETRANS_MAX_TIMEOUT 15 /* 15 secs */
#define kNoPktTimeout 60 /* 1 min */
@ -117,6 +118,9 @@ bool ridle(); /* read idle */
uint32 wbytes();
uint32 rbytes();
/* Exposed for debugging */
int dumpstate(std::ostream &out);
private:
/* Internal Functions - use the Mutex (not reentrant) */
@ -129,6 +133,9 @@ bool isOldSequence(uint32 tst, uint32 curr);
/* Internal Functions - only called inside mutex protection */
int dumpstate_locked(std::ostream &out);
int status_locked(std::ostream &out);
int cleanup();
/* incoming data */
@ -159,6 +166,14 @@ void setRemoteAddress(const struct sockaddr_in &raddr);
int getTTL() { return ttl; }
void setTTL(int t) { ttl = t; }
/* retransmission */
void startRetransmitTimer();
void restartRetransmitTimer();
void stopRetransmitTimer();
void resetRetransmitTimer();
void incRetransmitTimeout();
/* data counting */
uint32 int_wbytes();
uint32 int_rbytes();
@ -195,12 +210,15 @@ uint32 int_rbytes();
uint32 inAckno; /* next expected */
uint32 inWinSize; /* allowing other to send */
uint32 rrt;
/* some (initially) consts */
uint32 maxWinSize;
uint32 keepAliveTimeout;
/* retransmit */
bool retransTimerOn;
double retransTimeout;
double retransTimerTs;
/* some timers */
double keepAliveTimer;