diff --git a/libretroshare/src/pqi/pqiperson.cc b/libretroshare/src/pqi/pqiperson.cc index 572845295..99f9768bc 100644 --- a/libretroshare/src/pqi/pqiperson.cc +++ b/libretroshare/src/pqi/pqiperson.cc @@ -26,6 +26,7 @@ #include "pqi/pqi.h" #include "pqi/pqiperson.h" #include "pqi/pqipersongrp.h" +#include "services/p3disc.h" const int pqipersonzone = 82371; #include "util/rsdebug.h" @@ -100,10 +101,24 @@ int pqiperson::status() return -1; } +int pqiperson::receiveHeartbeat() +{ + pqioutput(PQL_DEBUG_ALERT, pqipersonzone, "pqiperson::receiveHeartbeat() from peer : " + PeerId()); + lastHeartbeatReceived = time(NULL); +} + // tick...... int pqiperson::tick() { - int activeTick = 0; + //if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset. + if (active && + lastHeartbeatReceived != 0 && + (time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 3) { + pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::tick() No heartbeat from the peer, assume connection is dead."); + this->reset(); + } + + int activeTick = 0; { std::ostringstream out; @@ -218,6 +233,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState) "CONNECT_SUCCESS->marking so! (resetting others)"); // mark as active. active = true; + lastHeartbeatReceived = 0; activepqi = pqi; inConnectAttempt = false; @@ -297,6 +313,7 @@ int pqiperson::reset() activepqi = NULL; active = false; + lastHeartbeatReceived = 0; return 1; } diff --git a/libretroshare/src/pqi/pqiperson.h b/libretroshare/src/pqi/pqiperson.h index 90db6a771..cd7bb7210 100644 --- a/libretroshare/src/pqi/pqiperson.h +++ b/libretroshare/src/pqi/pqiperson.h @@ -41,6 +41,8 @@ static const int CONNECT_UNREACHABLE = 3; static const int CONNECT_FIREWALLED = 4; static const int CONNECT_FAILED = 5; +static const int HEARTBEAT_REPEAT_TIME = 4; + #include "pqi/pqistreamer.h" class pqiconnect: public pqistreamer, public NetInterface @@ -105,7 +107,7 @@ int reset(); int listen(); int stoplistening(); int connect(uint32_t type, struct sockaddr_in raddr, uint32_t delay, uint32_t period, uint32_t timeout); - +int receiveHeartbeat(); // add in connection method. int addChildInterface(uint32_t type, pqiconnect *pqi); @@ -132,6 +134,7 @@ pqiconnect *getKid(uint32_t type); pqiconnect *activepqi; bool inConnectAttempt; int waittimes; + time_t lastHeartbeatReceived;//use to track connection failure private: /* Helper functions */ diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 9680c1dca..05f2dded5 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1973,7 +1973,7 @@ int RsServer::StartupRetroShare() mGeneralConfig = new p3GeneralConfig(); /* create Services */ - ad = new p3disc(mAuthMgr, mConnMgr); + ad = new p3disc(mAuthMgr, mConnMgr, pqih); msgSrv = new p3MsgService(mConnMgr); chatSrv = new p3ChatService(mConnMgr); diff --git a/libretroshare/src/serialiser/rsdiscitems.cc b/libretroshare/src/serialiser/rsdiscitems.cc index 7a4526954..cf2c6d41d 100644 --- a/libretroshare/src/serialiser/rsdiscitems.cc +++ b/libretroshare/src/serialiser/rsdiscitems.cc @@ -49,6 +49,7 @@ uint32_t RsDiscSerialiser::size(RsItem *i) RsDiscReply *rdr; RsDiscIssuer *rds; RsDiscVersion *rdv; + RsDiscHeartbeat *rdt; /* do reply first - as it is derived from Item */ if (NULL != (rdr = dynamic_cast(i))) @@ -67,6 +68,10 @@ uint32_t RsDiscSerialiser::size(RsItem *i) { return sizeVersion(rdv); } + else if (NULL != (rdt = dynamic_cast(i))) + { + return sizeHeartbeat(rdt); + } return 0; } @@ -78,6 +83,7 @@ bool RsDiscSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize) RsDiscReply *rdr; RsDiscIssuer *rds; RsDiscVersion *rdv; + RsDiscHeartbeat *rdt; /* do reply first - as it is derived from Item */ if (NULL != (rdr = dynamic_cast(i))) @@ -96,6 +102,10 @@ bool RsDiscSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize) { return serialiseVersion(rdv, data, pktsize); } + else if (NULL != (rdt = dynamic_cast(i))) + { + return serialiseHeartbeat(rdt, data, pktsize); + } return false; } @@ -126,7 +136,10 @@ RsItem *RsDiscSerialiser::deserialise(void *data, uint32_t *pktsize) case RS_PKT_SUBTYPE_DISC_VERSION: return deserialiseVersion(data, pktsize); break; - default: + case RS_PKT_SUBTYPE_DISC_HEARTBEAT: + return deserialiseHeartbeat(data, pktsize); + break; + default: return NULL; break; } @@ -841,4 +854,129 @@ RsDiscVersion *RsDiscSerialiser::deserialiseVersion(void *data, uint32_t *pktsiz } +/*************************************************************************/ + + +RsDiscHeartbeat::~RsDiscHeartbeat() +{ + return; +} +void RsDiscHeartbeat::clear() +{ +} + +std::ostream &RsDiscHeartbeat::print(std::ostream &out, uint16_t indent) +{ + printRsItemBase(out, "RsDiscHeartbeat", indent); + uint16_t int_Indent = indent + 2; + + printRsItemEnd(out, "RsDiscHeartbeat", indent); + return out; +} + +uint32_t RsDiscSerialiser::sizeHeartbeat(RsDiscHeartbeat *item) +{ + uint32_t s = 8; /* header */ + + return s; +} + +/* serialise the data to the buffer */ +bool RsDiscSerialiser::serialiseHeartbeat(RsDiscHeartbeat *item, void *data, uint32_t *pktsize) +{ + uint32_t tlvsize = sizeHeartbeat(item); + uint32_t offset = 0; + + if (*pktsize < tlvsize) + return false; /* not enough space */ + + *pktsize = tlvsize; + + bool ok = true; + + ok &= setRsItemHeader(data, *pktsize, item->PacketId(), *pktsize); + +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDiscSerialiser::serialiseHeartbeat() Header: " << ok << std::endl; + std::cerr << "RsDiscSerialiser::serialiseHeartbeat() Size: " << tlvsize << std::endl; +#endif + + /* skip the header */ + offset += 8; + + if (offset != tlvsize) + { + ok = false; +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDiscSerialiser::serialiseHeartbeat() Size Error! " << std::endl; + std::cerr << "Offset: " << offset << " tlvsize: " << tlvsize << std::endl; +#endif + } + + return ok; +} + +RsDiscHeartbeat *RsDiscSerialiser::deserialiseHeartbeat(void *data, uint32_t *pktsize) +{ + /* get the type and size */ + uint32_t rstype = getRsItemId(data); + uint32_t rssize = getRsItemSize(data); + + uint32_t offset = 0; + + if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) || + (RS_SERVICE_TYPE_DISC != getRsItemService(rstype)) || + (RS_PKT_SUBTYPE_DISC_HEARTBEAT != getRsItemSubType(rstype))) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() Wrong Type" << std::endl; +#endif + return NULL; /* wrong type */ + } + + if (*pktsize < rssize) /* check size */ + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() pktsize != rssize" << std::endl; + std::cerr << "Pktsize: " << *pktsize << " Rssize: " << rssize << std::endl; +#endif + return NULL; /* not enough data */ + } + + /* set the packet length */ + *pktsize = rssize; + + bool ok = true; + + /* ready to load */ + RsDiscHeartbeat *item = new RsDiscHeartbeat(); + item->clear(); + + /* skip the header */ + offset += 8; + + if (offset != rssize) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() offset != rssize" << std::endl; + std::cerr << "Offset: " << offset << " Rssize: " << rssize << std::endl; +#endif + /* error */ + delete item; + return NULL; + } + + if (!ok) + { +#ifdef RSSERIAL_DEBUG + std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() ok = false" << std::endl; +#endif + delete item; + return NULL; + } + + return item; +} + + /*************************************************************************/ diff --git a/libretroshare/src/serialiser/rsdiscitems.h b/libretroshare/src/serialiser/rsdiscitems.h index fef35ea63..ea6829f5d 100644 --- a/libretroshare/src/serialiser/rsdiscitems.h +++ b/libretroshare/src/serialiser/rsdiscitems.h @@ -38,6 +38,7 @@ const uint8_t RS_PKT_SUBTYPE_DISC_OWN = 0x01; const uint8_t RS_PKT_SUBTYPE_DISC_REPLY = 0x02; const uint8_t RS_PKT_SUBTYPE_DISC_ISSUER = 0x03; const uint8_t RS_PKT_SUBTYPE_DISC_VERSION = 0x05; +const uint8_t RS_PKT_SUBTYPE_DISC_HEARTBEAT = 0x06; class RsDiscItem: public RsItem { @@ -131,6 +132,18 @@ public: std::string version; }; +class RsDiscHeartbeat: public RsDiscItem +{ +public: + RsDiscHeartbeat() :RsDiscItem(RS_PKT_SUBTYPE_DISC_HEARTBEAT) + { return; } + + virtual ~RsDiscHeartbeat(); + + virtual void clear(); + virtual std::ostream &print(std::ostream &out, uint16_t indent = 0); +}; + class RsDiscSerialiser: public RsSerialType { public: @@ -162,6 +175,10 @@ virtual uint32_t sizeVersion(RsDiscVersion *); virtual bool serialiseVersion(RsDiscVersion *item, void *data, uint32_t *size); virtual RsDiscVersion *deserialiseVersion(void *data, uint32_t *size); +virtual uint32_t sizeHeartbeat(RsDiscHeartbeat *); +virtual bool serialiseHeartbeat(RsDiscHeartbeat *item, void *data, uint32_t *size); +virtual RsDiscHeartbeat *deserialiseHeartbeat(void *data, uint32_t *size); + }; diff --git a/libretroshare/src/services/p3disc.cc b/libretroshare/src/services/p3disc.cc index 9b95c3ce3..84198fa0a 100644 --- a/libretroshare/src/services/p3disc.cc +++ b/libretroshare/src/services/p3disc.cc @@ -79,8 +79,8 @@ const uint32_t P3DISC_FLAGS_ASK_VERSION = 0x0080; ****************************************************************************************** *****************************************************************************************/ -p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm) - :p3Service(RS_SERVICE_TYPE_DISC), mAuthMgr(am), mConnMgr(cm) +p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm, pqipersongrp *pqih) + :p3Service(RS_SERVICE_TYPE_DISC), mAuthMgr(am), mConnMgr(cm), mPqiPersonGrp(pqih) { RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ @@ -88,6 +88,7 @@ p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm) mRemoteDisc = true; mLocalDisc = false; + lastSentHeartbeatTime = 0; //add own version to versions map versions[mAuthMgr->OwnId()] = RsUtil::retroshareVersion(); @@ -106,6 +107,19 @@ static int count = 0; idServers(); } #endif + //send a heartbeat to all connected peers + if (time(NULL) - lastSentHeartbeatTime > HEARTBEAT_REPEAT_TIME) { + #ifdef P3DISC_DEBUG + std::cerr << "p3disc::tick() sending heartbeat to all peers" << std::endl; + #endif + lastSentHeartbeatTime = time(NULL); + std::list peers; + mConnMgr->getOnlineList(peers); + for (std::list::const_iterator pit = peers.begin(); pit != peers.end(); ++pit) { + sendHeartbeat(*pit); + } + } + return handleIncoming(); } @@ -154,34 +168,13 @@ int p3disc::handleIncoming() RsDiscReply *dri = NULL; RsDiscIssuer *dii = NULL; RsDiscVersion *dvi = NULL; + RsDiscHeartbeat *dta = NULL; -#ifdef TO_REMOVE - if (NULL == (di = dynamic_cast (item))) - { - -#ifdef P3DISC_DEBUG - std::ostringstream out; - out << "p3disc::handleIncoming()"; - out << "Deleting Non RsDiscItem Msg" << std::endl; - item -> print(out); - - std::cerr << out.str() << std::endl; -#endif - - // delete and continue to next loop. - delete item; - - continue; - } -#endif { #ifdef P3DISC_DEBUG - std::ostringstream out; - out << "p3disc::handleIncoming()"; - out << " Received Message!" << std::endl; - item -> print(out); - - std::cerr << out.str() << std::endl; + std::cerr << "p3disc::handleIncoming() Received Message!" << std::endl; + item -> print(std::cerr); + std::cerr << std::endl; #endif } @@ -211,6 +204,11 @@ int p3disc::handleIncoming() recvPeerOwnMsg(dio); nhandled++; } + else if (NULL != (dta = dynamic_cast (item))) + { + recvHeartbeatMsg(dta); + return 1; + } delete item; } return nhandled; @@ -618,6 +616,28 @@ void p3disc::sendOwnVersion(std::string to) #endif } +void p3disc::sendHeartbeat(std::string to) +{ + { +#ifdef P3DISC_DEBUG + std::ostringstream out; + out << "p3disc::sendHeartbeat()"; + out << " Sending tick to : " << to << std::endl; + std::cerr << out.str() << std::endl; +#endif + } + + RsDiscHeartbeat *di = new RsDiscHeartbeat(); + di->PeerId(to); + + /* send the message */ + sendItem(di); + +#ifdef P3DISC_DEBUG + std::cerr << "Sent tick Message" << std::endl; +#endif +} + /*************************************************************************************/ /* Input Network Msgs */ /*************************************************************************************/ @@ -777,14 +797,26 @@ void p3disc::recvPeerIssuerMsg(RsDiscIssuer *item) void p3disc::recvPeerVersionMsg(RsDiscVersion *item) { #ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvPeerVersionMsg() From: " << item->PeerId(); - std::cerr << std::endl; + std::cerr << "p3disc::recvPeerVersionMsg() From: " << item->PeerId(); + std::cerr << std::endl; #endif - // dont need protection - versions[item->PeerId()] = item->version; + // dont need protection + versions[item->PeerId()] = item->version; - return; + return; +} + +void p3disc::recvHeartbeatMsg(RsDiscHeartbeat *item) +{ +#ifdef P3DISC_DEBUG + std::cerr << "p3disc::recvHeartbeatMsg() From: " << item->PeerId(); + std::cerr << std::endl; +#endif + + mPqiPersonGrp->getPeer(item->PeerId())->receiveHeartbeat(); + + return; } /*************************************************************************************/ diff --git a/libretroshare/src/services/p3disc.h b/libretroshare/src/services/p3disc.h index 509a40ef3..6122e944e 100644 --- a/libretroshare/src/services/p3disc.h +++ b/libretroshare/src/services/p3disc.h @@ -35,6 +35,7 @@ #include "pqi/pqinetwork.h" #include "pqi/pqi.h" +#include "pqi/pqipersongrp.h" class p3ConnectMgr; class p3AuthMgr; @@ -80,7 +81,7 @@ class p3disc: public p3Service, public pqiMonitor public: - p3disc(p3AuthMgr *am, p3ConnectMgr *cm); + p3disc(p3AuthMgr *am, p3ConnectMgr *cm, pqipersongrp *persGrp); /************* from pqiMonitor *******************/ virtual void statusChange(const std::list &plist); @@ -102,6 +103,7 @@ void sendOwnDetails(std::string to); void sendOwnVersion(std::string to); void sendPeerDetails(std::string to, std::string about); void sendPeerIssuer(std::string to, std::string about); +void sendHeartbeat(std::string to); /* Network Input */ int handleIncoming(); @@ -109,6 +111,7 @@ void recvPeerOwnMsg(RsDiscOwnItem *item); void recvPeerFriendMsg(RsDiscReply *item); void recvPeerIssuerMsg(RsDiscIssuer *item); void recvPeerVersionMsg(RsDiscVersion *item); +void recvHeartbeatMsg(RsDiscHeartbeat *item); /* handle network shape */ int addDiscoveryData(std::string fromId, std::string aboutId, @@ -122,7 +125,8 @@ int idServers(); p3AuthMgr *mAuthMgr; p3ConnectMgr *mConnMgr; - + pqipersongrp *mPqiPersonGrp; + time_t lastSentHeartbeatTime; /* data */ RsMutex mDiscMtx;