fixed a number of timing issues in rsgxsnetservice. To be tested.

This commit is contained in:
csoler 2015-12-11 22:38:17 -05:00
parent 66d6f053f5
commit 7be7233b29
2 changed files with 89 additions and 50 deletions

View File

@ -116,11 +116,12 @@
// Objects for time stamps
// =======================
//
// mClientGrpUpdateMap: map< RsPeerId, TimeStamp > Time stamp over all groups sent by that peer Id
// Updated in processCompletedIncomingTransaction() from Grp list trans.
// mClientGrpUpdateMap: map< RsPeerId, TimeStamp > Time stamp of last modification of group data for that peer (in peer's clock time!)
// (Set at server side to be mGrpServerUpdateItem->grpUpdateTS)
//
// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers.
// Set at server to be mGrpServerUpdateItem->grpUpdateTS
// Only updated in processCompletedIncomingTransaction() from Grp list transaction.
// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers: peer will send data if
// has something new. All time comparisons are in the friends' clock time.
//
// mClientMsgUpdateMap: map< RsPeerId, map<grpId,TimeStamp > >
//
@ -129,9 +130,9 @@
// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers.
// Set at server to be mServerMsgUpdateMap[grpId]->msgUpdateTS
//
// mGrpServerUpdateItem: TimeStamp Last group modification timestamp over all groups
// mGrpServerUpdateItem: TimeStamp Last group local modification timestamp over all groups
//
// mServerMsgUpdateMap: map< GrpId, TimeStamp > Timestamp modification for each group (time of most recent msg)
// mServerMsgUpdateMap: map< GrpId, TimeStamp > Timestamp local modification for each group (i.e. time of most recent msg / metadata update)
//
//
// Group update algorithm
@ -160,9 +161,9 @@
// Suggestions
// ===========
// * handleRecvSyncGroup should use mit->second.mLastPost to limit the sending of already known data
// * apparently mServerMsgUpdateMap is initially empty -> by default clients will always want to receive the data.
// X * apparently mServerMsgUpdateMap is initially empty -> by default clients will always want to receive the data.
// => new peers will always send data for each group until they get an update for that group.
// * check that there is a timestamp for unsubscribed items, otherwise we always send TS=0 and we always get them!! (in 346)
// X * check that there is a timestamp for unsubscribed items, otherwise we always send TS=0 and we always get them!! (in 346)
//
// -> there is not. mClientMsgUpdateMap is updated when msgs are received.
// -> 1842: leaves before asking for msg content.
@ -177,21 +178,18 @@
// * the last TS method is not perfect: do new peers always receive old messages?
//
// * there's double information between mServerMsgUpdateMap first element (groupId) and second->grpId
// * processExplicitGroupRequests() seems to send the group list that it was
// asked for without further information. How is that useful???
// * processExplicitGroupRequests() seems to send the group list that it was asked for without further information. How is that useful???
//
// * grps without messages will never be stamped because stamp happens in genReqMsgTransaction, after checking msgListL.empty()
// Problem: without msg, we cannot know the grpId!!
//
// * what is the effect of a time shift between computers on the GXS system?
// * we should check that we never compare time stamps computed on different computers
//
// * mClientMsgUpdateMap[peerid][grpId] is only updated when new msgs are received. Up to date groups will keep asking for lists!
#include <unistd.h>
#include <sys/time.h>
#include <math.h>
#include <sstream>
#include "rsgxsnetservice.h"
#include "retroshare/rsconfig.h"
@ -466,7 +464,7 @@ void RsGxsNetService::syncWithPeers()
NxsBandwidthRecorder::recordEvent(mServType,grp) ;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending timestamp of latest group change) to peer id: " << *sit << " ts=" << updateTS << std::endl;
GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending back to peer the timestamp of latest group change we know about him) to peer id: " << *sit << " ts=" << updateTS << std::endl;
#endif
sendItem(grp);
}
@ -490,6 +488,7 @@ void RsGxsNetService::syncWithPeers()
{
RsGxsGrpMetaData* meta = mit->second;
// This was commented out because we want to know how many messages are available for unsubscribed groups.
// if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
// {
toRequest.insert(std::make_pair(mit->first, meta));
@ -537,7 +536,8 @@ void RsGxsNetService::syncWithPeers()
if(!checkCanRecvMsgFromPeer(peerId, *meta))
continue;
// On default, the info has never been received so the TS is 0.
// On default, the info has never been received so the TS is 0, meaning the peer has sent that it had no information.
uint32_t updateTS = 0;
if(mui)
@ -1389,31 +1389,43 @@ void RsGxsNetService::data_tick()
processExplicitGroupRequests();
}
static std::string nice_time_stamp(time_t now,time_t TS)
{
if(TS == 0)
return "Never" ;
else
{
std::ostringstream s;
s << now - TS << " secs ago" ;
return s.str() ;
}
}
void RsGxsNetService::debugDump()
{
time_t now = time(NULL) ;
GXSNETDEBUG___<< "RsGxsNetService::debugDump():" << std::endl;
GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: " << now - mGrpServerUpdateItem->grpUpdateTS << " secs ago (is the last modification time over all groups of this service)" << std::endl;
GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: " << now - mGrpServerUpdateItem->grpUpdateTS << " secs ago (is the last local modification time over all groups of this service)" << std::endl;
GXSNETDEBUG___<< " mServerMsgUpdateMap: (is for each subscribed group, the last modification time)" << std::endl;
for(std::map<RsGxsGroupId,RsGxsServerMsgUpdateItem*>::const_iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();++it)
GXSNETDEBUG__G(it->first) << " Grp:" << it->first << " last modification: " << now - it->second->msgUpdateTS << " secs ago." << std::endl;
GXSNETDEBUG__G(it->first) << " Grp:" << it->first << " last local modification (secs ago): " << nice_time_stamp(now,it->second->msgUpdateTS) << std::endl;
GXSNETDEBUG___<< " mClientGrpUpdateMap: (is for each friend, the latest time the friend sent content, all groups included)" << std::endl;
for(std::map<RsPeerId,RsGxsGrpUpdateItem*>::const_iterator it(mClientGrpUpdateMap.begin());it!=mClientGrpUpdateMap.end();++it)
GXSNETDEBUG_P_(it->first) << " From peer: " << it->first << " - last updated " << now - it->second->grpUpdateTS << " secs ago." << std::endl;
GXSNETDEBUG_P_(it->first) << " From peer: " << it->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it->second->grpUpdateTS) << std::endl;
GXSNETDEBUG___<< " mClientMsgUpdateMap: (is for each friend, the latest time the friend sent content for each group)" << std::endl;
GXSNETDEBUG___<< " mClientMsgUpdateMap: (is for each friend, the latest modification time for each group, sent by the friend himself)" << std::endl;
for(std::map<RsPeerId,RsGxsMsgUpdateItem*>::const_iterator it(mClientMsgUpdateMap.begin());it!=mClientMsgUpdateMap.end();++it)
{
GXSNETDEBUG_P_(it->first) << " From peer: " << it->first << std::endl;
for(std::map<RsGxsGroupId, RsGxsMsgUpdateItem::MsgUpdateInfo>::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2)
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated " << now - it2->second.time_stamp << " secs ago. Message count=" << it2->second.message_count << std::endl;
GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl;
}
}
@ -1423,6 +1435,10 @@ void RsGxsNetService::updateServerSyncTS()
std::map<RsGxsGroupId, RsGxsGrpMetaData*> gxsMap;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG___<< "updateServerSyncTS(): updating last modification time stamp of local data." << std::endl;
#endif
// retrieve all grps and update TS
mDataStore->retrieveGxsGrpMetaData(gxsMap);
std::map<RsGxsGroupId, RsGxsGrpMetaData*>::iterator mit = gxsMap.begin();
@ -1445,7 +1461,7 @@ void RsGxsNetService::updateServerSyncTS()
if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mPublishTs)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << "publish time stamp of group " << grpId << " has changed to " << time(NULL)-grpMeta->mPublishTs << " secs ago. updating!" << std::endl;
GXSNETDEBUG__G(grpId) << " publish time stamp of group " << grpId << " has changed to " << time(NULL)-grpMeta->mPublishTs << " secs ago. updating!" << std::endl;
#endif
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mPublishTs;
}
@ -1454,21 +1470,32 @@ void RsGxsNetService::updateServerSyncTS()
{
msui = new RsGxsServerMsgUpdateItem(mServType);
msui->grpId = grpMeta->mGroupId;
mServerMsgUpdateMap.insert(std::make_pair(msui->grpId, msui));
}else
{
msui = mapIT->second;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << " created new entry for group " << grpId << std::endl;
#endif
}
else
msui = mapIT->second;
if(grpMeta->mLastPost > msui->msgUpdateTS )
{
change = true;
msui->msgUpdateTS = grpMeta->mLastPost;
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last post = " << time(NULL) - grpMeta->mLastPost << " secs ago for group "<< grpId << std::endl;
#endif
}
// this might be very inefficient with time
if(grpMeta->mRecvTS > mGrpServerUpdateItem->grpUpdateTS)
// This might be very inefficient with time. This is needed because an old message might have been received, so the last modification time
// needs to account for this so that a friend who hasn't
if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mRecvTS)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last RecvTS = " << time(NULL) - grpMeta->mRecvTS << " secs ago for group "<< grpId << std::endl;
#endif
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS;
change = true;
}
@ -1854,6 +1881,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
mClientGrpUpdateMap.insert(std::make_pair(peerFrom, item));
}
#warning should not we conservatively use the most recent one, in case the peer has reset its mServerGrpUpdate time?? What happens if the peer unsubscribed a recent group?
item->grpUpdateTS = updateTS;
item->peerId = peerFrom;
@ -2111,9 +2139,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
gnsr.max_visible_count = std::max(gnsr.max_visible_count, mcount) ;
if (oldVisibleCount != gnsr.max_visible_count || oldSuppliersCount != gnsr.suppliers.size())
{
mObserver->notifyChangedGroupStats(grpId);
}
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(item->PeerId(),grpId) << " grpId = " << grpId << std::endl;
@ -2127,13 +2153,15 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
#warning TODO: what if grpMeta is NULL?
if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ))
{
// For unsubscribed groups, we update the timestamp to now, so that the group content will not be asked to the same
// peer again, unless the peer has new info about it.
// That needs of course to reset that time to 0 when we subscribe.
// For unsubscribed groups, we update the timestamp something more recent, so that the group content will not be asked to the same
// peer again, unless the peer has new info about it. It's important to use the same clock (this is peer's clock) so that
// we never compare times from different (and potentially badly sync-ed clocks)
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
locked_stampPeerGroupUpdateTime(pid,grpId,time(NULL),msgItemL.size()) ;
if(grpMeta)
delete grpMeta;
return ;
}
@ -2317,8 +2345,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
{
// The list to req is empty. That means we already have all messages that this peer can
// provide. So we can stamp the group from this peer to be up to date.
locked_stampPeerGroupUpdateTime(pid,grpId,time(NULL),msgItemL.size()) ;
#warning we should use tr->mTransaction->updateTS instead of time(NULL)
locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ;
}
if(grpMeta)
delete grpMeta;
@ -2326,16 +2354,20 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
void RsGxsNetService::locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages)
{
RsGxsMsgUpdateItem *& pitem(mClientMsgUpdateMap[pid]) ;
std::map<RsPeerId,RsGxsMsgUpdateItem*>::iterator it = mClientMsgUpdateMap.find(pid) ;
if(pitem == NULL)
RsGxsMsgUpdateItem *pitem;
if(it == mClientMsgUpdateMap.end())
{
pitem = new RsGxsMsgUpdateItem(mServType) ;
pitem->peerId = pid ;
}
else
pitem = it->second ;
pitem->msgUpdateInfos[grpId].time_stamp = time(NULL) ;
pitem->msgUpdateInfos[grpId].message_count = n_messages ;
pitem->msgUpdateInfos[grpId].time_stamp = tm;
pitem->msgUpdateInfos[grpId].message_count = std::max(n_messages, pitem->msgUpdateInfos[grpId].message_count) ;
IndicateConfigChanged();
}
@ -2573,8 +2605,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr)
RsNxsTransac* ntr = new RsNxsTransac(mServType);
ntr->transactionNumber = transN;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 |
RsNxsTransac::FLAG_TYPE_GRPS;
ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS;
ntr->updateTS = updateTS;
ntr->nItems = grps.size();
ntr->PeerId(tr->mTransaction->PeerId());
@ -2848,7 +2879,11 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list<RsNxsItem*>& respList
bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrp *item)
{
// don't sync if you have no new updates for this peer
// Do we have new updates for this peer?
// This is one of the few places where we compare a local time stamp (mGrpServerUpdateItem->grpUpdateTS) to a peer's time stamp.
// Because this is the global modification time for groups, async-ed computers will eventually figure out that their data needs
// to be synced.
if(mGrpServerUpdateItem)
{
@ -3172,6 +3207,9 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxs
bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsg *item)
{
// Do we have new updates for this peer?
// Here we compare times in the same clock: the friend's clock, so it should be fine.
ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId);
if(cit != mServerMsgUpdateMap.end())
@ -3187,8 +3225,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsg *item)
GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " no local time stamp for this grp. " ;
#endif
#warning when no timestamp is found, the return value should be false, since we do not want to send anything
return true;
return false;
}
void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item)
{

View File

@ -85,6 +85,8 @@ public:
struct MsgUpdateInfo
{
MsgUpdateInfo(): time_stamp(0), message_count(0) {}
uint32_t time_stamp ;
uint32_t message_count ;
};
@ -105,7 +107,7 @@ public:
virtual std::ostream &print(std::ostream &out, uint16_t indent);
RsGxsGroupId grpId;
uint32_t msgUpdateTS; // the last time this group received a new msg
uint32_t msgUpdateTS; // local time stamp this group last received a new msg
};