added a few methods to improve consistency between client and server sides, and force update of groups in some cases (such as database erased)

This commit is contained in:
csoler 2015-12-13 18:22:04 -05:00
parent ed7a261001
commit f702c942ed
2 changed files with 85 additions and 7 deletions

View File

@ -223,16 +223,18 @@
// A small SYNC_PERIOD fasten message propagation, but is likely to overload the server side of transactions (e.g. overload outqueues).
//
#define SYNC_PERIOD 60
#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled.
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
#define MAX_REQLIST_SIZE 20 // No more than 20 items per msg request list => creates smaller transactions that are less likely to be cancelled.
#define TRANSAC_TIMEOUT 2000 // In seconds. Has been increased to avoid epidemic transaction cancelling due to overloaded outqueues.
#define SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE 3600 // force re-update if there happens to be a large delay between our server side TS and the client side TS of friends
// Debug system to allow to print only for some IDs (group, Peer, etc)
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4)
//static const RsPeerId peer_to_print = RsPeerId(std::string("6718ae182d97c23af203959e678b98ac")) ; // use this to limit print to this peer id only, or "" for all IDs
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; // use this to limit print to this peer id only, or "" for all IDs
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("")) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0x0215 ; // use this to allow to this service id only, or 0 for all services
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("8ad26b347697d7b443ef872ba3a8ea4b")) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0x0217 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h)
class nullstream: public std::ostream {};
@ -507,7 +509,9 @@ void RsGxsNetService::syncWithPeers()
// Synchronise group msg for groups which we're subscribed to
// For each peer and each group, we send to the peer the time stamp of the most
// recent message we have. If the peer has more recent messages he will send them.
// recent modification the peer has sent. If the peer has more recent messages he will send them, because its latest
// modifications will be more recent. This ensures that we always compare timestamps all taken in the same
// computer (the peer's computer in this case)
for(; sit != peers.end(); ++sit)
{
@ -1369,6 +1373,7 @@ void RsGxsNetService::data_tick()
if(mUpdateCounter >= 120) // 60 seconds
{
updateServerSyncTS();
updateClientSyncTS();
mUpdateCounter = 1;
}
else
@ -1409,7 +1414,7 @@ void RsGxsNetService::debugDump()
GXSNETDEBUG___<< "RsGxsNetService::debugDump():" << std::endl;
if(mGrpServerUpdateItem != NULL)
GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: " << now - mGrpServerUpdateItem->grpUpdateTS << " secs ago (is the last local modification time over all groups of this service)" << std::endl;
GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: " << nice_time_stamp(now , mGrpServerUpdateItem->grpUpdateTS) << " (is the last local modification time over all groups of this service)" << std::endl;
else
GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: not inited yet (is the last local modification time over all groups of this service)" << std::endl;
@ -1434,6 +1439,46 @@ void RsGxsNetService::debugDump()
}
}
// This method is normally not needed, but we use it to correct possible inconsistencies in the updte time stamps
// on the client side.
void RsGxsNetService::updateClientSyncTS()
{
RS_STACK_MUTEX(mNxsMutex) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___<< "updateClientSyncTS(): checking last modification time stamps of local data w.r.t. client's modification times" << std::endl;
#endif
time_t now = time(NULL) ;
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
for(ClientGrpMap::iterator it = mClientGrpUpdateMap.begin();it!=mClientGrpUpdateMap.end();++it)
if(it->second->grpUpdateTS > SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE + mGrpServerUpdateItem->grpUpdateTS)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(it->first) << " last global client GRP modification time known for peer (" << nice_time_stamp(now,it->second->grpUpdateTS) << " is quite more recent than our own server modification time (" << nice_time_stamp(now,mGrpServerUpdateItem->grpUpdateTS) << ". Forcing update! " << std::endl;
#endif
it->second->grpUpdateTS = 0 ;
}
for(ClientMsgMap::iterator it = mClientMsgUpdateMap.begin();it!=mClientMsgUpdateMap.end();++it)
for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::iterator it2 = it->second->msgUpdateInfos.begin();it2!=it->second->msgUpdateInfos.end();++it2)
{
std::map<RsGxsGroupId,RsGxsServerMsgUpdateItem*>::const_iterator mmit = mServerMsgUpdateMap.find(it2->first) ;
if(mmit != mServerMsgUpdateMap.end() && it2->second.time_stamp > SECURITY_DELAY_TO_FORCE_CLIENT_REUPDATE + mmit->second->msgUpdateTS)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(it->first,it2->first) << " last group msg modification time known for peer (" << nice_time_stamp(now,it2->second.time_stamp) << " and group " << it2->first << " is quite more recent than our own server modification time (" << nice_time_stamp(now,mmit->second->msgUpdateTS) << ". Forcing update! " << std::endl;
#endif
it2->second.time_stamp = 0 ;
}
}
}
void RsGxsNetService::updateServerSyncTS()
{
RS_STACK_MUTEX(mNxsMutex) ;
@ -1452,8 +1497,40 @@ void RsGxsNetService::updateServerSyncTS()
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
// First reset it. That's important because it will re-compute correct TS in case
// we have unsubscribed a group.
mGrpServerUpdateItem->grpUpdateTS = 0 ;
bool change = false;
// then remove from mServerMsgUpdateMap, all items that are not in the group list!
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___ << " cleaning server map of groups with no data:" << std::endl;
#endif
for(std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*>::iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();)
if(gxsMap.find(it->first) == gxsMap.end())
{
// not found! Removing server update info for this group
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(it->first) << " removing server update info for group " << it->first << std::endl;
#endif
std::map<RsGxsGroupId, RsGxsServerMsgUpdateItem*>::iterator tmp(it) ;
++tmp ;
mServerMsgUpdateMap.erase(it) ;
it = tmp ;
}
else
++it;
#ifdef NXS_NET_DEBUG_0
if(gxsMap.empty())
GXSNETDEBUG___<< " database seems to be empty. The modification timestamp will be reset." << std::endl;
#endif
// finally, update timestamps.
for(; mit != gxsMap.end(); ++mit)
{
const RsGxsGroupId& grpId = mit->first;
@ -1505,7 +1582,7 @@ void RsGxsNetService::updateServerSyncTS()
change = true;
}
}
// actual change in config settings, then save configuration
if(change)
IndicateConfigChanged();

View File

@ -368,6 +368,7 @@ private:
void locked_doMsgUpdateWork(const RsNxsTransac* nxsTrans, const RsGxsGroupId& grpId);
void updateServerSyncTS();
void updateClientSyncTS();
bool locked_CanReceiveUpdate(const RsNxsSyncGrp* item);
bool locked_CanReceiveUpdate(const RsNxsSyncMsg* item);