Add heartbeat (an empty packet, wich is 100 bytes long) in p3disc every 4 seconds

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1929 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
joss17 2009-12-22 10:41:19 +00:00
parent e6c8e57927
commit 6b9392fcae
7 changed files with 249 additions and 38 deletions

View File

@ -26,6 +26,7 @@
#include "pqi/pqi.h" #include "pqi/pqi.h"
#include "pqi/pqiperson.h" #include "pqi/pqiperson.h"
#include "pqi/pqipersongrp.h" #include "pqi/pqipersongrp.h"
#include "services/p3disc.h"
const int pqipersonzone = 82371; const int pqipersonzone = 82371;
#include "util/rsdebug.h" #include "util/rsdebug.h"
@ -100,10 +101,24 @@ int pqiperson::status()
return -1; return -1;
} }
int pqiperson::receiveHeartbeat()
{
pqioutput(PQL_DEBUG_ALERT, pqipersonzone, "pqiperson::receiveHeartbeat() from peer : " + PeerId());
lastHeartbeatReceived = time(NULL);
}
// tick...... // tick......
int pqiperson::tick() int pqiperson::tick()
{ {
int activeTick = 0; //if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset.
if (active &&
lastHeartbeatReceived != 0 &&
(time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 3) {
pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::tick() No heartbeat from the peer, assume connection is dead.");
this->reset();
}
int activeTick = 0;
{ {
std::ostringstream out; std::ostringstream out;
@ -218,6 +233,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
"CONNECT_SUCCESS->marking so! (resetting others)"); "CONNECT_SUCCESS->marking so! (resetting others)");
// mark as active. // mark as active.
active = true; active = true;
lastHeartbeatReceived = 0;
activepqi = pqi; activepqi = pqi;
inConnectAttempt = false; inConnectAttempt = false;
@ -297,6 +313,7 @@ int pqiperson::reset()
activepqi = NULL; activepqi = NULL;
active = false; active = false;
lastHeartbeatReceived = 0;
return 1; return 1;
} }

View File

@ -41,6 +41,8 @@ static const int CONNECT_UNREACHABLE = 3;
static const int CONNECT_FIREWALLED = 4; static const int CONNECT_FIREWALLED = 4;
static const int CONNECT_FAILED = 5; static const int CONNECT_FAILED = 5;
static const int HEARTBEAT_REPEAT_TIME = 4;
#include "pqi/pqistreamer.h" #include "pqi/pqistreamer.h"
class pqiconnect: public pqistreamer, public NetInterface class pqiconnect: public pqistreamer, public NetInterface
@ -105,7 +107,7 @@ int reset();
int listen(); int listen();
int stoplistening(); int stoplistening();
int connect(uint32_t type, struct sockaddr_in raddr, uint32_t delay, uint32_t period, uint32_t timeout); int connect(uint32_t type, struct sockaddr_in raddr, uint32_t delay, uint32_t period, uint32_t timeout);
int receiveHeartbeat();
// add in connection method. // add in connection method.
int addChildInterface(uint32_t type, pqiconnect *pqi); int addChildInterface(uint32_t type, pqiconnect *pqi);
@ -132,6 +134,7 @@ pqiconnect *getKid(uint32_t type);
pqiconnect *activepqi; pqiconnect *activepqi;
bool inConnectAttempt; bool inConnectAttempt;
int waittimes; int waittimes;
time_t lastHeartbeatReceived;//use to track connection failure
private: /* Helper functions */ private: /* Helper functions */

View File

@ -1973,7 +1973,7 @@ int RsServer::StartupRetroShare()
mGeneralConfig = new p3GeneralConfig(); mGeneralConfig = new p3GeneralConfig();
/* create Services */ /* create Services */
ad = new p3disc(mAuthMgr, mConnMgr); ad = new p3disc(mAuthMgr, mConnMgr, pqih);
msgSrv = new p3MsgService(mConnMgr); msgSrv = new p3MsgService(mConnMgr);
chatSrv = new p3ChatService(mConnMgr); chatSrv = new p3ChatService(mConnMgr);

View File

@ -49,6 +49,7 @@ uint32_t RsDiscSerialiser::size(RsItem *i)
RsDiscReply *rdr; RsDiscReply *rdr;
RsDiscIssuer *rds; RsDiscIssuer *rds;
RsDiscVersion *rdv; RsDiscVersion *rdv;
RsDiscHeartbeat *rdt;
/* do reply first - as it is derived from Item */ /* do reply first - as it is derived from Item */
if (NULL != (rdr = dynamic_cast<RsDiscReply *>(i))) if (NULL != (rdr = dynamic_cast<RsDiscReply *>(i)))
@ -67,6 +68,10 @@ uint32_t RsDiscSerialiser::size(RsItem *i)
{ {
return sizeVersion(rdv); return sizeVersion(rdv);
} }
else if (NULL != (rdt = dynamic_cast<RsDiscHeartbeat *>(i)))
{
return sizeHeartbeat(rdt);
}
return 0; return 0;
} }
@ -78,6 +83,7 @@ bool RsDiscSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize)
RsDiscReply *rdr; RsDiscReply *rdr;
RsDiscIssuer *rds; RsDiscIssuer *rds;
RsDiscVersion *rdv; RsDiscVersion *rdv;
RsDiscHeartbeat *rdt;
/* do reply first - as it is derived from Item */ /* do reply first - as it is derived from Item */
if (NULL != (rdr = dynamic_cast<RsDiscReply *>(i))) if (NULL != (rdr = dynamic_cast<RsDiscReply *>(i)))
@ -96,6 +102,10 @@ bool RsDiscSerialiser::serialise(RsItem *i, void *data, uint32_t *pktsize)
{ {
return serialiseVersion(rdv, data, pktsize); return serialiseVersion(rdv, data, pktsize);
} }
else if (NULL != (rdt = dynamic_cast<RsDiscHeartbeat *>(i)))
{
return serialiseHeartbeat(rdt, data, pktsize);
}
return false; return false;
} }
@ -126,7 +136,10 @@ RsItem *RsDiscSerialiser::deserialise(void *data, uint32_t *pktsize)
case RS_PKT_SUBTYPE_DISC_VERSION: case RS_PKT_SUBTYPE_DISC_VERSION:
return deserialiseVersion(data, pktsize); return deserialiseVersion(data, pktsize);
break; break;
default: case RS_PKT_SUBTYPE_DISC_HEARTBEAT:
return deserialiseHeartbeat(data, pktsize);
break;
default:
return NULL; return NULL;
break; break;
} }
@ -841,4 +854,129 @@ RsDiscVersion *RsDiscSerialiser::deserialiseVersion(void *data, uint32_t *pktsiz
} }
/*************************************************************************/
RsDiscHeartbeat::~RsDiscHeartbeat()
{
return;
}
void RsDiscHeartbeat::clear()
{
}
std::ostream &RsDiscHeartbeat::print(std::ostream &out, uint16_t indent)
{
printRsItemBase(out, "RsDiscHeartbeat", indent);
uint16_t int_Indent = indent + 2;
printRsItemEnd(out, "RsDiscHeartbeat", indent);
return out;
}
uint32_t RsDiscSerialiser::sizeHeartbeat(RsDiscHeartbeat *item)
{
uint32_t s = 8; /* header */
return s;
}
/* serialise the data to the buffer */
bool RsDiscSerialiser::serialiseHeartbeat(RsDiscHeartbeat *item, void *data, uint32_t *pktsize)
{
uint32_t tlvsize = sizeHeartbeat(item);
uint32_t offset = 0;
if (*pktsize < tlvsize)
return false; /* not enough space */
*pktsize = tlvsize;
bool ok = true;
ok &= setRsItemHeader(data, *pktsize, item->PacketId(), *pktsize);
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDiscSerialiser::serialiseHeartbeat() Header: " << ok << std::endl;
std::cerr << "RsDiscSerialiser::serialiseHeartbeat() Size: " << tlvsize << std::endl;
#endif
/* skip the header */
offset += 8;
if (offset != tlvsize)
{
ok = false;
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDiscSerialiser::serialiseHeartbeat() Size Error! " << std::endl;
std::cerr << "Offset: " << offset << " tlvsize: " << tlvsize << std::endl;
#endif
}
return ok;
}
RsDiscHeartbeat *RsDiscSerialiser::deserialiseHeartbeat(void *data, uint32_t *pktsize)
{
/* get the type and size */
uint32_t rstype = getRsItemId(data);
uint32_t rssize = getRsItemSize(data);
uint32_t offset = 0;
if ((RS_PKT_VERSION_SERVICE != getRsItemVersion(rstype)) ||
(RS_SERVICE_TYPE_DISC != getRsItemService(rstype)) ||
(RS_PKT_SUBTYPE_DISC_HEARTBEAT != getRsItemSubType(rstype)))
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() Wrong Type" << std::endl;
#endif
return NULL; /* wrong type */
}
if (*pktsize < rssize) /* check size */
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() pktsize != rssize" << std::endl;
std::cerr << "Pktsize: " << *pktsize << " Rssize: " << rssize << std::endl;
#endif
return NULL; /* not enough data */
}
/* set the packet length */
*pktsize = rssize;
bool ok = true;
/* ready to load */
RsDiscHeartbeat *item = new RsDiscHeartbeat();
item->clear();
/* skip the header */
offset += 8;
if (offset != rssize)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() offset != rssize" << std::endl;
std::cerr << "Offset: " << offset << " Rssize: " << rssize << std::endl;
#endif
/* error */
delete item;
return NULL;
}
if (!ok)
{
#ifdef RSSERIAL_DEBUG
std::cerr << "RsDiscSerialiser::deserialiseHeartbeat() ok = false" << std::endl;
#endif
delete item;
return NULL;
}
return item;
}
/*************************************************************************/ /*************************************************************************/

