optimization in circles: now only auto-subscribe when a own msg is present

This commit is contained in:
csoler 2020-11-19 23:43:47 +01:00
parent 883dfd9c99
commit e15058d14c
3 changed files with 187 additions and 155 deletions

View File

@ -61,7 +61,9 @@ static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key
#define GXS_MASK "GXS_MASK_HACK" #define GXS_MASK "GXS_MASK_HACK"
#define GEN_EXCH_DEBUG 1 /*
* #define GEN_EXCH_DEBUG 1
*/
// Data flow in RsGenExchange // Data flow in RsGenExchange
// //

View File

@ -40,6 +40,7 @@
/**** /****
* #define DEBUG_CIRCLES 1 * #define DEBUG_CIRCLES 1
****/ ****/
#define DEBUG_CIRCLES 1
/*extern*/ RsGxsCircles* rsGxsCircles = nullptr; /*extern*/ RsGxsCircles* rsGxsCircles = nullptr;
@ -1074,6 +1075,7 @@ RsGxsCircleCache::RsGxsCircleCache()
mLastUpdatedMembershipTS = 0 ; mLastUpdatedMembershipTS = 0 ;
mStatus = CircleEntryCacheStatus::NO_DATA_YET; mStatus = CircleEntryCacheStatus::NO_DATA_YET;
mAllIdsHere = false; mAllIdsHere = false;
mHasOwnMembershipMessage = false;
return; return;
} }
@ -1286,7 +1288,7 @@ bool p3GxsCircles::cache_request_load(const RsGxsCircleId &id)
int32_t age = 0; int32_t age = 0;
if (RsTickEvent::prev_event_ago(CIRCLE_EVENT_CACHELOAD, age) && age<MIN_CIRCLE_LOAD_GAP) if (RsTickEvent::prev_event_ago(CIRCLE_EVENT_CACHELOAD, age) && age<MIN_CIRCLE_LOAD_GAP)
RsTickEvent::schedule_in(CIRCLE_EVENT_CACHELOAD, MIN_CIRCLE_LOAD_GAP - age); RsTickEvent::schedule_in(CIRCLE_EVENT_CACHELOAD, MIN_CIRCLE_LOAD_GAP - age);
else else
RsTickEvent::schedule_now(CIRCLE_EVENT_CACHELOAD); RsTickEvent::schedule_now(CIRCLE_EVENT_CACHELOAD);
@ -1406,8 +1408,6 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, id.toStdString()); RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, id.toStdString());
} }
mShouldSendCacheUpdateNotification = true; mShouldSendCacheUpdateNotification = true;
locked_checkCircleCacheForAutoSubscribe(cache);
} }
return true; return true;
@ -1505,8 +1505,6 @@ bool p3GxsCircles::cache_reloadids(const RsGxsCircleId &circleId)
// We can check for self inclusion in the circle right away, since own ids are always loaded. // We can check for self inclusion in the circle right away, since own ids are always loaded.
// that allows to subscribe/unsubscribe uncomplete circles // that allows to subscribe/unsubscribe uncomplete circles
locked_checkCircleCacheForAutoSubscribe(cache);
cache.mStatus = CircleEntryCacheStatus::CHECKING_MEMBERSHIP; cache.mStatus = CircleEntryCacheStatus::CHECKING_MEMBERSHIP;
locked_checkCircleCacheForMembershipUpdate(cache); locked_checkCircleCacheForMembershipUpdate(cache);
@ -1537,8 +1535,7 @@ bool p3GxsCircles::checkCircleCache()
#endif #endif
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
mCircleCache.applyToAllCachedEntries(*this,&p3GxsCircles::locked_checkCircleCacheForMembershipUpdate) ; mCircleCache.applyToAllCachedEntries(*this,&p3GxsCircles::locked_checkCircleCacheForMembershipUpdate) ;
mCircleCache.applyToAllCachedEntries(*this,&p3GxsCircles::locked_checkCircleCacheForAutoSubscribe) ;
return true ; return true ;
} }
@ -1555,11 +1552,13 @@ bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache&
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << "Cache entry for circle " << cache.mCircleId << " needs a swab over membership requests. Re-scheduling it." << std::endl; std::cerr << "Cache entry for circle " << cache.mCircleId << " needs a swab over membership requests. Re-scheduling it." << std::endl;
#endif #endif
cache.mGroupStatus |= GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; // forces processing of cache entry
uint32_t token;
RsGenExchange::setGroupStatusFlags(token, RsGxsGroupId(cache.mCircleId.toStdString()), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
// this should be called regularly // this should be called regularly
uint32_t token ; RsTokReqOptions opts;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
std::list<RsGxsGroupId> grpIds ; std::list<RsGxsGroupId> grpIds ;
@ -1573,7 +1572,7 @@ bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache&
/* We need to AutoSubscribe if the Circle is relevent to us */ /* We need to AutoSubscribe if the Circle is relevent to us */
bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache) bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cache)
{ {
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << "p3GxsCircles::locked_checkCircleCacheForAutoSubscribe() : "<< cache.mCircleId << std::endl; std::cerr << "p3GxsCircles::locked_checkCircleCacheForAutoSubscribe() : "<< cache.mCircleId << std::endl;
@ -1615,73 +1614,70 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cac
return false ; return false ;
} }
bool in_admin_list = false ; bool am_I_invited = false ;
bool member_request = false ;
for(std::list<RsGxsId>::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && (!in_admin_list) && (!member_request);++it) for(std::list<RsGxsId>::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && (!am_I_invited);++it)
{ {
std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it2 = cache.mMembershipStatus.find(*it) ; std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it2 = cache.mMembershipStatus.find(*it) ;
if(it2 != cache.mMembershipStatus.end()) if(it2 != cache.mMembershipStatus.end())
{ am_I_invited = am_I_invited || bool(it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) ;
in_admin_list = in_admin_list || bool(it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) ;
member_request= member_request|| bool(it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED) ;
}
} }
bool am_I_admin( cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) ; bool am_I_admin( cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) ;
bool do_I_have_a_msg( cache.mHasOwnMembershipMessage );
#ifdef DEBUG_CIRCLES
std::cerr << " own ID in circle: " << in_admin_list << ", own subscribe request: " << member_request << ", am I admin?: " << am_I_admin << std::endl;
#endif
if(in_admin_list || member_request || am_I_admin)
{
uint32_t token, token2;
if(! (cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED))
{
#ifdef DEBUG_CIRCLES
/* we are part of this group - subscribe, clear unprocessed flag */
std::cerr << " I'm allowed in this circle => AutoSubscribing!" << std::endl;
#endif
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), true);
}
#ifdef DEBUG_CIRCLES
else
std::cerr << " I'm allowed in this circle, and already subscribed." << std::endl;
#endif
RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
mShouldSendCacheUpdateNotification = true; #ifdef DEBUG_CIRCLES
std::cerr << " own ID invited in circle: " << am_I_invited << ", membership msg author: " << do_I_have_a_msg << ", admin: " << am_I_admin << std::endl;
return true; #endif
} if(do_I_have_a_msg || am_I_admin)
{
if(! (cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED))
{
#ifdef DEBUG_CIRCLES
/* we are part of this group - subscribe, clear unprocessed flag */
std::cerr << " either admin or have posted a subscribe/unsubscribe message => AutoSubscribing!" << std::endl;
#endif
uint32_t token;
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), true);
mShouldSendCacheUpdateNotification = true;
}
#ifdef DEBUG_CIRCLES
else
std::cerr << " either admin or have posted a subscribe/unsubscribe message, already subscribed." << std::endl;
#endif
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
return true;
}
else else
{ {
/* we know all the peers - we are not part - we can flag as PROCESSED. */ /* we know all the peers - we are not part - we can flag as PROCESSED. */
uint32_t token,token2;
RsGenExchange::setGroupStatusFlags(token, RsGxsGroupId(cache.mCircleId.toStdString()), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
if(cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) if(cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{ {
RsGenExchange::subscribeToGroup(token2, RsGxsGroupId(cache.mCircleId), false); uint32_t token;
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), false);
mShouldSendCacheUpdateNotification = true;
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << " Not part of the group! Let's unsubscribe this circle of unfriendly Napoleons!" << std::endl; std::cerr << " Neither admin nor subscription msg author! Let's unsubscribe this circle of unfriendly Napoleons!" << std::endl;
#endif #endif
} }
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
else else
std::cerr << " Not part of the group, and not subscribed either." << std::endl; std::cerr << " Neither admin nor subscription msg author! Not subscribed either." << std::endl;
#endif #endif
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
mShouldSendCacheUpdateNotification = true;
return true ; return true ;
} }
#ifdef DEBUG_CIRCLES
std::cerr << " Marking the cache entry as processed." << std::endl;
#endif
uint32_t token2;
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED);
return true;
} }
void p3GxsCircles::addCircleIdToList(const RsGxsCircleId &circleId, uint32_t circleType) void p3GxsCircles::addCircleIdToList(const RsGxsCircleId &circleId, uint32_t circleType)
@ -1861,14 +1857,22 @@ bool p3GxsCircles::pushCircleMembershipRequest(
return false; return false;
} }
if(!getCirclesInfo( if(!getCirclesInfo( std::list<RsGxsGroupId>{static_cast<RsGxsGroupId>(circle_id)}, RS_DEFAULT_STORAGE_PARAM(std::vector<RsGxsCircleGroup>) ))
std::list<RsGxsGroupId>{static_cast<RsGxsGroupId>(circle_id)},
RS_DEFAULT_STORAGE_PARAM(std::vector<RsGxsCircleGroup>) ))
{ {
RsErr() << __PRETTY_FUNCTION__ << " Cannot generate membership request " RsErr() << __PRETTY_FUNCTION__ << " Cannot generate membership request from unknown circle: " << circle_id << std::endl;
<< "from unknown circle: " << circle_id << std::endl;
return false; return false;
} }
// If the circle is not subscribed, then subscribe, whatever the subscription type. Indeed, if we publish a msg, even a msg for
// unsubscribing, we need to have a subscribed group first.
uint32_t token ;
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true);
if(waitToken(token) != RsTokenService::COMPLETE)
{
std::cerr << __PRETTY_FUNCTION__ << " Could not subscribe to Circle group." << std::endl;
return false;
}
// Create a subscribe item // Create a subscribe item
@ -1898,11 +1902,7 @@ bool p3GxsCircles::pushCircleMembershipRequest(
std::cerr << " AuthorId : " << s->meta.mAuthorId << std::endl; std::cerr << " AuthorId : " << s->meta.mAuthorId << std::endl;
std::cerr << " ThreadId : " << s->meta.mThreadId << std::endl; std::cerr << " ThreadId : " << s->meta.mThreadId << std::endl;
#endif #endif
uint32_t token ;
if(request_type == RsGxsCircleSubscriptionType::SUBSCRIBE)
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true);
RsGenExchange::publishMsg(token, s); RsGenExchange::publishMsg(token, s);
// update the cache. // update the cache.
@ -1920,13 +1920,93 @@ bool p3GxsCircles::cancelCircleMembership(const RsGxsId& own_gxsid,const RsGxsCi
return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionType::UNSUBSCRIBE) ; return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionType::UNSUBSCRIBE) ;
} }
bool p3GxsCircles::locked_processMembershipMessages(RsGxsCircleCache& cache, const std::vector<RsGxsMsgItem*>& items, GxsMsgReq& messages_to_delete, const std::set<RsGxsId> &own_ids)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Circle found in cache!" << std::endl;
std::cerr << " Retrieving messages..." << std::endl;
#endif
cache.mHasOwnMembershipMessage = false; // default
for(uint32_t i=0;i<items.size();++i)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Group ID: " << items[i]->meta.mGroupId << ", Message ID: " << items[i]->meta.mMsgId << ", thread ID: " << items[i]->meta.mThreadId << ", author: " << items[i]->meta.mAuthorId << ": " ;
#endif
RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<RsGxsCircleSubscriptionRequestItem*>(items[i]) ;
if(item == NULL)
{
std::cerr << " (EE) item is not a RsGxsCircleSubscriptionRequestItem. Weird. Scheduling for deletion." << std::endl;
messages_to_delete[RsGxsGroupId(cache.mCircleId)].insert(item->meta.mMsgId);
continue ;
}
RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[item->meta.mAuthorId]) ;
#ifdef DEBUG_CIRCLES
std::cerr << " " << time(NULL) - item->time_stamp << " seconds ago, " ;
#endif
if(info.last_subscription_TS <= item->time_stamp) // the <= here allows to make sure we update the flags is something happenned
{
info.last_subscription_TS = item->time_stamp ;
if(item->subscription_type == RsGxsCircleSubscriptionType::SUBSCRIBE)
info.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
else if(item->subscription_type == RsGxsCircleSubscriptionType::UNSUBSCRIBE)
info.subscription_flags &= ~GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
else
std::cerr << " (EE) unknown subscription order type: " << static_cast<uint32_t>(item->subscription_type) ;
mShouldSendCacheUpdateNotification = true;
#ifdef DEBUG_CIRCLES
std::cerr << " UPDATING status to " << std::hex << info.subscription_flags << std::dec << std::endl;
#endif
if(own_ids.end() != own_ids.find(item->meta.mAuthorId)) // we have at least one subscribe/unsubscribe message. So we update the flag accordingly.
cache.mHasOwnMembershipMessage = true;
}
else if(info.last_subscription_TS > item->time_stamp)
std::cerr << " Too old: item->TS=" << item->time_stamp << ", last_subscription_TS=" << info.last_subscription_TS << ". IGNORING." << std::endl;
}
// now do another sweep and remove all msgs that are older than the latest
#ifdef DEBUG_CIRCLES
std::cerr << " Cleaning older messages..." << std::endl;
#endif
for(uint32_t i=0;i<items.size();++i)
{
RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[items[i]->meta.mAuthorId]) ;
RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<RsGxsCircleSubscriptionRequestItem*>(items[i]) ;
if(item && info.last_subscription_TS > item->time_stamp)
{
#ifdef DEBUG_CIRCLES
std::cerr << " " << item->meta.mMsgId << ": Older than last known (" << (long int)info.last_subscription_TS - (long int)item->time_stamp << " seconds before): deleting." << std::endl;
#endif
messages_to_delete[RsGxsGroupId(cache.mCircleId)].insert(item->meta.mMsgId) ;
}
}
cache.mLastUpdatedMembershipTS = time(NULL) ;
cache.mStatus = CircleEntryCacheStatus::UP_TO_DATE;
cache.mLastUpdateTime = time(NULL);
mShouldSendCacheUpdateNotification = true;
return true;
}
bool p3GxsCircles::processMembershipRequests(uint32_t token) bool p3GxsCircles::processMembershipRequests(uint32_t token)
{ {
// Go through membership request messages and process them according to the following rule: // Go through membership request messages and process them according to the following rule:
// * for each ID only keep the latest membership request. Delete the older ones. // * for each ID only keep the latest membership request. Delete the older ones.
// * for each circle, keep a list of IDs sorted into membership categories (e.g. keep updated flags for each IDs) // * for each circle, keep a list of IDs sorted into membership categories (e.g. keep updated flags for each IDs)
// Because msg loading is async-ed, the job in split in two methods: one calls the loading, the other one handles the loaded data. // Because msg loading is async-ed, the job in split in two methods: one calls the loading, the other one handles the loaded data.
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << "Processing circle membership requests." << std::endl; std::cerr << "Processing circle membership requests." << std::endl;
#endif #endif
@ -1934,111 +2014,59 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
if(!RsGenExchange::getMsgData(token, msgItems)) if(!RsGenExchange::getMsgData(token, msgItems))
{ {
std::cerr << "(EE) Cannot get msg data for circle. Something's weird." << std::endl; std::cerr << "(EE) Cannot get msg data for circle. Something's weird." << std::endl;
return false; return false;
} }
std::list<RsGxsId> own_ids ;
rsIdentity->getOwnIds(own_ids);
std::set<RsGxsId> own_ids_set;
for(auto& id:own_ids)
own_ids_set.insert(id);
GxsMsgReq messages_to_delete ; GxsMsgReq messages_to_delete ;
for(GxsMsgDataMap::const_iterator it(msgItems.begin());it!=msgItems.end();++it) for(GxsMsgDataMap::const_iterator it(msgItems.begin());it!=msgItems.end();++it)
{ {
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << " Circle ID: " << it->first << std::endl; std::cerr << " Circle ID: " << it->first << std::endl;
#endif #endif
// Find the circle ID in cache and process the list of messages to keep the latest order in time. // Find the circle ID in cache and process the list of messages to keep the latest order in time.
RsGxsCircleId circle_id(it->first); RsGxsCircleId circle_id(it->first);
RsGxsCircleCache& cache( mCircleCache[circle_id] ); RsGxsCircleCache& cache( mCircleCache[circle_id] );
// First process membership messages
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << " Circle found in cache!" << std::endl; std::cerr << " Processing membership messages..." << std::endl;
std::cerr << " Retrieving messages..." << std::endl;
#endif #endif
locked_processMembershipMessages(cache,it->second,messages_to_delete,own_ids_set);
for(uint32_t i=0;i<it->second.size();++i)
{
#ifdef DEBUG_CIRCLES #ifdef DEBUG_CIRCLES
std::cerr << " Group ID: " << it->second[i]->meta.mGroupId << ", Message ID: " << it->second[i]->meta.mMsgId << ", thread ID: " << it->second[i]->meta.mThreadId << ", author: " << it->second[i]->meta.mAuthorId << ": " ; std::cerr << " Now checking for auto-subscribe..." << std::endl;
#endif #endif
locked_checkCircleCacheForAutoSubscribe(cache);
}
RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<RsGxsCircleSubscriptionRequestItem*>(it->second[i]) ;
if(item == NULL)
{
std::cerr << " (EE) item is not a RsGxsCircleSubscriptionRequestItem. Weird. Scheduling for deletion." << std::endl;
messages_to_delete[RsGxsGroupId(circle_id)].insert(it->second[i]->meta.mMsgId);
continue ;
}
RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[item->meta.mAuthorId]) ;
#ifdef DEBUG_CIRCLES
std::cerr << " " << time(NULL) - item->time_stamp << " seconds ago, " ;
#endif
if(info.last_subscription_TS <= item->time_stamp) // the <= here allows to make sure we update the flags is something happenned
{
info.last_subscription_TS = item->time_stamp ;
if(item->subscription_type == RsGxsCircleSubscriptionType::SUBSCRIBE)
info.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
else if(item->subscription_type == RsGxsCircleSubscriptionType::UNSUBSCRIBE)
info.subscription_flags &= ~GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
else
std::cerr << " (EE) unknown subscription order type: " << static_cast<uint32_t>(item->subscription_type) ;
mShouldSendCacheUpdateNotification = true;
#ifdef DEBUG_CIRCLES
std::cerr << " UPDATING status to " << std::hex << info.subscription_flags << std::dec << std::endl;
#endif
}
else if(info.last_subscription_TS > item->time_stamp)
std::cerr << " Too old: item->TS=" << item->time_stamp << ", last_subscription_TS=" << info.last_subscription_TS << ". IGNORING." << std::endl;
}
// now do another sweep and remove all msgs that are older than the latest
#ifdef DEBUG_CIRCLES
std::cerr << " Cleaning old messages..." << std::endl;
#endif
for(uint32_t i=0;i<it->second.size();++i)
{
RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[it->second[i]->meta.mAuthorId]) ;
RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<RsGxsCircleSubscriptionRequestItem*>(it->second[i]) ;
if(item && info.last_subscription_TS > item->time_stamp)
{
#ifdef DEBUG_CIRCLES
std::cerr << " " << item->meta.mMsgId << ": Older than last known (" << (long int)info.last_subscription_TS - (long int)item->time_stamp << " seconds before): deleting." << std::endl;
#endif
messages_to_delete[RsGxsGroupId(circle_id)].insert(item->meta.mMsgId) ;
}
}
cache.mLastUpdatedMembershipTS = time(NULL) ;
cache.mStatus = CircleEntryCacheStatus::UP_TO_DATE;
cache.mLastUpdateTime = time(NULL);
mShouldSendCacheUpdateNotification = true;
}
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/ RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
uint32_t token2; uint32_t token2;
RsGenExchange::deleteMsgs(token2,messages_to_delete); RsGenExchange::deleteMsgs(token2,messages_to_delete);
return true ; return true ;
} }
//====================================================================================// //====================================================================================//
// DEBUG STUFF // // DEBUG STUFF //
//====================================================================================// //====================================================================================//
bool p3GxsCircles::debug_dumpCacheEntry(RsGxsCircleCache& cache) bool p3GxsCircles::debug_dumpCacheEntry(RsGxsCircleCache& cache)
{ {
std::cerr << " Circle: " << cache.mCircleId << " status: " << cache.mStatus << " MembershipTS: " << cache.mLastUpdatedMembershipTS std::cerr << " Circle: " << cache.mCircleId
<< " UpdateTS: " << cache.mLastUpdateTime << " All Ids here: " << cache.mAllIdsHere << std::endl; << " status: " << static_cast<int>(cache.mStatus)
<< " MembershipTS: " << cache.mLastUpdatedMembershipTS
<< " UpdateTS: " << cache.mLastUpdateTime
<< " All Ids here: " << cache.mAllIdsHere
<< " Has own msg: " << cache.mHasOwnMembershipMessage << std::endl;
return true; return true;
} }

