diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index ff71e83ac..2b88ad730 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -61,7 +61,9 @@ static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key #define GXS_MASK "GXS_MASK_HACK" -#define GEN_EXCH_DEBUG 1 +/* + * #define GEN_EXCH_DEBUG 1 + */ // Data flow in RsGenExchange // diff --git a/libretroshare/src/services/p3gxscircles.cc b/libretroshare/src/services/p3gxscircles.cc index c16756f73..5253a4b46 100644 --- a/libretroshare/src/services/p3gxscircles.cc +++ b/libretroshare/src/services/p3gxscircles.cc @@ -40,6 +40,7 @@ /**** * #define DEBUG_CIRCLES 1 ****/ + #define DEBUG_CIRCLES 1 /*extern*/ RsGxsCircles* rsGxsCircles = nullptr; @@ -1074,6 +1075,7 @@ RsGxsCircleCache::RsGxsCircleCache() mLastUpdatedMembershipTS = 0 ; mStatus = CircleEntryCacheStatus::NO_DATA_YET; mAllIdsHere = false; + mHasOwnMembershipMessage = false; return; } @@ -1286,7 +1288,7 @@ bool p3GxsCircles::cache_request_load(const RsGxsCircleId &id) int32_t age = 0; if (RsTickEvent::prev_event_ago(CIRCLE_EVENT_CACHELOAD, age) && age grpIds ; @@ -1573,7 +1572,7 @@ bool p3GxsCircles::locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache& /* We need to AutoSubscribe if the Circle is relevent to us */ -bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache) +bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache& cache) { #ifdef DEBUG_CIRCLES std::cerr << "p3GxsCircles::locked_checkCircleCacheForAutoSubscribe() : "<< cache.mCircleId << std::endl; @@ -1615,73 +1614,70 @@ bool p3GxsCircles::locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cac return false ; } - bool in_admin_list = false ; - bool member_request = false ; + bool am_I_invited = false ; - for(std::list::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && (!in_admin_list) && (!member_request);++it) + for(std::list::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && (!am_I_invited);++it) { std::map::const_iterator it2 = cache.mMembershipStatus.find(*it) ; if(it2 != cache.mMembershipStatus.end()) - { - in_admin_list = in_admin_list || bool(it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) ; - member_request= member_request|| bool(it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED) ; - } + am_I_invited = am_I_invited || bool(it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) ; } - bool am_I_admin( cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) ; - -#ifdef DEBUG_CIRCLES - std::cerr << " own ID in circle: " << in_admin_list << ", own subscribe request: " << member_request << ", am I admin?: " << am_I_admin << std::endl; -#endif - if(in_admin_list || member_request || am_I_admin) - { - uint32_t token, token2; - - if(! (cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)) - { -#ifdef DEBUG_CIRCLES - /* we are part of this group - subscribe, clear unprocessed flag */ - std::cerr << " I'm allowed in this circle => AutoSubscribing!" << std::endl; -#endif - RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), true); - } -#ifdef DEBUG_CIRCLES - else - std::cerr << " I'm allowed in this circle, and already subscribed." << std::endl; -#endif - - RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED); - - cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; + bool am_I_admin( cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) ; + bool do_I_have_a_msg( cache.mHasOwnMembershipMessage ); - mShouldSendCacheUpdateNotification = true; - - return true; - } +#ifdef DEBUG_CIRCLES + std::cerr << " own ID invited in circle: " << am_I_invited << ", membership msg author: " << do_I_have_a_msg << ", admin: " << am_I_admin << std::endl; +#endif + if(do_I_have_a_msg || am_I_admin) + { + if(! (cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)) + { +#ifdef DEBUG_CIRCLES + /* we are part of this group - subscribe, clear unprocessed flag */ + std::cerr << " either admin or have posted a subscribe/unsubscribe message => AutoSubscribing!" << std::endl; +#endif + uint32_t token; + RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), true); + mShouldSendCacheUpdateNotification = true; + } +#ifdef DEBUG_CIRCLES + else + std::cerr << " either admin or have posted a subscribe/unsubscribe message, already subscribed." << std::endl; +#endif + + cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; + + return true; + } else { - /* we know all the peers - we are not part - we can flag as PROCESSED. */ - uint32_t token,token2; - RsGenExchange::setGroupStatusFlags(token, RsGxsGroupId(cache.mCircleId.toStdString()), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED); - + /* we know all the peers - we are not part - we can flag as PROCESSED. */ if(cache.mGroupSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) { - RsGenExchange::subscribeToGroup(token2, RsGxsGroupId(cache.mCircleId), false); + uint32_t token; + RsGenExchange::subscribeToGroup(token, RsGxsGroupId(cache.mCircleId), false); + mShouldSendCacheUpdateNotification = true; #ifdef DEBUG_CIRCLES - std::cerr << " Not part of the group! Let's unsubscribe this circle of unfriendly Napoleons!" << std::endl; + std::cerr << " Neither admin nor subscription msg author! Let's unsubscribe this circle of unfriendly Napoleons!" << std::endl; #endif } #ifdef DEBUG_CIRCLES else - std::cerr << " Not part of the group, and not subscribed either." << std::endl; + std::cerr << " Neither admin nor subscription msg author! Not subscribed either." << std::endl; #endif - - cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; - - mShouldSendCacheUpdateNotification = true; return true ; } + +#ifdef DEBUG_CIRCLES + std::cerr << " Marking the cache entry as processed." << std::endl; +#endif + uint32_t token2; + cache.mGroupStatus &= ~GXS_SERV::GXS_GRP_STATUS_UNPROCESSED; + RsGenExchange::setGroupStatusFlags(token2, RsGxsGroupId(cache.mCircleId), 0, GXS_SERV::GXS_GRP_STATUS_UNPROCESSED); + + return true; } void p3GxsCircles::addCircleIdToList(const RsGxsCircleId &circleId, uint32_t circleType) @@ -1861,14 +1857,22 @@ bool p3GxsCircles::pushCircleMembershipRequest( return false; } - if(!getCirclesInfo( - std::list{static_cast(circle_id)}, - RS_DEFAULT_STORAGE_PARAM(std::vector) )) + if(!getCirclesInfo( std::list{static_cast(circle_id)}, RS_DEFAULT_STORAGE_PARAM(std::vector) )) { - RsErr() << __PRETTY_FUNCTION__ << " Cannot generate membership request " - << "from unknown circle: " << circle_id << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " Cannot generate membership request from unknown circle: " << circle_id << std::endl; return false; } + // If the circle is not subscribed, then subscribe, whatever the subscription type. Indeed, if we publish a msg, even a msg for + // unsubscribing, we need to have a subscribed group first. + + uint32_t token ; + RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true); + + if(waitToken(token) != RsTokenService::COMPLETE) + { + std::cerr << __PRETTY_FUNCTION__ << " Could not subscribe to Circle group." << std::endl; + return false; + } // Create a subscribe item @@ -1898,11 +1902,7 @@ bool p3GxsCircles::pushCircleMembershipRequest( std::cerr << " AuthorId : " << s->meta.mAuthorId << std::endl; std::cerr << " ThreadId : " << s->meta.mThreadId << std::endl; #endif - uint32_t token ; - - if(request_type == RsGxsCircleSubscriptionType::SUBSCRIBE) - RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true); - + RsGenExchange::publishMsg(token, s); // update the cache. @@ -1920,13 +1920,93 @@ bool p3GxsCircles::cancelCircleMembership(const RsGxsId& own_gxsid,const RsGxsCi return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionType::UNSUBSCRIBE) ; } +bool p3GxsCircles::locked_processMembershipMessages(RsGxsCircleCache& cache, const std::vector& items, GxsMsgReq& messages_to_delete, const std::set &own_ids) +{ +#ifdef DEBUG_CIRCLES + std::cerr << " Circle found in cache!" << std::endl; + std::cerr << " Retrieving messages..." << std::endl; +#endif + cache.mHasOwnMembershipMessage = false; // default + + for(uint32_t i=0;imeta.mGroupId << ", Message ID: " << items[i]->meta.mMsgId << ", thread ID: " << items[i]->meta.mThreadId << ", author: " << items[i]->meta.mAuthorId << ": " ; +#endif + RsGxsCircleSubscriptionRequestItem *item = dynamic_cast(items[i]) ; + + if(item == NULL) + { + std::cerr << " (EE) item is not a RsGxsCircleSubscriptionRequestItem. Weird. Scheduling for deletion." << std::endl; + + messages_to_delete[RsGxsGroupId(cache.mCircleId)].insert(item->meta.mMsgId); + continue ; + } + + RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[item->meta.mAuthorId]) ; + +#ifdef DEBUG_CIRCLES + std::cerr << " " << time(NULL) - item->time_stamp << " seconds ago, " ; +#endif + + if(info.last_subscription_TS <= item->time_stamp) // the <= here allows to make sure we update the flags is something happenned + { + info.last_subscription_TS = item->time_stamp ; + + if(item->subscription_type == RsGxsCircleSubscriptionType::SUBSCRIBE) + info.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED; + else if(item->subscription_type == RsGxsCircleSubscriptionType::UNSUBSCRIBE) + info.subscription_flags &= ~GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED; + else + std::cerr << " (EE) unknown subscription order type: " << static_cast(item->subscription_type) ; + + mShouldSendCacheUpdateNotification = true; +#ifdef DEBUG_CIRCLES + std::cerr << " UPDATING status to " << std::hex << info.subscription_flags << std::dec << std::endl; +#endif + + if(own_ids.end() != own_ids.find(item->meta.mAuthorId)) // we have at least one subscribe/unsubscribe message. So we update the flag accordingly. + cache.mHasOwnMembershipMessage = true; + } + else if(info.last_subscription_TS > item->time_stamp) + std::cerr << " Too old: item->TS=" << item->time_stamp << ", last_subscription_TS=" << info.last_subscription_TS << ". IGNORING." << std::endl; + } + + // now do another sweep and remove all msgs that are older than the latest + +#ifdef DEBUG_CIRCLES + std::cerr << " Cleaning older messages..." << std::endl; +#endif + + for(uint32_t i=0;imeta.mAuthorId]) ; + RsGxsCircleSubscriptionRequestItem *item = dynamic_cast(items[i]) ; + + if(item && info.last_subscription_TS > item->time_stamp) + { +#ifdef DEBUG_CIRCLES + std::cerr << " " << item->meta.mMsgId << ": Older than last known (" << (long int)info.last_subscription_TS - (long int)item->time_stamp << " seconds before): deleting." << std::endl; +#endif + messages_to_delete[RsGxsGroupId(cache.mCircleId)].insert(item->meta.mMsgId) ; + } + } + + cache.mLastUpdatedMembershipTS = time(NULL) ; + cache.mStatus = CircleEntryCacheStatus::UP_TO_DATE; + cache.mLastUpdateTime = time(NULL); + mShouldSendCacheUpdateNotification = true; + + return true; +} + bool p3GxsCircles::processMembershipRequests(uint32_t token) { // Go through membership request messages and process them according to the following rule: // * for each ID only keep the latest membership request. Delete the older ones. // * for each circle, keep a list of IDs sorted into membership categories (e.g. keep updated flags for each IDs) // Because msg loading is async-ed, the job in split in two methods: one calls the loading, the other one handles the loaded data. - + #ifdef DEBUG_CIRCLES std::cerr << "Processing circle membership requests." << std::endl; #endif @@ -1934,111 +2014,59 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token) if(!RsGenExchange::getMsgData(token, msgItems)) { - std::cerr << "(EE) Cannot get msg data for circle. Something's weird." << std::endl; - return false; + std::cerr << "(EE) Cannot get msg data for circle. Something's weird." << std::endl; + return false; } - + + std::list own_ids ; + rsIdentity->getOwnIds(own_ids); + std::set own_ids_set; + for(auto& id:own_ids) + own_ids_set.insert(id); + GxsMsgReq messages_to_delete ; for(GxsMsgDataMap::const_iterator it(msgItems.begin());it!=msgItems.end();++it) - { - RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/ + { + RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/ #ifdef DEBUG_CIRCLES - std::cerr << " Circle ID: " << it->first << std::endl; + std::cerr << " Circle ID: " << it->first << std::endl; #endif - // Find the circle ID in cache and process the list of messages to keep the latest order in time. + // Find the circle ID in cache and process the list of messages to keep the latest order in time. RsGxsCircleId circle_id(it->first); - RsGxsCircleCache& cache( mCircleCache[circle_id] ); + RsGxsCircleCache& cache( mCircleCache[circle_id] ); + // First process membership messages #ifdef DEBUG_CIRCLES - std::cerr << " Circle found in cache!" << std::endl; - std::cerr << " Retrieving messages..." << std::endl; + std::cerr << " Processing membership messages..." << std::endl; #endif - - for(uint32_t i=0;isecond.size();++i) - { + locked_processMembershipMessages(cache,it->second,messages_to_delete,own_ids_set); #ifdef DEBUG_CIRCLES - std::cerr << " Group ID: " << it->second[i]->meta.mGroupId << ", Message ID: " << it->second[i]->meta.mMsgId << ", thread ID: " << it->second[i]->meta.mThreadId << ", author: " << it->second[i]->meta.mAuthorId << ": " ; + std::cerr << " Now checking for auto-subscribe..." << std::endl; #endif + locked_checkCircleCacheForAutoSubscribe(cache); + } - RsGxsCircleSubscriptionRequestItem *item = dynamic_cast(it->second[i]) ; - - if(item == NULL) - { - std::cerr << " (EE) item is not a RsGxsCircleSubscriptionRequestItem. Weird. Scheduling for deletion." << std::endl; - - messages_to_delete[RsGxsGroupId(circle_id)].insert(it->second[i]->meta.mMsgId); - continue ; - } - - RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[item->meta.mAuthorId]) ; - -#ifdef DEBUG_CIRCLES - std::cerr << " " << time(NULL) - item->time_stamp << " seconds ago, " ; -#endif - - if(info.last_subscription_TS <= item->time_stamp) // the <= here allows to make sure we update the flags is something happenned - { - info.last_subscription_TS = item->time_stamp ; - - if(item->subscription_type == RsGxsCircleSubscriptionType::SUBSCRIBE) - info.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED; - else if(item->subscription_type == RsGxsCircleSubscriptionType::UNSUBSCRIBE) - info.subscription_flags &= ~GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED; - else - std::cerr << " (EE) unknown subscription order type: " << static_cast(item->subscription_type) ; - - mShouldSendCacheUpdateNotification = true; -#ifdef DEBUG_CIRCLES - std::cerr << " UPDATING status to " << std::hex << info.subscription_flags << std::dec << std::endl; -#endif - } - else if(info.last_subscription_TS > item->time_stamp) - std::cerr << " Too old: item->TS=" << item->time_stamp << ", last_subscription_TS=" << info.last_subscription_TS << ". IGNORING." << std::endl; - } - - // now do another sweep and remove all msgs that are older than the latest - -#ifdef DEBUG_CIRCLES - std::cerr << " Cleaning old messages..." << std::endl; -#endif - - for(uint32_t i=0;isecond.size();++i) - { - RsGxsCircleMembershipStatus& info(cache.mMembershipStatus[it->second[i]->meta.mAuthorId]) ; - RsGxsCircleSubscriptionRequestItem *item = dynamic_cast(it->second[i]) ; - - if(item && info.last_subscription_TS > item->time_stamp) - { -#ifdef DEBUG_CIRCLES - std::cerr << " " << item->meta.mMsgId << ": Older than last known (" << (long int)info.last_subscription_TS - (long int)item->time_stamp << " seconds before): deleting." << std::endl; -#endif - messages_to_delete[RsGxsGroupId(circle_id)].insert(item->meta.mMsgId) ; - } - } - - cache.mLastUpdatedMembershipTS = time(NULL) ; - cache.mStatus = CircleEntryCacheStatus::UP_TO_DATE; - cache.mLastUpdateTime = time(NULL); - mShouldSendCacheUpdateNotification = true; - } - RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/ uint32_t token2; RsGenExchange::deleteMsgs(token2,messages_to_delete); - return true ; } + //====================================================================================// // DEBUG STUFF // //====================================================================================// bool p3GxsCircles::debug_dumpCacheEntry(RsGxsCircleCache& cache) { - std::cerr << " Circle: " << cache.mCircleId << " status: " << cache.mStatus << " MembershipTS: " << cache.mLastUpdatedMembershipTS - << " UpdateTS: " << cache.mLastUpdateTime << " All Ids here: " << cache.mAllIdsHere << std::endl; + std::cerr << " Circle: " << cache.mCircleId + << " status: " << static_cast(cache.mStatus) + << " MembershipTS: " << cache.mLastUpdatedMembershipTS + << " UpdateTS: " << cache.mLastUpdateTime + << " All Ids here: " << cache.mAllIdsHere + << " Has own msg: " << cache.mHasOwnMembershipMessage << std::endl; return true; } diff --git a/libretroshare/src/services/p3gxscircles.h b/libretroshare/src/services/p3gxscircles.h index 1b7986014..373ea21cd 100644 --- a/libretroshare/src/services/p3gxscircles.h +++ b/libretroshare/src/services/p3gxscircles.h @@ -128,7 +128,7 @@ public: uint32_t subscription_flags ; // combination of GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST and GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED }; -enum CircleEntryCacheStatus: uint8_t { +enum class CircleEntryCacheStatus: uint8_t { UNKNOWN = 0x00, // Used to detect uninitialized memory NO_DATA_YET = 0x01, // Used in the constuctor LOADING = 0x02, // When the token request to load cache has been sent and no data is present @@ -167,17 +167,18 @@ public: bool mIsExternal; RsGxsCircleId mRestrictedCircleId ; // circle ID that circle is restricted to. - uint32_t mGroupStatus; - uint32_t mGroupSubscribeFlags; + bool mHasOwnMembershipMessage; // Do I have a subscribe/unsubscribe message in the circle group? Will be used to determine if we subscribe to the group or not + uint32_t mGroupStatus; // Copy of the group status from the GXS group. + uint32_t mGroupSubscribeFlags; // Subscribe flags of the group. #ifdef SUBSCIRCLES std::set mUnprocessedCircles; std::set mProcessedCircles; #endif - std::map mMembershipStatus; + std::map mMembershipStatus; // Membership status of each ID cited in the group (including the ones posting a message) - std::set mAllowedGxsIds; // IDs that are allowed in the circle and have requested membership. This is the official members list. - std::set mAllowedNodes; + std::set mAllowedGxsIds; // IDs that are allowed in the circle and have requested membership. This is the official members list. + std::set mAllowedNodes; // List of friend nodes allowed in the circle (local circles only) RsPeerId mOriginator ; // peer who sent the data, in case we need to ask for ids }; @@ -338,7 +339,8 @@ public: bool checkCircleCache(); bool locked_checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache); - bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache); + bool locked_processMembershipMessages(RsGxsCircleCache& cache,const std::vector& items, GxsMsgReq& messages_to_delete,const std::set& own_ids); + bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache); bool locked_checkCircleCacheForMembershipUpdate(RsGxsCircleCache &cache); p3IdService *mIdentities; // Needed for constructing Circle Info,