View File

@ -38,6 +38,7 @@ const uint8_t RS_PKT_SUBTYPE_DISC_OWN = 0x01;
const uint8_t RS_PKT_SUBTYPE_DISC_REPLY = 0x02; const uint8_t RS_PKT_SUBTYPE_DISC_REPLY = 0x02;
const uint8_t RS_PKT_SUBTYPE_DISC_ISSUER = 0x03; const uint8_t RS_PKT_SUBTYPE_DISC_ISSUER = 0x03;
const uint8_t RS_PKT_SUBTYPE_DISC_VERSION = 0x05; const uint8_t RS_PKT_SUBTYPE_DISC_VERSION = 0x05;
const uint8_t RS_PKT_SUBTYPE_DISC_HEARTBEAT = 0x06;
class RsDiscItem: public RsItem class RsDiscItem: public RsItem
{ {
@ -131,6 +132,18 @@ public:
std::string version; std::string version;
}; };
class RsDiscHeartbeat: public RsDiscItem
{
public:
RsDiscHeartbeat() :RsDiscItem(RS_PKT_SUBTYPE_DISC_HEARTBEAT)
{ return; }
virtual ~RsDiscHeartbeat();
virtual void clear();
virtual std::ostream &print(std::ostream &out, uint16_t indent = 0);
};
class RsDiscSerialiser: public RsSerialType class RsDiscSerialiser: public RsSerialType
{ {
public: public:
@ -162,6 +175,10 @@ virtual uint32_t sizeVersion(RsDiscVersion *);
virtual bool serialiseVersion(RsDiscVersion *item, void *data, uint32_t *size); virtual bool serialiseVersion(RsDiscVersion *item, void *data, uint32_t *size);
virtual RsDiscVersion *deserialiseVersion(void *data, uint32_t *size); virtual RsDiscVersion *deserialiseVersion(void *data, uint32_t *size);
virtual uint32_t sizeHeartbeat(RsDiscHeartbeat *);
virtual bool serialiseHeartbeat(RsDiscHeartbeat *item, void *data, uint32_t *size);
virtual RsDiscHeartbeat *deserialiseHeartbeat(void *data, uint32_t *size);
}; };