View File

@ -128,7 +128,7 @@ public:
uint32_t subscription_flags ; // combination of GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST and GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED uint32_t subscription_flags ; // combination of GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST and GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED
}; };
enum CircleEntryCacheStatus: uint8_t { enum class CircleEntryCacheStatus: uint8_t {
UNKNOWN = 0x00, // Used to detect uninitialized memory UNKNOWN = 0x00, // Used to detect uninitialized memory
NO_DATA_YET = 0x01, // Used in the constuctor NO_DATA_YET = 0x01, // Used in the constuctor
LOADING = 0x02, // When the token request to load cache has been sent and no data is present LOADING = 0x02, // When the token request to load cache has been sent and no data is present
@ -167,17 +167,18 @@ public:
bool mIsExternal; bool mIsExternal;
RsGxsCircleId mRestrictedCircleId ; // circle ID that circle is restricted to. RsGxsCircleId mRestrictedCircleId ; // circle ID that circle is restricted to.
uint32_t mGroupStatus; bool mHasOwnMembershipMessage; // Do I have a subscribe/unsubscribe message in the circle group? Will be used to determine if we subscribe to the group or not
uint32_t mGroupSubscribeFlags; uint32_t mGroupStatus; // Copy of the group status from the GXS group.
uint32_t mGroupSubscribeFlags; // Subscribe flags of the group.
#ifdef SUBSCIRCLES #ifdef SUBSCIRCLES
std::set<RsGxsCircleId> mUnprocessedCircles; std::set<RsGxsCircleId> mUnprocessedCircles;
std::set<RsGxsCircleId> mProcessedCircles; std::set<RsGxsCircleId> mProcessedCircles;
#endif #endif
std::map<RsGxsId,RsGxsCircleMembershipStatus> mMembershipStatus; std::map<RsGxsId,RsGxsCircleMembershipStatus> mMembershipStatus; // Membership status of each ID cited in the group (including the ones posting a message)
std::set<RsGxsId> mAllowedGxsIds; // IDs that are allowed in the circle and have requested membership. This is the official members list. std::set<RsGxsId> mAllowedGxsIds; // IDs that are allowed in the circle and have requested membership. This is the official members list.
std::set<RsPgpId> mAllowedNodes; std::set<RsPgpId> mAllowedNodes; // List of friend nodes allowed in the circle (local circles only)
RsPeerId mOriginator ; // peer who sent the data, in case we need to ask for ids RsPeerId mOriginator ; // peer who sent the data, in case we need to ask for ids
}; };
@ -338,7 +339,8 @@ public:
bool checkCircleCache(); bool checkCircleCache();
bool locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache); bool locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache);
bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache); bool locked_processMembershipMessages(RsGxsCircleCache& cache,const std::vector<RsGxsMsgItem*>& items, GxsMsgReq& messages_to_delete,const std::set<RsGxsId>& own_ids);
bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache);
bool locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache &cache); bool locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache &cache);
p3IdService *mIdentities; // Needed for constructing Circle Info, p3IdService *mIdentities; // Needed for constructing Circle Info,