Merge pull request #2024 from csoler/v0.6-GXS-Optim

added anti-clogging strategy in GXS. To be tested.
This commit is contained in:
csoler 2020-07-03 21:20:05 +02:00 committed by GitHub
commit d805b18578
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 9 deletions

View File

@ -274,7 +274,7 @@
NXS_NET_DEBUG_8 gxs distant sync
***/
//#define NXS_NET_DEBUG_0 1
#define NXS_NET_DEBUG_0 1
//#define NXS_NET_DEBUG_1 1
//#define NXS_NET_DEBUG_2 1
//#define NXS_NET_DEBUG_3 1
@ -306,6 +306,7 @@ static const uint32_t GROUP_STATS_UPDATE_DELAY = 240; //
static const uint32_t GROUP_STATS_UPDATE_NB_PEERS = 2; // number of peers to which the group stats are asked
static const uint32_t MAX_ALLOWED_GXS_MESSAGE_SIZE = 199000; // 200,000 bytes including signature and headers
static const uint32_t MIN_DELAY_BETWEEN_GROUP_SEARCH = 40; // dont search same group more than every 40 secs.
static const uint32_t SAFETY_DELAY_FOR_UNSUCCESSFUL_UPDATE = 1800; // avoid re-sending the same msg list to a peer who asks twice for the same update in less than this time
static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN = 0x00 ;
static const uint32_t RS_NXS_ITEM_ENCRYPTION_STATUS_NO_ERROR = 0x01 ;
@ -3993,7 +3994,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item)
GXSNETDEBUG_P_(item->PeerId()) << " local modification time stamp: " << std::dec<< time(NULL) - mGrpServerUpdate.grpUpdateTS << " secs ago. Update sent: " <<
((item->updateTS < mGrpServerUpdate.grpUpdateTS)?"YES":"NO") << std::endl;
#endif
return item->updateTS < mGrpServerUpdate.grpUpdateTS;
return item->updateTS < mGrpServerUpdate.grpUpdateTS && locked_checkResendingOfUpdates(item->PeerId(),RsGxsGroupId(),item->updateTS,mGrpServerUpdate.grpUpdateTsRecords[item->PeerId()]) ;
}
void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrpReqItem *item)
@ -4238,10 +4239,33 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxs
return true;
}
bool RsGxsNetService::locked_checkResendingOfUpdates(const RsPeerId& pid,const RsGxsGroupId& grpId,rstime_t incoming_ts,RsPeerUpdateTsRecord& rec)
{
rstime_t now = time(NULL);
// Now we check if the peer is sending the same outdated TS for the same time in a short while. This would mean the peer
// hasn't finished processing the updates we're sending and we shouldn't send new data anymore. Of course the peer might
// have disconnected or so, which means that we need to be careful about not sending. As a compromise we still send, but
// after waiting for a while (See
if(rec.mLastTsReceived == incoming_ts && rec.mTs + SAFETY_DELAY_FOR_UNSUCCESSFUL_UPDATE > now)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(pid,grpId) << "(II) peer " << pid << " already sent the same TS " << (long int)now-(long int)rec.mTs << " secs ago for that group ID. Will not send msg list again for a while to prevent clogging..." << std::endl;
#endif
return false;
}
rec.mLastTsReceived = incoming_ts;
rec.mTs = now;
return true;
}
bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& grp_is_known)
{
// Do we have new updates for this peer?
// Here we compare times in the same clock: the friend's clock, so it should be fine.
// Here we compare times in the same clock: our own clock, so it should be fine.
grp_is_known = false ;
@ -4250,7 +4274,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& gr
// Item contains the hashed group ID in order to protect is from friends who don't know it. So we de-hash it using bruteforce over known group IDs for this peer.
// We could save the de-hash result. But the cost is quite light, since the number of encrypted groups per service is usually low.
for(ServerMsgMap::const_iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();++it)
for(ServerMsgMap::iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();++it)
if(item->grpId == hashGrpId(it->first,item->PeerId()))
{
item->grpId = it->first ;
@ -4260,20 +4284,25 @@ bool RsGxsNetService::locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item,bool& gr
#endif
grp_is_known = true ;
return item->updateTS < it->second.msgUpdateTS ;
// The order of tests below is important because we want to only modify the map of requests records if the request actually is a valid requests instead of
// a simple check that nothing's changed.
return item->updateTS < it->second.msgUpdateTS && locked_checkResendingOfUpdates(item->PeerId(),item->grpId,item->updateTS,it->second.msgUpdateTsRecords[item->PeerId()]) ;
}
return false ;
}
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId);
ServerMsgMap::iterator cit = mServerMsgUpdateMap.find(item->grpId);
if(cit != mServerMsgUpdateMap.end())
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " local time stamp: " << std::dec<< time(NULL) - cit->second.msgUpdateTS << " secs ago. Update sent: " << (item->updateTS < cit->second.msgUpdateTS) << std::endl;
#endif
grp_is_known = true ;
return item->updateTS < cit->second.msgUpdateTS ;
return item->updateTS < cit->second.msgUpdateTS && locked_checkResendingOfUpdates(item->PeerId(),item->grpId,item->updateTS,cit->second.msgUpdateTsRecords[item->PeerId()]) ;
}
#ifdef NXS_NET_DEBUG_0
@ -4294,6 +4323,9 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
bool grp_is_known = false;
bool was_circle_protected = item_was_encrypted || bool(item->flag & RsNxsSyncMsgReqItem::FLAG_USE_HASHED_GROUP_ID);
// This call determines if the peer can receive updates from us, meaning that our last TS is larger than what the peer sent.
// It also changes the items' group id into the un-hashed group ID if the group is a distant group.
bool peer_can_receive_update = locked_CanReceiveUpdate(item, grp_is_known);
if(item_was_encrypted)
@ -4309,7 +4341,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
// We update suppliers in two cases:
// Case 1: the grp is known because it is the hash of an existing group, but it's not yet in the server config map
// Case 2: the gtp is not known, possibly because it was deleted, but there's an entry in mServerGrpConfigMap due to statistics gathering. Still, statistics are only
// Case 2: the grp is not known, possibly because it was deleted, but there's an entry in mServerGrpConfigMap due to statistics gathering. Still, statistics are only
// gathered from known suppliers. So statistics never add new suppliers. These are only added here.
if(grp_is_known || mServerGrpConfigMap.find(item->grpId)!=mServerGrpConfigMap.end())

