fixed a few bugs in the new circle cache system

This commit is contained in:
csoler 2020-05-07 20:35:02 +02:00
parent 11a4b6540f
commit 5f5c2be64b
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
3 changed files with 78 additions and 69 deletions

View File

@ -92,7 +92,8 @@ public:
class RsGxsMsgChange : public RsGxsNotify
{
public:
RsGxsMsgChange(NotifyType type, const RsGxsGroupId& gid, const RsGxsMessageId& msg_id,bool metaChange) : RsGxsNotify(gid), mNewMsgItem(nullptr),NOTIFY_TYPE(type), mMetaChange(metaChange) {}
RsGxsMsgChange(NotifyType type, const RsGxsGroupId& gid, const RsGxsMessageId& msg_id,bool metaChange)
: RsGxsNotify(gid), mMsgId(msg_id), mNewMsgItem(nullptr),NOTIFY_TYPE(type), mMetaChange(metaChange) {}
RsGxsMessageId mMsgId;
RsGxsMsgItem *mNewMsgItem;

View File

@ -40,6 +40,7 @@
/****
* #define DEBUG_CIRCLES 1
****/
#define DEBUG_CIRCLES 1
/*extern*/ RsGxsCircles* rsGxsCircles = nullptr;
@ -126,7 +127,7 @@ p3GxsCircles::p3GxsCircles( RsGeneralDataService *gds, RsNetworkExchangeService
RsTickEvent(), mIdentities(identities), mPgpUtils(pgpUtils),
mCircleMtx("p3GxsCircles"),
mCircleCache(DEFAULT_MEM_CACHE_SIZE, "GxsCircleCache" ),
mCacheUpdated(false)
mShouldSendCacheUpdateNotification(false)
{
// Kick off Cache Testing, + Others.
//RsTickEvent::schedule_in(CIRCLE_EVENT_CACHETEST, CACHETEST_PERIOD);
@ -519,7 +520,7 @@ void p3GxsCircles::service_tick()
rstime_t now = time(NULL);
if(mCacheUpdated && now > mLastCacheUpdateEvent + GXS_CIRCLE_DELAY_TO_SEND_CACHE_UPDATED_EVENT)
if(mShouldSendCacheUpdateNotification && now > mLastCacheUpdateEvent + GXS_CIRCLE_DELAY_TO_SEND_CACHE_UPDATED_EVENT)
{
if(rsEvents)
{
@ -529,7 +530,7 @@ void p3GxsCircles::service_tick()
}
mLastCacheUpdateEvent = now;
mCacheUpdated = false;
mShouldSendCacheUpdateNotification = false;
}
if(now > mLastCacheMembershipUpdateTS + GXS_CIRCLE_DELAY_TO_CHECK_MEMBERSHIP_UPDATE)
@ -569,9 +570,7 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Found circle Message Change Notification for group " << msgChange->mGroupId << ", msg ID " << msgChange->mMsgId << std::endl;
#endif
#ifdef DEBUG_CIRCLES
std::cerr << " Msgs for Group: " << mit->first << std::endl;
std::cerr << " Msgs for Group: " << msgChange->mGroupId << std::endl;
#endif
RsGxsCircleId circle_id(msgChange->mGroupId);
@ -602,9 +601,7 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
RsErr() << __PRETTY_FUNCTION__ << ": missing SubscriptionRequestItem in msg notification for msg " << msgChange->mMsgId << std::endl;
}
mCircleCache.erase(circle_id);
circles_to_reload.insert(circle_id);
mCacheUpdated = true;
}
RsGxsGroupChange *groupChange = dynamic_cast<RsGxsGroupChange *>(c);
@ -614,32 +611,36 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
{
const RsGxsGroupId *git(&groupChange->mGroupId);
if(!groupChange->metaChange())
#ifdef DEBUG_CIRCLES
std::cerr << " Found Group Change Notification of type " << c->getType() << std::endl;
#endif
switch(c->getType())
{
case RsGxsNotify::TYPE_RECEIVED_NEW:
case RsGxsNotify::TYPE_UPDATED:
case RsGxsNotify::TYPE_PUBLISHED:
{
#ifdef DEBUG_CIRCLES
std::cerr << " Found Group Change Notification" << std::endl;
#endif
//for(std::list<RsGxsGroupId>::iterator git = groupChange->mGrpIdList.begin(); git != groupChange->mGrpIdList.end(); ++git)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Incoming Group: " << *git << ". Forcing cache load." << std::endl;
std::cerr << " Incoming/created/updated Group: " << *git << ". Forcing cache load." << std::endl;
#endif
// for new circles we need to add them to the list.
// we don't know the type of this circle here
// original behavior was to add all ids to the external ids list
// for new circles we need to add them to the list.
// we don't know the type of this circle here
// original behavior was to add all ids to the external ids list
addCircleIdToList(RsGxsCircleId(*git), 0);
addCircleIdToList(RsGxsCircleId(*git), 0);
// reset the cached circle data for this id
{
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
mCircleCache.erase(RsGxsCircleId(*git));
mCacheUpdated = true;
circles_to_reload.insert(RsGxsCircleId(*git));
}
}
circles_to_reload.insert(RsGxsCircleId(*git));
}
break;
default:
#ifdef DEBUG_CIRCLES
std::cerr << " Type: " << c->getType() << " is ignored" << std::endl;
#endif
break;
}
// Now compute which events should be sent.
if(rsEvents)
{
@ -709,15 +710,6 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
}
}
// reset circle from cache since the number of invitee may have changed.
{
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
mCircleCache.erase(RsGxsCircleId(*git));
mCacheUpdated = true;
circles_to_reload.insert(RsGxsCircleId(*git));
}
}
}
@ -736,50 +728,57 @@ bool p3GxsCircles::getCircleDetails(const RsGxsCircleId& id, RsGxsCircleDetails&
{
#ifdef DEBUG_CIRCLES
std::cerr << "p3GxsCircles::getCircleDetails(" << id << ")";
std::cerr << std::endl;
std::cerr << "p3GxsCircles::getCircleDetails(" << id << ")";
std::cerr << std::endl;
#endif // DEBUG_CIRCLES
{
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
bool should_reload = false;
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
RsGxsCircleCache& data(mCircleCache[id]);
if(data.mStatus < RsGxsCircleCache::CircleEntryCacheStatus::UPDATING)
if(data.mStatus < RsGxsCircleCache::CircleEntryCacheStatus::LOADING)
should_reload = true;
if(data.mStatus == RsGxsCircleCache::CircleEntryCacheStatus::LOADING)
return false;
// should also have meta data....
details.mCircleId = id;
details.mCircleName = data.mCircleName;
details.mCircleType = data.mCircleType;
details.mRestrictedCircleId = data.mRestrictedCircleId;
details.mAllowedNodes = data.mAllowedNodes;
details.mSubscriptionFlags.clear();
details.mAllowedGxsIds.clear();
details.mAmIAllowed = false ;
details.mAmIAdmin = bool(data.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN);
for(std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it(data.mMembershipStatus.begin());it!=data.mMembershipStatus.end();++it)
if(!should_reload)
{
details.mSubscriptionFlags[it->first] = it->second.subscription_flags ;
details.mCircleId = id;
details.mCircleName = data.mCircleName;
if(it->second.subscription_flags == GXS_EXTERNAL_CIRCLE_FLAGS_ALLOWED)
details.mCircleType = data.mCircleType;
details.mRestrictedCircleId = data.mRestrictedCircleId;
details.mAllowedNodes = data.mAllowedNodes;
details.mSubscriptionFlags.clear();
details.mAllowedGxsIds.clear();
details.mAmIAllowed = false ;
details.mAmIAdmin = bool(data.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN);
for(std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it(data.mMembershipStatus.begin());it!=data.mMembershipStatus.end();++it)
{
details.mAllowedGxsIds.insert(it->first) ;
details.mSubscriptionFlags[it->first] = it->second.subscription_flags ;
if(rsIdentity->isOwnId(it->first))
details.mAmIAllowed = true ;
if(it->second.subscription_flags == GXS_EXTERNAL_CIRCLE_FLAGS_ALLOWED)
{
details.mAllowedGxsIds.insert(it->first) ;
if(rsIdentity->isOwnId(it->first))
details.mAmIAllowed = true ;
}
}
}
return true;
return true;
}
}
/* it isn't there - add to public requests */
cache_request_load(id);
return false;
cache_request_load(id);
return false;
}
bool p3GxsCircles::getCircleExternalIdList(std::list<RsGxsCircleId> &circleIds)
@ -805,7 +804,8 @@ bool p3GxsCircles::getCircleExternalIdList(std::list<RsGxsCircleId> &circleIds)
bool p3GxsCircles::isLoaded(const RsGxsCircleId &circleId)
{
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
return (mCircleCache[circleId].mStatus >= RsGxsCircleCache::CircleEntryCacheStatus::UPDATING);
return mCircleCache.is_cached(circleId) && (mCircleCache[circleId].mStatus >= RsGxsCircleCache::CircleEntryCacheStatus::UPDATING);
}
bool p3GxsCircles::loadCircle(const RsGxsCircleId &circleId)
@ -1237,6 +1237,9 @@ bool p3GxsCircles::cache_request_load(const RsGxsCircleId &id)
RsGxsCircleCache& cache(mCircleCache[id]);
if(cache.mStatus < RsGxsCircleCache::CircleEntryCacheStatus::LOADING)
cache.mCircleId = id;
if(cache.mStatus == RsGxsCircleCache::CircleEntryCacheStatus::LOADING || cache.mStatus == RsGxsCircleCache::CircleEntryCacheStatus::UPDATING)
return false;
@ -1248,6 +1251,7 @@ bool p3GxsCircles::cache_request_load(const RsGxsCircleId &id)
cache.mStatus = RsGxsCircleCache::CircleEntryCacheStatus::LOADING;
mCirclesToLoad.insert(id);
mShouldSendCacheUpdateNotification = true;
}
if (RsTickEvent::event_count(CIRCLE_EVENT_CACHELOAD) > 0) /* its already scheduled */
@ -1360,6 +1364,7 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
// that allows to subscribe/unsubscribe uncomplete circles
cache.mStatus = RsGxsCircleCache::CircleEntryCacheStatus::CHECKING_MEMBERSHIP;
cache.mLastUpdatedMembershipTS = 0; // force processing of membership request
locked_checkCircleCacheForMembershipUpdate(cache);
}
else
@ -1372,6 +1377,7 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
/* schedule event to try reload gxsIds */
RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, id.toStdString());
}
mShouldSendCacheUpdateNotification = true;
locked_checkCircleCacheForAutoSubscribe(cache);
}
@ -1504,6 +1510,7 @@ bool p3GxsCircles::checkCircleCache()
#endif
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
mCircleCache.applyToAllCachedEntries(*this,&p3GxsCircles::locked_checkCircleCacheForMembershipUpdate) ;
mCircleCache.applyToAllCachedEntries(*this,&p3GxsCircles::locked_checkCircleCacheForAutoSubscribe) ;
return true ;
@ -1595,7 +1602,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cac
}
}
bool am_I_admin( cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) ;
bool am_I_admin( cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) ;
#ifdef DEBUG_CIRCLES
std::cerr << " own ID in circle: " << in_admin_list << ", own subscribe request: " << member_request << ", am I admin?: " << am_I_admin << std::endl;
@ -1621,7 +1628,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cac
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
mCacheUpdated = true;
mShouldSendCacheUpdateNotification = true;
return true;
}
@ -1645,7 +1652,7 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cac
cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED;
mCacheUpdated = true;
mShouldSendCacheUpdateNotification = true;
return true ;
}
}
@ -1952,7 +1959,7 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
else
std::cerr << " (EE) unknown subscription order type: " << static_cast<uint32_t>(item->subscription_type) ;
mCacheUpdated = true;
mShouldSendCacheUpdateNotification = true;
#ifdef DEBUG_CIRCLES
std::cerr << " UPDATING" << std::endl;
#endif
@ -1969,6 +1976,7 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
cache.mLastUpdatedMembershipTS = time(NULL) ;
cache.mStatus = RsGxsCircleCache::CircleEntryCacheStatus::UP_TO_DATE;
cache.mLastUpdateTime = time(NULL);
mShouldSendCacheUpdateNotification = true;
}
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/

View File

@ -358,7 +358,7 @@ private:
uint32_t mDummyIdToken;
std::list<RsGxsId> mDummyPgpLinkedIds;
std::list<RsGxsId> mDummyOwnIds;
bool mCacheUpdated ;
bool mShouldSendCacheUpdateNotification ;
rstime_t mLastCacheUpdateEvent;
rstime_t mLastDebugPrintTS;