diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 675fe3d1e..a479b9f4d 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -106,21 +106,22 @@ // +--------- sort items from mPendingResp // | | // | +------ locked_createTransactionFromPending(GrpRespPending / MsgRespPending) -// | | // takes accepted transaction and adds them to the list of active trans +// | | // takes accepted transaction and adds them to the list of active trans // | // +--------- sort items from mPendingCircleVetting // | // +------ locked_createTransactionFromPending(GrpCircleIdsRequestVetting / MsgCircleIdsRequestVetting) -// // takes accepted transaction and adds them to the list of active trans +// // takes accepted transaction and adds them to the list of active trans // // Objects for time stamps // ======================= // -// mClientGrpUpdateMap: map< RsPeerId, TimeStamp > Time stamp over all groups sent by that peer Id -// Updated in processCompletedIncomingTransaction() from Grp list trans. -// -// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers. -// Set at server to be mGrpServerUpdateItem->grpUpdateTS +// mClientGrpUpdateMap: map< RsPeerId, TimeStamp > Time stamp of last modification of group data for that peer (in peer's clock time!) +// (Set at server side to be mGrpServerUpdateItem->grpUpdateTS) +// +// Only updated in processCompletedIncomingTransaction() from Grp list transaction. +// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers: peer will send data if +// has something new. All time comparisons are in the friends' clock time. // // mClientMsgUpdateMap: map< RsPeerId, map > // @@ -129,9 +130,9 @@ // Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers. // Set at server to be mServerMsgUpdateMap[grpId]->msgUpdateTS // -// mGrpServerUpdateItem: TimeStamp Last group modification timestamp over all groups +// mGrpServerUpdateItem: TimeStamp Last group local modification timestamp over all groups // -// mServerMsgUpdateMap: map< GrpId, TimeStamp > Timestamp modification for each group (time of most recent msg) +// mServerMsgUpdateMap: map< GrpId, TimeStamp > Timestamp local modification for each group (i.e. time of most recent msg / metadata update) // // // Group update algorithm @@ -160,9 +161,9 @@ // Suggestions // =========== // * handleRecvSyncGroup should use mit->second.mLastPost to limit the sending of already known data -// * apparently mServerMsgUpdateMap is initially empty -> by default clients will always want to receive the data. +// X * apparently mServerMsgUpdateMap is initially empty -> by default clients will always want to receive the data. // => new peers will always send data for each group until they get an update for that group. -// * check that there is a timestamp for unsubscribed items, otherwise we always send TS=0 and we always get them!! (in 346) +// X * check that there is a timestamp for unsubscribed items, otherwise we always send TS=0 and we always get them!! (in 346) // // -> there is not. mClientMsgUpdateMap is updated when msgs are received. // -> 1842: leaves before asking for msg content. @@ -177,21 +178,18 @@ // * the last TS method is not perfect: do new peers always receive old messages? // // * there's double information between mServerMsgUpdateMap first element (groupId) and second->grpId -// * processExplicitGroupRequests() seems to send the group list that it was -// asked for without further information. How is that useful??? +// * processExplicitGroupRequests() seems to send the group list that it was asked for without further information. How is that useful??? // // * grps without messages will never be stamped because stamp happens in genReqMsgTransaction, after checking msgListL.empty() // Problem: without msg, we cannot know the grpId!! // -// * what is the effect of a time shift between computers on the GXS system? -// * we should check that we never compare time stamps computed on different computers -// -// * mClientMsgUpdateMap[peerid][grpId] is only updated when new msgs are received. Up to date groups will keep asking for lists! +// * mClientMsgUpdateMap[peerid][grpId] is only updated when new msgs are received. Up to date groups will keep asking for lists! #include #include #include +#include #include "rsgxsnetservice.h" #include "retroshare/rsconfig.h" @@ -466,7 +464,7 @@ void RsGxsNetService::syncWithPeers() NxsBandwidthRecorder::recordEvent(mServType,grp) ; #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending timestamp of latest group change) to peer id: " << *sit << " ts=" << updateTS << std::endl; + GXSNETDEBUG_P_(*sit) << " sending RsNxsSyncGrp (sending back to peer the timestamp of latest group change we know about him) to peer id: " << *sit << " ts=" << updateTS << std::endl; #endif sendItem(grp); } @@ -490,6 +488,7 @@ void RsGxsNetService::syncWithPeers() { RsGxsGrpMetaData* meta = mit->second; + // This was commented out because we want to know how many messages are available for unsubscribed groups. // if(meta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED ) // { toRequest.insert(std::make_pair(mit->first, meta)); @@ -537,7 +536,8 @@ void RsGxsNetService::syncWithPeers() if(!checkCanRecvMsgFromPeer(peerId, *meta)) continue; - // On default, the info has never been received so the TS is 0. + // On default, the info has never been received so the TS is 0, meaning the peer has sent that it had no information. + uint32_t updateTS = 0; if(mui) @@ -1389,31 +1389,43 @@ void RsGxsNetService::data_tick() processExplicitGroupRequests(); } +static std::string nice_time_stamp(time_t now,time_t TS) +{ + if(TS == 0) + return "Never" ; + else + { + std::ostringstream s; + s << now - TS << " secs ago" ; + return s.str() ; + } +} + void RsGxsNetService::debugDump() { time_t now = time(NULL) ; GXSNETDEBUG___<< "RsGxsNetService::debugDump():" << std::endl; - GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: " << now - mGrpServerUpdateItem->grpUpdateTS << " secs ago (is the last modification time over all groups of this service)" << std::endl; + GXSNETDEBUG___<< " mGrpServerUpdateItem time stamp: " << now - mGrpServerUpdateItem->grpUpdateTS << " secs ago (is the last local modification time over all groups of this service)" << std::endl; GXSNETDEBUG___<< " mServerMsgUpdateMap: (is for each subscribed group, the last modification time)" << std::endl; for(std::map::const_iterator it(mServerMsgUpdateMap.begin());it!=mServerMsgUpdateMap.end();++it) - GXSNETDEBUG__G(it->first) << " Grp:" << it->first << " last modification: " << now - it->second->msgUpdateTS << " secs ago." << std::endl; + GXSNETDEBUG__G(it->first) << " Grp:" << it->first << " last local modification (secs ago): " << nice_time_stamp(now,it->second->msgUpdateTS) << std::endl; GXSNETDEBUG___<< " mClientGrpUpdateMap: (is for each friend, the latest time the friend sent content, all groups included)" << std::endl; for(std::map::const_iterator it(mClientGrpUpdateMap.begin());it!=mClientGrpUpdateMap.end();++it) - GXSNETDEBUG_P_(it->first) << " From peer: " << it->first << " - last updated " << now - it->second->grpUpdateTS << " secs ago." << std::endl; + GXSNETDEBUG_P_(it->first) << " From peer: " << it->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it->second->grpUpdateTS) << std::endl; - GXSNETDEBUG___<< " mClientMsgUpdateMap: (is for each friend, the latest time the friend sent content for each group)" << std::endl; + GXSNETDEBUG___<< " mClientMsgUpdateMap: (is for each friend, the latest modification time for each group, sent by the friend himself)" << std::endl; for(std::map::const_iterator it(mClientMsgUpdateMap.begin());it!=mClientMsgUpdateMap.end();++it) { GXSNETDEBUG_P_(it->first) << " From peer: " << it->first << std::endl; for(std::map::const_iterator it2(it->second->msgUpdateInfos.begin());it2!=it->second->msgUpdateInfos.end();++it2) - GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated " << now - it2->second.time_stamp << " secs ago. Message count=" << it2->second.message_count << std::endl; + GXSNETDEBUG_PG(it->first,it2->first) << " group " << it2->first << " - last updated at peer (secs ago): " << nice_time_stamp(now,it2->second.time_stamp) << ". Message count=" << it2->second.message_count << std::endl; } } @@ -1423,6 +1435,10 @@ void RsGxsNetService::updateServerSyncTS() std::map gxsMap; +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG___<< "updateServerSyncTS(): updating last modification time stamp of local data." << std::endl; +#endif + // retrieve all grps and update TS mDataStore->retrieveGxsGrpMetaData(gxsMap); std::map::iterator mit = gxsMap.begin(); @@ -1445,7 +1461,7 @@ void RsGxsNetService::updateServerSyncTS() if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mPublishTs) { #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG__G(grpId) << "publish time stamp of group " << grpId << " has changed to " << time(NULL)-grpMeta->mPublishTs << " secs ago. updating!" << std::endl; + GXSNETDEBUG__G(grpId) << " publish time stamp of group " << grpId << " has changed to " << time(NULL)-grpMeta->mPublishTs << " secs ago. updating!" << std::endl; #endif mGrpServerUpdateItem->grpUpdateTS = grpMeta->mPublishTs; } @@ -1454,21 +1470,32 @@ void RsGxsNetService::updateServerSyncTS() { msui = new RsGxsServerMsgUpdateItem(mServType); msui->grpId = grpMeta->mGroupId; + mServerMsgUpdateMap.insert(std::make_pair(msui->grpId, msui)); - }else - { - msui = mapIT->second; +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG__G(grpId) << " created new entry for group " << grpId << std::endl; +#endif } + else + msui = mapIT->second; if(grpMeta->mLastPost > msui->msgUpdateTS ) { change = true; msui->msgUpdateTS = grpMeta->mLastPost; +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last post = " << time(NULL) - grpMeta->mLastPost << " secs ago for group "<< grpId << std::endl; +#endif } - // this might be very inefficient with time - if(grpMeta->mRecvTS > mGrpServerUpdateItem->grpUpdateTS) + // This might be very inefficient with time. This is needed because an old message might have been received, so the last modification time + // needs to account for this so that a friend who hasn't + + if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mRecvTS) { +#ifdef NXS_NET_DEBUG_0 + GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last RecvTS = " << time(NULL) - grpMeta->mRecvTS << " secs ago for group "<< grpId << std::endl; +#endif mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS; change = true; } @@ -1854,6 +1881,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr) mClientGrpUpdateMap.insert(std::make_pair(peerFrom, item)); } +#warning should not we conservatively use the most recent one, in case the peer has reset its mServerGrpUpdate time?? What happens if the peer unsubscribed a recent group? item->grpUpdateTS = updateTS; item->peerId = peerFrom; @@ -2111,9 +2139,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) gnsr.max_visible_count = std::max(gnsr.max_visible_count, mcount) ; if (oldVisibleCount != gnsr.max_visible_count || oldSuppliersCount != gnsr.suppliers.size()) - { mObserver->notifyChangedGroupStats(grpId); - } #ifdef NXS_NET_DEBUG_1 GXSNETDEBUG_PG(item->PeerId(),grpId) << " grpId = " << grpId << std::endl; @@ -2127,13 +2153,15 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) #warning TODO: what if grpMeta is NULL? if(! (grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )) { - // For unsubscribed groups, we update the timestamp to now, so that the group content will not be asked to the same - // peer again, unless the peer has new info about it. - // That needs of course to reset that time to 0 when we subscribe. + // For unsubscribed groups, we update the timestamp something more recent, so that the group content will not be asked to the same + // peer again, unless the peer has new info about it. It's important to use the same clock (this is peer's clock) so that + // we never compare times from different (and potentially badly sync-ed clocks) - locked_stampPeerGroupUpdateTime(pid,grpId,time(NULL),msgItemL.size()) ; + locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ; + if(grpMeta) delete grpMeta; + return ; } @@ -2317,8 +2345,8 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) { // The list to req is empty. That means we already have all messages that this peer can // provide. So we can stamp the group from this peer to be up to date. - - locked_stampPeerGroupUpdateTime(pid,grpId,time(NULL),msgItemL.size()) ; +#warning we should use tr->mTransaction->updateTS instead of time(NULL) + locked_stampPeerGroupUpdateTime(pid,grpId,tr->mTransaction->updateTS,msgItemL.size()) ; } if(grpMeta) delete grpMeta; @@ -2326,16 +2354,20 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr) void RsGxsNetService::locked_stampPeerGroupUpdateTime(const RsPeerId& pid,const RsGxsGroupId& grpId,time_t tm,uint32_t n_messages) { - RsGxsMsgUpdateItem *& pitem(mClientMsgUpdateMap[pid]) ; - - if(pitem == NULL) + std::map::iterator it = mClientMsgUpdateMap.find(pid) ; + + RsGxsMsgUpdateItem *pitem; + + if(it == mClientMsgUpdateMap.end()) { pitem = new RsGxsMsgUpdateItem(mServType) ; pitem->peerId = pid ; } - - pitem->msgUpdateInfos[grpId].time_stamp = time(NULL) ; - pitem->msgUpdateInfos[grpId].message_count = n_messages ; + else + pitem = it->second ; + + pitem->msgUpdateInfos[grpId].time_stamp = tm; + pitem->msgUpdateInfos[grpId].message_count = std::max(n_messages, pitem->msgUpdateInfos[grpId].message_count) ; IndicateConfigChanged(); } @@ -2573,8 +2605,7 @@ void RsGxsNetService::locked_genSendGrpsTransaction(NxsTransaction* tr) RsNxsTransac* ntr = new RsNxsTransac(mServType); ntr->transactionNumber = transN; - ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | - RsNxsTransac::FLAG_TYPE_GRPS; + ntr->transactFlag = RsNxsTransac::FLAG_BEGIN_P1 | RsNxsTransac::FLAG_TYPE_GRPS; ntr->updateTS = updateTS; ntr->nItems = grps.size(); ntr->PeerId(tr->mTransaction->PeerId()); @@ -2848,8 +2879,12 @@ void RsGxsNetService::locked_pushGrpRespFromList(std::list& respList bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncGrp *item) { - // don't sync if you have no new updates for this peer + // Do we have new updates for this peer? + // This is one of the few places where we compare a local time stamp (mGrpServerUpdateItem->grpUpdateTS) to a peer's time stamp. + // Because this is the global modification time for groups, async-ed computers will eventually figure out that their data needs + // to be synced. + if(mGrpServerUpdateItem) { #ifdef NXS_NET_DEBUG_0 @@ -3172,6 +3207,9 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxs bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsg *item) { + // Do we have new updates for this peer? + // Here we compare times in the same clock: the friend's clock, so it should be fine. + ServerMsgMap::const_iterator cit = mServerMsgUpdateMap.find(item->grpId); if(cit != mServerMsgUpdateMap.end()) @@ -3187,8 +3225,7 @@ bool RsGxsNetService::locked_CanReceiveUpdate(const RsNxsSyncMsg *item) GXSNETDEBUG_PG(item->PeerId(),item->grpId) << " no local time stamp for this grp. " ; #endif -#warning when no timestamp is found, the return value should be false, since we do not want to send anything - return true; + return false; } void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsg* item) { diff --git a/libretroshare/src/serialiser/rsgxsupdateitems.h b/libretroshare/src/serialiser/rsgxsupdateitems.h index 9e1352b3c..1f5e55e20 100644 --- a/libretroshare/src/serialiser/rsgxsupdateitems.h +++ b/libretroshare/src/serialiser/rsgxsupdateitems.h @@ -85,6 +85,8 @@ public: struct MsgUpdateInfo { + MsgUpdateInfo(): time_stamp(0), message_count(0) {} + uint32_t time_stamp ; uint32_t message_count ; }; @@ -105,7 +107,7 @@ public: virtual std::ostream &print(std::ostream &out, uint16_t indent); RsGxsGroupId grpId; - uint32_t msgUpdateTS; // the last time this group received a new msg + uint32_t msgUpdateTS; // local time stamp this group last received a new msg };