From 2bf94b909a4709cadd3267fd158f4a912ea3f6e2 Mon Sep 17 00:00:00 2001 From: drbob Date: Tue, 4 Aug 2009 23:22:44 +0000 Subject: [PATCH] Reworking of networking code to enable Net Restart. * Stun code now runs continually - to check external network state. * Udpsorter controls DHT stun is on/off. (via p3ConnectMgr) * added code to enable threads to join/restart * enabled NetRestart for UDP and TCP. * tweaked networking code for faster startup (now ~30 seconds - can still be improved). * tweaked debug messages for testing networking * Added test for checking external IP address determination. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1492 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/pqi/Makefile | 2 +- libretroshare/src/pqi/authgpg.cc | 2 - libretroshare/src/pqi/authssl.cc | 1 - libretroshare/src/pqi/p3connmgr.cc | 222 ++++++++++++++++---- libretroshare/src/pqi/p3connmgr.h | 8 + libretroshare/src/pqi/p3dhtmgr.cc | 23 ++- libretroshare/src/pqi/pqimonitor.h | 10 + libretroshare/src/pqi/pqipersongrp.cc | 7 + libretroshare/src/pqi/pqipersongrp.h | 4 +- libretroshare/src/pqi/pqissllistener.cc | 22 ++ libretroshare/src/rsserver/p3peers.cc | 1 - libretroshare/src/rsserver/rsinit.cc | 1 + libretroshare/src/tcponudp/tou.cc | 42 ++++ libretroshare/src/tcponudp/tou.h | 4 + libretroshare/src/tcponudp/udplayer.cc | 50 ++++- libretroshare/src/tcponudp/udplayer.h | 5 +- libretroshare/src/tcponudp/udpsorter.cc | 121 +++++++++-- libretroshare/src/tcponudp/udpsorter.h | 24 ++- libretroshare/src/tests/Makefile | 21 ++ libretroshare/src/tests/netsetup_test.cc | 253 +++++++++++++++++++++++ libretroshare/src/util/rsthreads.cc | 30 ++- libretroshare/src/util/rsthreads.h | 2 + 22 files changed, 774 insertions(+), 81 deletions(-) create mode 100644 libretroshare/src/tests/Makefile create mode 100644 libretroshare/src/tests/netsetup_test.cc diff --git a/libretroshare/src/pqi/Makefile b/libretroshare/src/pqi/Makefile index e90080539..27fd638a4 100644 --- a/libretroshare/src/pqi/Makefile +++ b/libretroshare/src/pqi/Makefile @@ -20,7 +20,7 @@ UDP_OBJ = pqissludp.o OTHER_OBJ = p3notify.o -TESTOBJ = net_test.o dht_test.o net_test1.o +TESTOBJ = net_test.o dht_test.o net_test1.o #conn_test.o TESTS = net_test dht_test net_test1 diff --git a/libretroshare/src/pqi/authgpg.cc b/libretroshare/src/pqi/authgpg.cc index 7188fc0a8..f8ef2ecea 100644 --- a/libretroshare/src/pqi/authgpg.cc +++ b/libretroshare/src/pqi/authgpg.cc @@ -57,8 +57,6 @@ #include #include -#define AUTHGPG_DEBUG 1 - /* Turn a set of parameters into a string */ static std::string setKeyPairParams(bool useRsa, unsigned int blen, diff --git a/libretroshare/src/pqi/authssl.cc b/libretroshare/src/pqi/authssl.cc index 4419201a7..0f3106a5e 100644 --- a/libretroshare/src/pqi/authssl.cc +++ b/libretroshare/src/pqi/authssl.cc @@ -53,7 +53,6 @@ static int verify_x509_callback(int preverify_ok, X509_STORE_CTX *ctx); /*********** ** #define AUTHSSL_DEBUG 1 **********/ -#define AUTHSSL_DEBUG 1 #ifdef PQI_USE_SSLONLY diff --git a/libretroshare/src/pqi/p3connmgr.cc b/libretroshare/src/pqi/p3connmgr.cc index 6007d382f..e82e003cd 100644 --- a/libretroshare/src/pqi/p3connmgr.cc +++ b/libretroshare/src/pqi/p3connmgr.cc @@ -57,6 +57,7 @@ const uint32_t MAX_UPNP_INIT = 10; /* seconds UPnP timeout */ /**** * #define CONN_DEBUG 1 ***/ +#define CONN_DEBUG 1 /**** * #define P3CONNMGR_NO_TCP_CONNECTIONS 1 ***/ @@ -104,6 +105,22 @@ peerConnectState::peerConnectState() return; } +std::string textPeerConnectState(peerConnectState &state) +{ + std::ostringstream out; + out << "Id: " << state.id << std::endl; + out << "NetMode: " << state.netMode << std::endl; + out << "VisState: " << state.visState << std::endl; + out << "laddr: " << inet_ntoa(state.localaddr.sin_addr) + << ":" << ntohs(state.localaddr.sin_port) << std::endl; + out << "eaddr: " << inet_ntoa(state.serveraddr.sin_addr) + << ":" << ntohs(state.serveraddr.sin_port) << std::endl; + + std::string output = out.str(); + return output; +} + + p3ConnectMgr::p3ConnectMgr(p3AuthMgr *am) :p3Config(CONFIG_TYPE_PEERS), @@ -246,6 +263,62 @@ void p3ConnectMgr::setOwnNetConfig(uint32_t netMode, uint32_t visState) * */ +/* Called to reseet the whole network stack. this call is + * triggered by udp stun address tracking. + * + * must: + * - reset UPnP and DHT. + * - + */ + +void p3ConnectMgr::netReset() +{ + std::cerr << "p3ConnectMgr::netReset()" << std::endl; + std::cerr << "p3ConnectMgr::netReset() shutdown" << std::endl; + + shutdown(); /* blocking shutdown call */ + + std::cerr << "p3ConnectMgr::netReset() reset NetStatus" << std::endl; + { + RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/ + mNetStatus = RS_NET_UNKNOWN; + } + + std::cerr << "p3ConnectMgr::netReset() checkNetAddress" << std::endl; + /* check Network Address */ + checkNetAddress(); + + /* reset udp network - handled by tou_init! */ + /* reset tcp network - if necessary */ + { + /* NOTE: nNetListeners should be protected via the Mutex. + * HOWEVER, as we NEVER change this list - once its setup + * we can get away without it - and assume its constant. + * + * NB: (*it)->reset_listener must be out of the mutex, + * as it calls back to p3ConnMgr. + */ + + std::cerr << "p3ConnectMgr::netReset() resetting listeners" << std::endl; + std::list::const_iterator it; + for(it = mNetListeners.begin(); it != mNetListeners.end(); it++) + { + std::cerr << "p3ConnectMgr::netReset() reset listener" << std::endl; + (*it)->reset_listener(); + } + } + + std::cerr << "p3ConnectMgr::netReset() done" << std::endl; +} + +/* to allow resets of network stuff */ +void p3ConnectMgr::addNetListener(pqiNetListener *listener) +{ + RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/ + mNetListeners.push_back(listener); +} + + void p3ConnectMgr::netStatusReset() { netFlagOk = true; @@ -284,6 +357,7 @@ void p3ConnectMgr::netStartup() std::cerr << "p3ConnectMgr::netStartup() tou_stunkeepalive() enabled" << std::endl; #endif tou_stunkeepalive(1); + mStunMoreRequired = true; ownState.netMode &= ~(RS_NET_MODE_ACTUAL); @@ -293,12 +367,6 @@ void p3ConnectMgr::netStartup() case RS_NET_MODE_TRY_EXT: /* v similar to UDP */ ownState.netMode |= RS_NET_MODE_EXT; mNetStatus = RS_NET_UDP_SETUP; -#ifdef CONN_DEBUG - std::cerr << "p3ConnectMgr::netStartup() disabling stunkeepalive() cos EXT" << std::endl; -#endif - tou_stunkeepalive(0); - mStunMoreRequired = false; /* only need to validate address (EXT) */ - break; case RS_NET_MODE_TRY_UDP: @@ -552,10 +620,6 @@ void p3ConnectMgr::netUpnpCheck() /* UPnP Failed us! */ mUpnpAddrValid = false; mNetStatus = RS_NET_UDP_SETUP; -#ifdef CONN_DEBUG - std::cerr << "p3ConnectMgr::netUpnpCheck() enabling stunkeepalive() cos UDP" << std::endl; -#endif - tou_stunkeepalive(1); connMtx.unlock(); /* UNLOCK MUTEX */ } @@ -577,11 +641,6 @@ void p3ConnectMgr::netUpnpCheck() mNetStatus = RS_NET_UDP_SETUP; /* Fix netMode & Clear others! */ ownState.netMode = RS_NET_MODE_TRY_UPNP | RS_NET_MODE_UPNP; -#ifdef CONN_DEBUG - std::cerr << "p3ConnectMgr::netUpnpCheck() disabling stunkeepalive() cos uPnP" << std::endl; -#endif - tou_stunkeepalive(0); - mStunMoreRequired = false; /* only need to validate address (UPNP) */ connMtx.unlock(); /* UNLOCK MUTEX */ } @@ -645,8 +704,6 @@ void p3ConnectMgr::netUdpCheck() #endif ownState.netMode &= ~(RS_NET_MODE_ACTUAL); ownState.netMode |= RS_NET_MODE_UNREACHABLE; - tou_stunkeepalive(0); - mStunMoreRequired = false; /* no point -> unreachable (EXT) */ /* send a system warning message */ pqiNotify *notify = getPqiNotify(); @@ -882,44 +939,137 @@ void p3ConnectMgr::stunInit() mStunMoreRequired = true; } +/* This is continually called + * + * checks whether the ext address is consistent + * + * checks if UDP needs more stun peers - or not + * The status is passed onto the DHT. + * + */ + bool p3ConnectMgr::stunCheck() { - /* check if we've got a Stun result */ - bool stunOk = false; #ifdef CONN_DEBUG - //std::cerr << "p3ConnectMgr::stunCheck()" << std::endl; + std::cerr << "p3ConnectMgr::stunCheck()" << std::endl; #endif - { - RsStackMutex stack(connMtx); /********* LOCK STACK MUTEX ******/ + /* check udp address stability */ + + bool netDone = false; + bool doNetReset = false; - /* if DONE -> return */ - if (mStunStatus == RS_STUN_DONE) { - return true; + RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/ + mStunStatus = RS_STUN_DHT; + netDone = (mNetStatus == RS_NET_DONE); } - if (mStunFound >= RS_STUN_FOUND_MIN) + struct sockaddr_in raddr; + socklen_t rlen = sizeof(raddr); + struct sockaddr_in eaddr; + socklen_t elen = sizeof(eaddr); + uint8_t stable; + uint32_t failCount; + time_t lastSent; + time_t now = time(NULL); + + if (netDone) { - mStunMoreRequired = false; +#ifdef CONN_DEBUG + std::cerr << "NetSetupDone: Checking if network is same" << std::endl; +#endif + + if (0 < tou_extaddr((struct sockaddr *) &raddr, &rlen, &stable)) + { + RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/ + + if ((mStunExtAddr.sin_addr.s_addr != raddr.sin_addr.s_addr) || + (mStunAddrStable != stable)) + { +#ifdef CONN_DEBUG + std::cerr << "Ext Address Changed -> netReset" << std::endl; +#endif + + doNetReset = true; + } + else + { +#ifdef CONN_DEBUG + std::cerr << "Ext Address Same: ok!" << std::endl; +#endif + } + } + else + { +#ifdef CONN_DEBUG + std::cerr << "No Ext Address -> netReset" << std::endl; +#endif + + doNetReset = true; + } } - stunOk = (!mStunMoreRequired); - } - - if (udpExtAddressCheck() && (stunOk)) + if (doNetReset) { - /* set external UDP address */ - netAssistStun(false); +#ifdef CONN_DEBUG + std::cerr << "Resetting Network" << std::endl; +#endif + netReset(); + } + + int i = 0; + for(i = 0; tou_getstunpeer(i, (struct sockaddr *) &raddr, &rlen, + (struct sockaddr *) &eaddr, &elen, + &failCount, &lastSent); i++) + { + std::cerr << "STUN PEERS: "; + std::cerr << " raddr: " << inet_ntoa(raddr.sin_addr) << ":" << ntohs(raddr.sin_port); + std::cerr << " eaddr: " << inet_ntoa(eaddr.sin_addr) << ":" << ntohs(eaddr.sin_port); + if (lastSent) + { + std::cerr << " failCount: " << failCount << " lastSent: " << now-lastSent; + } + else + { + std::cerr << " Unused "; + } + std::cerr << std::endl; + } + + /* pass on udp status to dht */ + if (tou_needstunpeers()) + { RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/ - mStunStatus = RS_STUN_DONE; + if (!mStunMoreRequired) + { +#ifdef CONN_DEBUG + std::cerr << "Telling DHT More Stun Required" << std::endl; +#endif - return true; + netAssistStun(true); + mStunMoreRequired = true; + } } - return false; + else + { + RsStackMutex stack(connMtx); /****** STACK LOCK MUTEX *******/ + + if (mStunMoreRequired) + { +#ifdef CONN_DEBUG + std::cerr << "Telling DHT No More Stun Required" << std::endl; +#endif + + netAssistStun(false); + mStunMoreRequired = false; + } + } + + return true; } void p3ConnectMgr::stunStatus(std::string id, struct sockaddr_in raddr, uint32_t type, uint32_t flags) diff --git a/libretroshare/src/pqi/p3connmgr.h b/libretroshare/src/pqi/p3connmgr.h index 4141207d7..1afede8cf 100644 --- a/libretroshare/src/pqi/p3connmgr.h +++ b/libretroshare/src/pqi/p3connmgr.h @@ -162,6 +162,8 @@ class peerConnectState }; +std::string textPeerConnectState(peerConnectState &state); + class p3ConnectMgr: public pqiConnectCb, public p3Config { @@ -177,6 +179,8 @@ void addNetAssistFirewall(uint32_t type, pqiNetAssistFirewall *); bool checkNetAddress(); /* check our address is sensible */ +void addNetListener(pqiNetListener *listener); + /*************** External Control ****************/ bool shutdown(); /* blocking shutdown call */ @@ -273,6 +277,8 @@ virtual bool netAssistSetAddress( struct sockaddr_in &laddr, /* Internal Functions */ +void netReset(); + void statusTick(); void netTick(); void netStartup(); @@ -342,6 +348,8 @@ private: RsMutex connMtx; /* protects below */ + std::list mNetListeners; + time_t mNetInitTS; uint32_t mNetStatus; diff --git a/libretroshare/src/pqi/p3dhtmgr.cc b/libretroshare/src/pqi/p3dhtmgr.cc index b46af7d33..d29cea438 100644 --- a/libretroshare/src/pqi/p3dhtmgr.cc +++ b/libretroshare/src/pqi/p3dhtmgr.cc @@ -66,7 +66,7 @@ const int p3dhtzone = 3892; #define DHT_DEFAULT_WAITTIME 1 /* Std sleep break period */ #define DHT_NUM_BOOTSTRAP_BINS 8 -#define DHT_MIN_BOOTSTRAP_REQ_PERIOD 30 +#define DHT_MIN_BOOTSTRAP_REQ_PERIOD 5 void printDhtPeerEntry(dhtPeerEntry *ent, std::ostream &out); @@ -1027,20 +1027,21 @@ int p3DhtMgr::checkStunState() } else if (mDhtState == DHT_STATE_FIND_STUN) { - /* if we run out of stun peers -> get some more */ - if (stunIds.size() < 1) - { + } + + /* if we run out of stun peers -> get some more */ + if (stunIds.size() < 1) + { #ifdef DHT_DEBUG - std::cerr << "WARNING: out of Stun Peers - switching to Active Now" << std::endl; + std::cerr << "WARNING: out of Stun Peers - Fetching some more" << std::endl; #endif - mDhtState = DHT_STATE_ACTIVE; - dhtMtx.unlock(); /* UNLOCK MUTEX */ + mDhtState = DHT_STATE_ACTIVE; + dhtMtx.unlock(); /* UNLOCK MUTEX */ - /* this is a locked function */ - getDhtBootstrapList(); + /* this is a locked function */ + getDhtBootstrapList(); - dhtMtx.lock(); /* LOCK MUTEX */ - } + dhtMtx.lock(); /* LOCK MUTEX */ } dhtMtx.unlock(); /* UNLOCK MUTEX */ diff --git a/libretroshare/src/pqi/pqimonitor.h b/libretroshare/src/pqi/pqimonitor.h index 55c8e0cab..86fd091ff 100644 --- a/libretroshare/src/pqi/pqimonitor.h +++ b/libretroshare/src/pqi/pqimonitor.h @@ -134,5 +134,15 @@ virtual void peerConnectRequest(std::string id, virtual void stunStatus(std::string id, struct sockaddr_in raddr, uint32_t type, uint32_t flags); }; + +/* network listener interface - used to reset network addresses */ +class pqiNetListener +{ + public: +virtual int reset_listener() = 0; + +}; + + #endif // PQI_MONITOR_H diff --git a/libretroshare/src/pqi/pqipersongrp.cc b/libretroshare/src/pqi/pqipersongrp.cc index 4a78a0358..9de6298a9 100644 --- a/libretroshare/src/pqi/pqipersongrp.cc +++ b/libretroshare/src/pqi/pqipersongrp.cc @@ -186,6 +186,8 @@ int pqipersongrp::init_listener() int pqipersongrp::restart_listener() { + std::cerr << "pqipersongrp::restart_listener()" << std::endl; + // stop it, // change the address. // restart. @@ -197,6 +199,8 @@ int pqipersongrp::restart_listener() if (haveListener) { + std::cerr << "pqipersongrp::restart_listener() haveListener" << std::endl; + peerConnectState state; mConnMgr->getOwnNetStatus(state); @@ -205,6 +209,9 @@ int pqipersongrp::restart_listener() pqil -> resetlisten(); pqil -> setListenAddr(state.localaddr); pqil -> setuplisten(); + + std::cerr << "pqipersongrp::restart_listener() done!" << std::endl; + } return 1; } diff --git a/libretroshare/src/pqi/pqipersongrp.h b/libretroshare/src/pqi/pqipersongrp.h index ddfca87f7..f5a9d2486 100644 --- a/libretroshare/src/pqi/pqipersongrp.h +++ b/libretroshare/src/pqi/pqipersongrp.h @@ -46,13 +46,15 @@ const unsigned long PQIPERSON_NO_LISTENER = 0x0001; const unsigned long PQIPERSON_ALL_BW_LIMITED = 0x0010; -class pqipersongrp: public pqihandler, public pqiMonitor, public p3ServiceServer +class pqipersongrp: public pqihandler, public pqiMonitor, public p3ServiceServer, public pqiNetListener { public: pqipersongrp(SecurityPolicy *, unsigned long flags); /*************************** Setup *************************/ /* pqilistener */ + +virtual int reset_listener() { return restart_listener(); } int init_listener(); int restart_listener(); diff --git a/libretroshare/src/pqi/pqissllistener.cc b/libretroshare/src/pqi/pqissllistener.cc index 3856aad9f..144f58599 100644 --- a/libretroshare/src/pqi/pqissllistener.cc +++ b/libretroshare/src/pqi/pqissllistener.cc @@ -171,7 +171,27 @@ int pqissllistenbase::setuplisten() out << "\tSetup Port: " << ntohs(laddr.sin_port); pqioutput(PQL_DEBUG_BASIC, pqissllistenzone, out.str()); + std::cerr << out.str() << std::endl; } + + /* added a call to REUSEADDR, so that we can re-open an existing socket + * when we restart_listener. + */ + + { + int on = 1; + if (setsockopt(lsock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) + { + std::ostringstream out; + out << "pqissllistenbase::setuplisten()"; + out << " Cannot setsockopt SO_REUSEADDR!" << std::endl; + showSocketError(out); + pqioutput(PQL_ALERT, pqissllistenzone, out.str()); + std::cerr << out.str() << std::endl; + + exit(1); + } + } if (0 != (err = bind(lsock, (struct sockaddr *) &laddr, sizeof(laddr)))) { @@ -180,6 +200,7 @@ int pqissllistenbase::setuplisten() out << " Cannot Bind to Local Address!" << std::endl; showSocketError(out); pqioutput(PQL_ALERT, pqissllistenzone, out.str()); + std::cerr << out.str() << std::endl; exit(1); return -1; @@ -198,6 +219,7 @@ int pqissllistenbase::setuplisten() out << err << std::endl; showSocketError(out); pqioutput(PQL_ALERT, pqissllistenzone, out.str()); + std::cerr << out.str() << std::endl; exit(1); return -1; diff --git a/libretroshare/src/rsserver/p3peers.cc b/libretroshare/src/rsserver/p3peers.cc index 42fb693e8..e3eb2dc83 100644 --- a/libretroshare/src/rsserver/p3peers.cc +++ b/libretroshare/src/rsserver/p3peers.cc @@ -52,7 +52,6 @@ RsPeers *rsPeers = NULL; /******* * #define P3PEERS_DEBUG 1 *******/ -#define P3PEERS_DEBUG 1 static uint32_t RsPeerTranslateTrust(uint32_t trustLvl); int ensureExtension(std::string &name, std::string def_ext); diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index ea540c3f2..dd3568db6 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -2166,6 +2166,7 @@ int RsServer::StartupRetroShare() /**************************************************************************/ pqih->init_listener(); + mConnMgr->addNetListener(pqih); /* add listener so we can reset all sockets later */ diff --git a/libretroshare/src/tcponudp/tou.cc b/libretroshare/src/tcponudp/tou.cc index 379380ad2..1c08addbf 100644 --- a/libretroshare/src/tcponudp/tou.cc +++ b/libretroshare/src/tcponudp/tou.cc @@ -63,7 +63,19 @@ static int tou_tick_all(); int tou_init(const struct sockaddr *my_addr, socklen_t addrlen) { if (tou_inited) + { + struct sockaddr_in *addr = (struct sockaddr_in *) my_addr; + udps->resetAddress(*addr); + if (!(udps->okay())) + { + std::cerr << "tou_init() FATAL ERROR: Cannot reset Udp Socket to: " + << inet_ntoa(addr->sin_addr) << ":" << ntohs(addr->sin_port); + std::cerr << std::endl; + + exit(1); + } return 1; + } tou_streams.resize(kInitStreamTable); @@ -101,6 +113,36 @@ int tou_stunkeepalive(int required) return 1; } + +int tou_getstunpeer(int i, struct sockaddr *remote_addr, socklen_t *raddrlen, + struct sockaddr *ext_addr, socklen_t *eaddrlen, + uint32_t *failCount, time_t *lastSend) +{ + if (!tou_inited) + return -1; + + std::string id; + + bool ret = udps->getStunPeer(i, id, + *((struct sockaddr_in *) remote_addr), + *((struct sockaddr_in *) ext_addr), + *failCount, *lastSend); + + return ret; +} + +int tou_needstunpeers() +{ + if (!tou_inited) + return -1; + + if (udps->needStunPeers()) + return 1; + return 0; +} + + + int tou_tick_stunkeepalive() { if (!tou_inited) diff --git a/libretroshare/src/tcponudp/tou.h b/libretroshare/src/tcponudp/tou.h index d53afa8ad..1f4540949 100644 --- a/libretroshare/src/tcponudp/tou.h +++ b/libretroshare/src/tcponudp/tou.h @@ -79,6 +79,10 @@ int tou_stunpeer(const struct sockaddr *ext_addr, socklen_t addrlen, const char int tou_stunkeepalive(int required); int tou_tick_stunkeepalive(); +int tou_getstunpeer(int i, struct sockaddr *remote_addr, socklen_t *raddrlen, + struct sockaddr *ext_addr, socklen_t *eaddrlen, + uint32_t *failCount, time_t *lastSend); +int tou_needstunpeers(); /* Connections are as similar to UNIX as possible diff --git a/libretroshare/src/tcponudp/udplayer.cc b/libretroshare/src/tcponudp/udplayer.cc index 8de945dad..a1658703f 100644 --- a/libretroshare/src/tcponudp/udplayer.cc +++ b/libretroshare/src/tcponudp/udplayer.cc @@ -178,6 +178,33 @@ int UdpLayer::status(std::ostream &out) return 1; } +int UdpLayer::reset(struct sockaddr_in &local) +{ + std::cerr << "UdpLayer::reset()" << std::endl; + + /* stop the old thread */ + { + RsStackMutex stack(sockMtx); /********** LOCK MUTEX *********/ + std::cerr << "UdpLayer::reset() setting stopThread flag" << std::endl; + stopThread = true; + } + + std::cerr << "UdpLayer::reset() joining" << std::endl; + join(); + + std::cerr << "UdpLayer::reset() closing socket" << std::endl; + close(); + + std::cerr << "UdpLayer::reset() resetting variables" << std::endl; + laddr = local; + errorState = 0; + ttl = UDP_DEF_TTL; + + std::cerr << "UdpLayer::reset() opening socket" << std::endl; + openSocket(); +} + + int UdpLayer::close() { /* close socket if open */ @@ -210,7 +237,21 @@ void UdpLayer::recv_loop() { /* select on the socket TODO */ fd_set rset; - for(;;) { + for(;;) + { + /* check if we need to stop */ + bool toStop = false; + { + RsStackMutex stack(sockMtx); /********** LOCK MUTEX *********/ + toStop = stopThread; + } + + if (toStop) + { + std::cerr << "UdpLayer::recv_loop() stopping thread" << std::endl; + stop(); + } + FD_ZERO(&rset); FD_SET(sockfd, &rset); timeout.tv_sec = 0; @@ -311,6 +352,13 @@ int UdpLayer::openSocket() #endif setTTL(UDP_DEF_TTL); + // start up our thread. + { + RsStackMutex stack(sockMtx); /********** LOCK MUTEX *********/ + stopThread = false; + } + start(); + return 1; } diff --git a/libretroshare/src/tcponudp/udplayer.h b/libretroshare/src/tcponudp/udplayer.h index d8d1dedb9..c5f347d3a 100644 --- a/libretroshare/src/tcponudp/udplayer.h +++ b/libretroshare/src/tcponudp/udplayer.h @@ -70,9 +70,12 @@ class UdpLayer: public RsThread UdpLayer(UdpReceiver *recv, struct sockaddr_in &local); virtual ~UdpLayer() { return; } +int reset(struct sockaddr_in &local); /* calls join, close, openSocket */ + int status(std::ostream &out); /* setup connections */ + int close(); int openSocket(); /* RsThread functions */ @@ -88,7 +91,6 @@ void recv_loop(); /* uses callback to UdpReceiver */ int okay(); int tick(); - int close(); /* data */ /* internals */ @@ -110,6 +112,7 @@ virtual int sendUdpPacket(const void *data, int size, struct sockaddr_in &to); int errorState; int sockfd; int ttl; + bool stopThread; RsMutex sockMtx; }; diff --git a/libretroshare/src/tcponudp/udpsorter.cc b/libretroshare/src/tcponudp/udpsorter.cc index 0dfef9d79..c1e141935 100644 --- a/libretroshare/src/tcponudp/udpsorter.cc +++ b/libretroshare/src/tcponudp/udpsorter.cc @@ -37,11 +37,20 @@ const int rsudpsorterzone = 28477; static const int STUN_TTL = 64; +#define TOU_STUN_MIN_PEERS 5 + /* * #define DEBUG_UDP_SORTER 1 */ +const int32_t TOU_STUN_MAX_FAIL_COUNT = 3; /* 3 tries (could be higher?) */ +const int32_t TOU_STUN_MAX_SEND_RATE = 5; /* every 5 seconds */ +const int32_t TOU_STUN_MAX_RECV_RATE = 25; /* every 25 seconds */ +const int32_t TOU_STUN_ADDR_MAX_AGE = 120; /* 2 minutes */ + + + UdpSorter::UdpSorter(struct sockaddr_in &local) :udpLayer(NULL), laddr(local), eaddrKnown(false), eaddrStable(false), mStunKeepAlive(false), mStunLastRecv(0), mStunLastSend(0) @@ -54,6 +63,12 @@ UdpSorter::UdpSorter(struct sockaddr_in &local) return; } +bool UdpSorter::resetAddress(struct sockaddr_in &local) +{ + return udpLayer->reset(local); +} + + /* higher level interface */ void UdpSorter::recvPkt(void *data, int size, struct sockaddr_in &from) @@ -148,7 +163,9 @@ int UdpSorter::status(std::ostream &out) int UdpSorter::openSocket() { udpLayer = new UdpLayer(this, laddr); - udpLayer->start(); + // start is called by udpLayer now, for consistency + // with reset! + //udpLayer->start(); return 1; } @@ -301,8 +318,19 @@ bool UdpSorter::locked_handleStunPkt(void *data, int size, struct sockaddr_in &f bool UdpSorter::externalAddr(struct sockaddr_in &external, uint8_t &stable) { + RsStackMutex stack(sortMtx); /********** LOCK MUTEX *********/ + if (eaddrKnown) { + /* address timeout */ + if (time(NULL) - eaddrTime > TOU_STUN_ADDR_MAX_AGE) + { + std::cerr << "UdpSorter::externalAddr() eaddr expired"; + std::cerr << std::endl; + + return false; + } + external = eaddr; if (eaddrStable) @@ -310,8 +338,15 @@ bool UdpSorter::externalAddr(struct sockaddr_in &external, uint8_t &stable) else stable = 0; + std::cerr << "UdpSorter::externalAddr() eaddr:" << inet_ntoa(external.sin_addr); + std::cerr << ":" << ntohs(external.sin_port) << " stable: " << (int) stable; + std::cerr << std::endl; + return true; } + std::cerr << "UdpSorter::externalAddr() eaddr unknown"; + std::cerr << std::endl; + return false; } @@ -503,14 +538,8 @@ bool UdpStun_isStunPacket(void *data, int size) /******************************* STUN Handling ******************************** * The KeepAlive part - slightly more complicated - * - * */ -const int32_t TOU_STUN_MAX_FAIL_COUNT = 10; /* 10 tries (could be higher?) */ -const int32_t TOU_STUN_MAX_SEND_RATE = 5; /* every 5 seconds */ -const int32_t TOU_STUN_MAX_RECV_RATE = 25; /* every 25 seconds */ - /******************************* STUN Handling ********************************/ bool UdpSorter::setStunKeepAlive(uint32_t required) @@ -536,12 +565,13 @@ bool UdpSorter::addStunPeer(const struct sockaddr_in &remote, const char *pee std::cerr << std::endl; #endif - storeStunPeer(remote, peerid); - sortMtx.lock(); /********** LOCK MUTEX *********/ bool needStun = (!eaddrKnown); sortMtx.unlock(); /******** UNLOCK MUTEX *********/ + storeStunPeer(remote, peerid, needStun); + + if (needStun) { doStun(remote); @@ -550,7 +580,7 @@ bool UdpSorter::addStunPeer(const struct sockaddr_in &remote, const char *pee return true; } -bool UdpSorter::storeStunPeer(const struct sockaddr_in &remote, const char *peerid) +bool UdpSorter::storeStunPeer(const struct sockaddr_in &remote, const char *peerid, bool sent) { #ifdef DEBUG_UDP_SORTER @@ -571,11 +601,22 @@ bool UdpSorter::storeStunPeer(const struct sockaddr_in &remote, const char *p std::cerr << std::endl; #endif /* already there */ + if (sent) + { + it->failCount += 1; + it->lastsend = time(NULL); + } return false; } } TouStunPeer peer(std::string(peerid), remote); + if (sent) + { + peer.failCount += 1; + peer.lastsend = time(NULL); + } + mStunList.push_back(peer); #ifdef DEBUG_UDP_SORTER @@ -704,10 +745,7 @@ bool UdpSorter::locked_recvdStun(const struct sockaddr_in &remote, const stru locked_printStunList(); #endif - if (!eaddrKnown) - { - locked_checkExternalAddress(); - } + locked_checkExternalAddress(); return found; } @@ -722,13 +760,23 @@ bool UdpSorter::locked_checkExternalAddress() bool found1 = false; bool found2 = false; - - std::list::iterator it; - std::list::iterator p1; - std::list::iterator p2; - for(it = mStunList.begin(); it != mStunList.end(); it++) + time_t now = time(NULL); + /* iterator backwards - as these are the most recent */ + std::list::reverse_iterator it; + std::list::reverse_iterator p1; + std::list::reverse_iterator p2; + for(it = mStunList.rbegin(); it != mStunList.rend(); it++) { - if (it->response && isExternalNet(&(it->eaddr.sin_addr))) + /* check: + 1) have response. + 2) have eaddr. + 3) no fails. + 4) recent age. + */ + + time_t age = (now - it->lastsend); + if (it->response && isExternalNet(&(it->eaddr.sin_addr)) && + (it->failCount == 0) && (age < TOU_STUN_ADDR_MAX_AGE)) { if (!found1) { @@ -757,6 +805,7 @@ bool UdpSorter::locked_checkExternalAddress() } eaddrKnown = true; eaddr = p1->eaddr; + eaddrTime = now; #ifdef DEBUG_UDP_SORTER std::cerr << "UdpSorter::locked_checkExternalAddress() Found State:"; @@ -803,6 +852,36 @@ bool UdpSorter::locked_printStunList() return true; } - +bool UdpSorter::getStunPeer(int idx, std::string &id, + struct sockaddr_in &remote, struct sockaddr_in &eaddr, + uint32_t &failCount, time_t &lastSend) +{ + RsStackMutex stack(sortMtx); /********** LOCK MUTEX *********/ + + std::list::iterator it; + int i; + for(i=0, it=mStunList.begin(); (iid); + remote = it->remote; + eaddr = it->eaddr; + failCount = it->failCount; + lastSend = it->lastsend; + return true; + } + + return false; +} + + +bool UdpSorter::needStunPeers() +{ + RsStackMutex stack(sortMtx); /********** LOCK MUTEX *********/ + + return (mStunList.size() < TOU_STUN_MIN_PEERS); +} + diff --git a/libretroshare/src/tcponudp/udpsorter.h b/libretroshare/src/tcponudp/udpsorter.h index 43e6c86c7..e8dd51f22 100644 --- a/libretroshare/src/tcponudp/udpsorter.h +++ b/libretroshare/src/tcponudp/udpsorter.h @@ -49,10 +49,20 @@ class TouStunPeer { public: TouStunPeer() - :response(false), lastsend(0), failCount(0) { return; } + :response(false), lastsend(0), failCount(0) + { + eaddr.sin_addr.s_addr = 0; + eaddr.sin_port = 0; + return; + } TouStunPeer(std::string id_in, const struct sockaddr_in &addr) - :id(id_in), remote(addr), response(false), lastsend(0), failCount(0) { return; } + :id(id_in), remote(addr), response(false), lastsend(0), failCount(0) + { + eaddr.sin_addr.s_addr = 0; + eaddr.sin_port = 0; + return; + } std::string id; struct sockaddr_in remote, eaddr; @@ -69,13 +79,20 @@ class UdpSorter: public UdpReceiver UdpSorter(struct sockaddr_in &local); virtual ~UdpSorter() { return; } +bool resetAddress(struct sockaddr_in &local); + /* add a TCPonUDP stream */ int addUdpPeer(UdpPeer *peer, const struct sockaddr_in &raddr); int removeUdpPeer(UdpPeer *peer); bool setStunKeepAlive(uint32_t required); bool addStunPeer(const struct sockaddr_in &remote, const char *peerid); +bool getStunPeer(int idx, std::string &id, + struct sockaddr_in &remote, struct sockaddr_in &eaddr, + uint32_t &failCount, time_t &lastSend); + bool checkStunKeepAlive(); +bool needStunPeers(); bool externalAddr(struct sockaddr_in &remote, uint8_t &stable); @@ -109,7 +126,7 @@ bool locked_printStunList(); bool locked_recvdStun(const struct sockaddr_in &remote, const struct sockaddr_in &extaddr); bool locked_checkExternalAddress(); -bool storeStunPeer(const struct sockaddr_in &remote, const char *peerid); +bool storeStunPeer(const struct sockaddr_in &remote, const char *peerid, bool sent); UdpLayer *udpLayer; @@ -120,6 +137,7 @@ bool storeStunPeer(const struct sockaddr_in &remote, const char *peerid); struct sockaddr_in eaddr; /* external addr */ bool eaddrKnown; bool eaddrStable; /* if true then usable. if false -> Symmettric NAT */ + time_t eaddrTime; bool mStunKeepAlive; time_t mStunLastRecv; diff --git a/libretroshare/src/tests/Makefile b/libretroshare/src/tests/Makefile new file mode 100644 index 000000000..22f0a38cb --- /dev/null +++ b/libretroshare/src/tests/Makefile @@ -0,0 +1,21 @@ + +RS_TOP_DIR = .. +##### Define any flags that are needed for this section ####### +############################################################### + +############################################################### +include $(RS_TOP_DIR)/scripts/config.mk +############################################################### + +TESTOBJ = netsetup_test.o +TESTS = netsetup_test + +all: tests + +netsetup_test: netsetup_test.o + $(CC) $(CFLAGS) -o netsetup_test netsetup_test.o $(LIBS) + +############################################################### +include $(RS_TOP_DIR)/scripts/rules.mk +############################################################### + diff --git a/libretroshare/src/tests/netsetup_test.cc b/libretroshare/src/tests/netsetup_test.cc new file mode 100644 index 000000000..decfebd73 --- /dev/null +++ b/libretroshare/src/tests/netsetup_test.cc @@ -0,0 +1,253 @@ + +#include "pqi/p3connmgr.h" +#include "pqi/p3authmgr.h" +#include "util/utest.h" + +#include "upnp/upnphandler.h" +#include "dht/opendhtmgr.h" +#include "tcponudp/tou.h" + + +INITTEST(); + +int end_test() +{ + FINALREPORT("net_test1"); + + exit(TESTRESULT()); +} + + +void printNetworkStatus(p3ConnectMgr *connMgr) +{ + std::cerr << "network status for : " << connMgr->getOwnId() << std::endl; + std::cerr << "Net Ok:" << connMgr->getNetStatusOk() << std::endl; + std::cerr << "Upnp Ok:" << connMgr->getNetStatusUpnpOk() << std::endl; + std::cerr << "DHT Ok:" << connMgr->getNetStatusDhtOk() << std::endl; + std::cerr << "Ext Ok:" << connMgr->getNetStatusExtOk() << std::endl; + std::cerr << "Udp Ok:" << connMgr->getNetStatusUdpOk() << std::endl; + std::cerr << "Tcp Ok:" << connMgr->getNetStatusTcpOk() << std::endl; + std::cerr << "network status for : " << connMgr->getOwnId() << std::endl; + + peerConnectState state; + if (connMgr->getOwnNetStatus(state)) + { + std::string txt = textPeerConnectState(state); + std::cerr << "State: " << txt << std::endl; + } + else + { + std::cerr << "No Net Status" << std::endl; + } +} + + +void setupTest(int i, p3ConnectMgr *cMgr) +{ + switch(i) + { + case 1: + { + /* Test One */ + + } + break; + + case 10: + { + /* Test One */ + + } + break; + + case 15: + { + /* Test One */ + + } + break; + + case 20: + { + /* Test One */ + + } + break; + + case 13: + { + /* Test One */ + + } + break; + + default: + std::cerr << "setupTest(" << i << ") no test here" << std::endl; + } +} + +void checkResults(int i, p3ConnectMgr *cMgr) +{ + switch(i) + { + /* Test One: Setup - without any support */ + case 1: + { + /* Expect UDP ports to be established by now */ + + //CHECK(isExternalNet(&loopback_addr)==false); + } + break; + + case 10: + { + /* Expect Local IP Address to be known */ + + } + break; + + case 15: + { + + REPORT("Basic Networking Setup"); + } + break; + + /* Test Two: DHT Running */ + case 111: + { + /* Expect UDP ports to be established by now */ + + //CHECK(isExternalNet(&loopback_addr)==false); + } + break; + + case 110: + { + /* Expect Local IP Address to be known */ + + } + break; + + /* Test 3: */ + case 145: + { + + } + break; + + case 100: + { + /* Test One */ + + } + break; + + case 5000: + { + /* End of Tests */ + end_test(); + } + break; + + default: + { + std::cerr << "checkResults(" << i << ") no test here" << std::endl; + printNetworkStatus(cMgr); + + } + } +} + +class TestMonitor: public pqiMonitor +{ + public: +virtual void statusChange(const std::list &plist) +{ + std::cerr << "TestMonitor::statusChange()"; + std::cerr << std::endl; + std::list::const_iterator it; + for(it = plist.begin(); it != plist.end(); it++) + { + std::cerr << "Event!"; + std::cerr << std::endl; + } +} + +}; + +int main(int argc, char **argv) +{ + /* options */ + bool enable_upnp = true; + bool enable_dht = true; + bool enable_forward = true; + + /* handle options */ + + int testtype = 1; + switch(testtype) + { + case 1: + /* udp test */ + enable_upnp = false; + enable_forward = false; + enable_dht = true; + break; + + } + + + std::string ownId = "OWNID"; + + /* create a dummy auth mgr */ + p3AuthMgr *authMgr = new p3DummyAuthMgr(); + p3ConnectMgr *connMgr = new p3ConnectMgr(authMgr); + + /* Setup Notify Early - So we can use it. */ + //rsNotify = new p3Notify(); + + pqiNetAssistFirewall *upnpMgr = NULL; + p3DhtMgr *dhtMgr = NULL; + + if (enable_upnp) + { + std::cerr << "Switching on UPnP" << std::endl; + upnpMgr = new upnphandler(); + connMgr->addNetAssistFirewall(1, upnpMgr); + } + + if (enable_dht) + { + p3DhtMgr *dhtMgr = new OpenDHTMgr(ownId, connMgr, "./"); + connMgr->addNetAssistConnect(1, dhtMgr); + + dhtMgr->start(); + std::cerr << "Switching on DHT" << std::endl; + dhtMgr->enable(true); + } + + + /**************************************************************************/ + /* need to Monitor too! */ + + TestMonitor *testmonitor = new TestMonitor(); + connMgr->addMonitor(testmonitor); + + connMgr->checkNetAddress(); + int i; + for(i=0; 1; i++) + { + connMgr->tick(); + + setupTest(i, connMgr); + + sleep(1); + connMgr->tick(); + + checkResults(i, connMgr); + + tou_tick_stunkeepalive(); + } +} + diff --git a/libretroshare/src/util/rsthreads.cc b/libretroshare/src/util/rsthreads.cc index abcdd86db..0a8c24357 100644 --- a/libretroshare/src/util/rsthreads.cc +++ b/libretroshare/src/util/rsthreads.cc @@ -55,8 +55,24 @@ pthread_t createThread(RsThread &thread) thread.mMutex.lock(); { - pthread_create(&tid, 0, &rsthread_init, data); - thread.mTid = tid; + +#if 0 + int ret; + ret = pthread_attr_init(&tattr); + if (doDetached) + { + ret = pthread_attr_setdetachstate(&tattr,PTHREAD_CREATE_DETACHED); + } + else + { + ret = pthread_attr_setdetachstate(&tattr,PTHREAD_CREATE_JOINABLE); + } + + pthread_create(&tid, &tattr, &rsthread_init, data); +#endif + + pthread_create(&tid, 0, &rsthread_init, data); + thread.mTid = tid; } thread.mMutex.unlock(); @@ -64,6 +80,16 @@ pthread_t createThread(RsThread &thread) } +void RsThread::join() /* waits for the the mTid thread to stop */ +{ + void *ptr; + pthread_join(mTid, &ptr); +} + +void RsThread::stop() +{ + pthread_exit(NULL); +} RsQueueThread::RsQueueThread(uint32_t min, uint32_t max, double relaxFactor ) :mMinSleep(min), mMaxSleep(max), mRelaxFactor(relaxFactor) diff --git a/libretroshare/src/util/rsthreads.h b/libretroshare/src/util/rsthreads.h index 2a613ab77..ad63ffe48 100644 --- a/libretroshare/src/util/rsthreads.h +++ b/libretroshare/src/util/rsthreads.h @@ -70,6 +70,8 @@ virtual ~RsThread() { return; } virtual void start() { createThread(*this); } virtual void run() = 0; /* called once the thread is started */ +virtual void join(); /* waits for the the mTid thread to stop */ +virtual void stop(); /* calls pthread_exit() */ pthread_t mTid; RsMutex mMutex;