View File

@ -439,6 +439,7 @@ private:
bool locked_CanReceiveUpdate(const RsNxsSyncGrpReqItem *item);
bool locked_CanReceiveUpdate(RsNxsSyncMsgReqItem *item, bool &grp_is_known);
void locked_resetClientTS(const RsGxsGroupId& grpId);
bool locked_checkResendingOfUpdates(const RsPeerId& pid, const RsGxsGroupId &grpId, rstime_t incoming_ts, RsPeerUpdateTsRecord& rec);
static RsGxsGroupId hashGrpId(const RsGxsGroupId& gid,const RsPeerId& pid) ;

View File

@ -106,12 +106,22 @@ public:
RsPeerId peerID;
};
struct RsPeerUpdateTsRecord
{
RsPeerUpdateTsRecord() : mLastTsReceived(0), mTs(0) {}
rstime_t mLastTsReceived; // last TS that was sent for this group by this peer ID.
rstime_t mTs; // time at which this TS was sent.
};
class RsGxsServerGrpUpdate
{
public:
RsGxsServerGrpUpdate() { grpUpdateTS = 0 ; }
uint32_t grpUpdateTS;
std::map<RsPeerId,RsPeerUpdateTsRecord> grpUpdateTsRecords;
};
class RsGxsServerGrpUpdateItem : public RsGxsNetServiceItem, public RsGxsServerGrpUpdate
@ -168,7 +178,13 @@ class RsGxsServerMsgUpdate
public:
RsGxsServerMsgUpdate() { msgUpdateTS = 0 ;}
uint32_t msgUpdateTS; // local time stamp this group last received a new msg
uint32_t msgUpdateTS; // local time stamp at which this group last received a new msg
// Now we also store for each peer the last own TS the peer sent and when it did so. This allows to detect when transactions are stuck because of
// outqueues clogging. If that happens, we receive multiple times the same TS from the friend, in which case we do not send the list of msgs
// again until a significant amount of time has passed. These values are obviously initialized to 0.
std::map<RsPeerId, RsPeerUpdateTsRecord> msgUpdateTsRecords;
};
class RsGxsServerMsgUpdateItem : public RsGxsNetServiceItem, public RsGxsServerMsgUpdate