View File

@ -79,8 +79,8 @@ const uint32_t P3DISC_FLAGS_ASK_VERSION = 0x0080;
****************************************************************************************** ******************************************************************************************
*****************************************************************************************/ *****************************************************************************************/
p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm) p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm, pqipersongrp *pqih)
:p3Service(RS_SERVICE_TYPE_DISC), mAuthMgr(am), mConnMgr(cm) :p3Service(RS_SERVICE_TYPE_DISC), mAuthMgr(am), mConnMgr(cm), mPqiPersonGrp(pqih)
{ {
RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/
@ -88,6 +88,7 @@ p3disc::p3disc(p3AuthMgr *am, p3ConnectMgr *cm)
mRemoteDisc = true; mRemoteDisc = true;
mLocalDisc = false; mLocalDisc = false;
lastSentHeartbeatTime = 0;
//add own version to versions map //add own version to versions map
versions[mAuthMgr->OwnId()] = RsUtil::retroshareVersion(); versions[mAuthMgr->OwnId()] = RsUtil::retroshareVersion();
@ -106,6 +107,19 @@ static int count = 0;
idServers(); idServers();
} }
#endif #endif
//send a heartbeat to all connected peers
if (time(NULL) - lastSentHeartbeatTime > HEARTBEAT_REPEAT_TIME) {
#ifdef P3DISC_DEBUG
std::cerr << "p3disc::tick() sending heartbeat to all peers" << std::endl;
#endif
lastSentHeartbeatTime = time(NULL);
std::list <std::string> peers;
mConnMgr->getOnlineList(peers);
for (std::list<std::string>::const_iterator pit = peers.begin(); pit != peers.end(); ++pit) {
sendHeartbeat(*pit);
}
}
return handleIncoming(); return handleIncoming();
} }
@ -154,34 +168,13 @@ int p3disc::handleIncoming()
RsDiscReply *dri = NULL; RsDiscReply *dri = NULL;
RsDiscIssuer *dii = NULL; RsDiscIssuer *dii = NULL;
RsDiscVersion *dvi = NULL; RsDiscVersion *dvi = NULL;
RsDiscHeartbeat *dta = NULL;
#ifdef TO_REMOVE
if (NULL == (di = dynamic_cast<RsDiscItem *> (item)))
{
#ifdef P3DISC_DEBUG
std::ostringstream out;
out << "p3disc::handleIncoming()";
out << "Deleting Non RsDiscItem Msg" << std::endl;
item -> print(out);
std::cerr << out.str() << std::endl;
#endif
// delete and continue to next loop.
delete item;
continue;
}
#endif
{ {
#ifdef P3DISC_DEBUG #ifdef P3DISC_DEBUG
std::ostringstream out; std::cerr << "p3disc::handleIncoming() Received Message!" << std::endl;
out << "p3disc::handleIncoming()"; item -> print(std::cerr);
out << " Received Message!" << std::endl; std::cerr << std::endl;
item -> print(out);
std::cerr << out.str() << std::endl;
#endif #endif
} }
@ -211,6 +204,11 @@ int p3disc::handleIncoming()
recvPeerOwnMsg(dio); recvPeerOwnMsg(dio);
nhandled++; nhandled++;
} }
else if (NULL != (dta = dynamic_cast<RsDiscHeartbeat *> (item)))
{
recvHeartbeatMsg(dta);
return 1;
}
delete item; delete item;
} }
return nhandled; return nhandled;
@ -618,6 +616,28 @@ void p3disc::sendOwnVersion(std::string to)
#endif #endif
} }
void p3disc::sendHeartbeat(std::string to)
{
{
#ifdef P3DISC_DEBUG
std::ostringstream out;
out << "p3disc::sendHeartbeat()";
out << " Sending tick to : " << to << std::endl;
std::cerr << out.str() << std::endl;
#endif
}
RsDiscHeartbeat *di = new RsDiscHeartbeat();
di->PeerId(to);
/* send the message */
sendItem(di);
#ifdef P3DISC_DEBUG
std::cerr << "Sent tick Message" << std::endl;
#endif
}
/*************************************************************************************/ /*************************************************************************************/
/* Input Network Msgs */ /* Input Network Msgs */
/*************************************************************************************/ /*************************************************************************************/
@ -777,14 +797,26 @@ void p3disc::recvPeerIssuerMsg(RsDiscIssuer *item)
void p3disc::recvPeerVersionMsg(RsDiscVersion *item) void p3disc::recvPeerVersionMsg(RsDiscVersion *item)
{ {
#ifdef P3DISC_DEBUG #ifdef P3DISC_DEBUG
std::cerr << "p3disc::recvPeerVersionMsg() From: " << item->PeerId(); std::cerr << "p3disc::recvPeerVersionMsg() From: " << item->PeerId();
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
// dont need protection // dont need protection
versions[item->PeerId()] = item->version; versions[item->PeerId()] = item->version;
return; return;
}
void p3disc::recvHeartbeatMsg(RsDiscHeartbeat *item)
{
#ifdef P3DISC_DEBUG
std::cerr << "p3disc::recvHeartbeatMsg() From: " << item->PeerId();
std::cerr << std::endl;
#endif
mPqiPersonGrp->getPeer(item->PeerId())->receiveHeartbeat();
return;
} }
/*************************************************************************************/ /*************************************************************************************/

