diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 72a078075..48b12caa5 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -274,7 +274,7 @@ NXS_NET_DEBUG_8 gxs distant sync ***/ -//#define NXS_NET_DEBUG_0 1 +#define NXS_NET_DEBUG_0 1 //#define NXS_NET_DEBUG_1 1 //#define NXS_NET_DEBUG_2 1 //#define NXS_NET_DEBUG_3 1 @@ -306,6 +306,7 @@ static const uint32_t GROUP_STATS_UPDATE_DELAY = 240; // static const uint32_t GROUP_STATS_UPDATE_NB_PEERS = 2; // number of peers to which the group stats are asked static const uint32_t MAX_ALLOWED_GXS_MESSAGE_SIZE = 199000; // 200,000 bytes including signature and headers static const uint32_t MIN_DELAY_BETWEEN_GROUP_SEARCH = 40; // dont search same group more than every 40 secs. +static const uint32_t SAFETY_DELAY_FOR_UNSUCCESSFUL_UPDATE = 1800; // avoid re-sending the same msg list to a peer who asks twice for the same update in less than this time static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN = 0x00 ; static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_NO_ERROR = 0x01 ; @@ -3993,7 +3994,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item) GXSNETDEBUG_P_(item->PeerId()) << " local modification time stamp: " << std::dec<< time(NULL) - mGrpServerUpdate.grpUpdateTS << " secs ago. Update sent: " << ((item->updateTS < mGrpServerUpdate.grpUpdateTS)?"YES":"NO") << std::endl; #endif - return item->updateTS < mGrpServerUpdate.grpUpdateTS; + return item->updateTS < mGrpServerUpdate.grpUpdateTS && locked_checkResendingOfUpdates(item->PeerId(),RsGxsGroupId(),item->updateTS,mGrpServerUpdate.grpUpdateTsRecords[item->PeerId()]) ; } void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrpReqItem *item) @@ -4238,10 +4239,33 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxs return true; } +bool RsGxsNetService::locked_checkResendingOfUpdates(const RsPeerId& pid,const RsGxsGroupId& grpId,rstime_t incoming_ts,RsPeerUpdateTsRecord& rec) +{ + rstime_t now = time(NULL); + + // Now we check if the peer is sending the same outdated TS for the same time in a short while. This would mean the peer + // hasn't finished processing the updates we're sending and we shouldn't send new data anymore. Of course the peer might + // have disconnected or so, which means that we need to be careful about not sending. As a compromise we still send, but + // after waiting for a while (See + + if(rec.mLastTsReceived == incoming_ts && rec.mTs + SAFETY_DELAY_FOR_UNSUCCESSFUL_UPDATE > now) + { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG_PG(pid,grpId) << "(II) peer " << pid << " already sent the same TS " << (long int)now-(long int)rec.mTs << " secs ago for that group ID. Will not send msg list again for a while to prevent clogging..." << std::endl; +#endif + return false; + } + + rec.mLastTsReceived = incoming_ts; + rec.mTs = now; + + return true; +} + bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& grp_is_known) { // Do we have new updates for this peer? - // Here we compare times in the same clock: the friend's clock, so it should be fine. + // Here we compare times in the same clock: our own clock, so it should be fine. grp_is_known = false ; @@ -4250,7 +4274,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& gr // Item contains the hashed group ID in order to protect is from friends who don't know it. So we de-hash it using bruteforce over known group IDs for this peer. // We could save the de-hash result. But the cost is quite light, since the number of encrypted groups per service is usually low. - for(ServerMsgMap::const_iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();++it) + for(ServerMsgMap::iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();++it) if(item->grpId == hashGrpId(it->first,item->PeerId())) { item->grpId = it->first ; @@ -4260,20 +4284,25 @@ bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& gr #endif grp_is_known = true ; - return item->updateTS < it->second.msgUpdateTS ; + // The order of tests below is important because we want to only modify the map of requests records if the request actually is a valid requests instead of + // a simple check that nothing's changed. + + return item->updateTS < it->second.msgUpdateTS && locked_checkResendingOfUpdates(item->PeerId(),item->grpId,item->updateTS,it->second.msgUpdateTsRecords[item->PeerId()]) ; } return false ; } - ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId); + ServerMsgMap::iterator cit = mServerMsgUpdateMap.find(item->grpId); + if(cit != mServerMsgUpdateMap.end()) { #ifdef NXS_NET_DEBUG_0 GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " local time stamp: " << std::dec<< time(NULL) - cit->second.msgUpdateTS << " secs ago. Update sent: " << (item->updateTS < cit->second.msgUpdateTS) << std::endl; #endif grp_is_known = true ; - return item->updateTS < cit->second.msgUpdateTS ; + + return item->updateTS < cit->second.msgUpdateTS && locked_checkResendingOfUpdates(item->PeerId(),item->grpId,item->updateTS,cit->second.msgUpdateTsRecords[item->PeerId()]) ; } #ifdef NXS_NET_DEBUG_0 @@ -4294,6 +4323,9 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_ bool grp_is_known = false; bool was_circle_protected = item_was_encrypted || bool(item->flag & RsNxsSyncMsgReqItem::FLAG_USE_HASHED_GROUP_ID); + // This call determines if the peer can receive updates from us, meaning that our last TS is larger than what the peer sent. + // It also changes the items' group id into the un-hashed group ID if the group is a distant group. + bool peer_can_receive_update = locked_CanReceiveUpdate(item, grp_is_known); if(item_was_encrypted) @@ -4309,7 +4341,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_ // We update suppliers in two cases: // Case 1: the grp is known because it is the hash of an existing group, but it's not yet in the server config map - // Case 2: the gtp is not known, possibly because it was deleted, but there's an entry in mServerGrpConfigMap due to statistics gathering. Still, statistics are only + // Case 2: the grp is not known, possibly because it was deleted, but there's an entry in mServerGrpConfigMap due to statistics gathering. Still, statistics are only // gathered from known suppliers. So statistics never add new suppliers. These are only added here. if(grp_is_known || mServerGrpConfigMap.find(item->grpId)!=mServerGrpConfigMap.end()) diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index e4056bb06..f7abdc176 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -439,6 +439,7 @@ private: bool locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item); bool locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item, bool &grp_is_known); void locked_resetClientTS(const RsGxsGroupId& grpId); + bool locked_checkResendingOfUpdates(const RsPeerId& pid, const RsGxsGroupId &grpId, rstime_t incoming_ts, RsPeerUpdateTsRecord& rec); static RsGxsGroupId hashGrpId(const RsGxsGroupId& gid,const RsPeerId& pid) ; diff --git a/libretroshare/src/rsitems/rsgxsupdateitems.h b/libretroshare/src/rsitems/rsgxsupdateitems.h index 9e7a194ac..03bd21800 100644 --- a/libretroshare/src/rsitems/rsgxsupdateitems.h +++ b/libretroshare/src/rsitems/rsgxsupdateitems.h @@ -106,12 +106,22 @@ public: RsPeerId peerID; }; +struct RsPeerUpdateTsRecord +{ + RsPeerUpdateTsRecord() : mLastTsReceived(0), mTs(0) {} + + rstime_t mLastTsReceived; // last TS that was sent for this group by this peer ID. + rstime_t mTs; // time at which this TS was sent. +}; + class RsGxsServerGrpUpdate { public: RsGxsServerGrpUpdate() { grpUpdateTS = 0 ; } uint32_t grpUpdateTS; + + std::map grpUpdateTsRecords; }; class RsGxsServerGrpUpdateItem : public RsGxsNetServiceItem, public RsGxsServerGrpUpdate @@ -168,7 +178,13 @@ class RsGxsServerMsgUpdate public: RsGxsServerMsgUpdate() { msgUpdateTS = 0 ;} - uint32_t msgUpdateTS; // local time stamp this group last received a new msg + uint32_t msgUpdateTS; // local time stamp at which this group last received a new msg + + // Now we also store for each peer the last own TS the peer sent and when it did so. This allows to detect when transactions are stuck because of + // outqueues clogging. If that happens, we receive multiple times the same TS from the friend, in which case we do not send the list of msgs + // again until a significant amount of time has passed. These values are obviously initialized to 0. + + std::map msgUpdateTsRecords; }; class RsGxsServerMsgUpdateItem : public RsGxsNetServiceItem, public RsGxsServerMsgUpdate