View File

@ -35,6 +35,7 @@
#include "pqi/pqinetwork.h" #include "pqi/pqinetwork.h"
#include "pqi/pqi.h" #include "pqi/pqi.h"
#include "pqi/pqipersongrp.h"
class p3ConnectMgr; class p3ConnectMgr;
class p3AuthMgr; class p3AuthMgr;
@ -80,7 +81,7 @@ class p3disc: public p3Service, public pqiMonitor
public: public:
p3disc(p3AuthMgr *am, p3ConnectMgr *cm); p3disc(p3AuthMgr *am, p3ConnectMgr *cm, pqipersongrp *persGrp);
/************* from pqiMonitor *******************/ /************* from pqiMonitor *******************/
virtual void statusChange(const std::list<pqipeer> &plist); virtual void statusChange(const std::list<pqipeer> &plist);
@ -102,6 +103,7 @@ void sendOwnDetails(std::string to);
void sendOwnVersion(std::string to); void sendOwnVersion(std::string to);
void sendPeerDetails(std::string to, std::string about); void sendPeerDetails(std::string to, std::string about);
void sendPeerIssuer(std::string to, std::string about); void sendPeerIssuer(std::string to, std::string about);
void sendHeartbeat(std::string to);
/* Network Input */ /* Network Input */
int handleIncoming(); int handleIncoming();
@ -109,6 +111,7 @@ void recvPeerOwnMsg(RsDiscOwnItem *item);
void recvPeerFriendMsg(RsDiscReply *item); void recvPeerFriendMsg(RsDiscReply *item);
void recvPeerIssuerMsg(RsDiscIssuer *item); void recvPeerIssuerMsg(RsDiscIssuer *item);
void recvPeerVersionMsg(RsDiscVersion *item); void recvPeerVersionMsg(RsDiscVersion *item);
void recvHeartbeatMsg(RsDiscHeartbeat *item);
/* handle network shape */ /* handle network shape */
int addDiscoveryData(std::string fromId, std::string aboutId, int addDiscoveryData(std::string fromId, std::string aboutId,
@ -122,7 +125,8 @@ int idServers();
p3AuthMgr *mAuthMgr; p3AuthMgr *mAuthMgr;
p3ConnectMgr *mConnMgr; p3ConnectMgr *mConnMgr;
pqipersongrp *mPqiPersonGrp;
time_t lastSentHeartbeatTime;
/* data */ /* data */
RsMutex mDiscMtx; RsMutex mDiscMtx;