finished implementing the backend part for the subscription system

This commit is contained in:
csoler 2016-05-18 21:13:54 -04:00
parent 89472d6502
commit 722609a3e6
10 changed files with 344 additions and 414 deletions

View File

@ -213,9 +213,11 @@ class RsGcxs
virtual int canSend(const RsGxsCircleId &circleId, const RsPgpId &id,bool& should_encrypt) = 0;
virtual int canReceive(const RsGxsCircleId &circleId, const RsPgpId &id) = 0;
virtual bool recipients(const RsGxsCircleId &circleId, std::list<RsPgpId>& friendlist) = 0;
virtual bool recipients(const RsGxsCircleId &circleId, std::list<RsGxsId>& idlist) = 0;
virtual bool isRecipient(const RsGxsCircleId &circleId, const RsGxsId& id) = 0;
virtual bool recipients(const RsGxsCircleId &circleId, const RsGxsGroupId& destination_group, std::list<RsGxsId>& idlist) = 0;
virtual bool isRecipient(const RsGxsCircleId &circleId, const RsGxsGroupId& destination_group, const RsGxsId& id) = 0;
virtual bool getLocalCircleServerUpdateTS(const RsGxsCircleId& gid,time_t& grp_server_update_TS,time_t& msg_server_update_TS) =0;
};

View File

@ -761,8 +761,7 @@ void RsGxsNetService::syncWithPeers()
RsNxsItem *encrypted_item = NULL ;
uint32_t status ;
#warning pass on the group id here so as to know which list to encrypt to (admin/members)
if(encryptSingleNxsItem(msg, encrypt_to_this_circle_id, encrypted_item, status))
if(encryptSingleNxsItem(msg, encrypt_to_this_circle_id, grpId, encrypted_item, status))
sendItem(encrypted_item) ;
else
std::cerr << "(WW) could not encrypt for circle ID " << encrypt_to_this_circle_id << ". Not yet in cache?" << std::endl;
@ -1271,8 +1270,7 @@ bool RsGxsNetService::locked_createTransactionFromPending(GrpCircleIdRequestVett
RsNxsItem *encrypted_item = NULL ;
uint32_t status = RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN ;
#warning pass on the group id here so as to know which list to encrypt to (admin/members)
if(encryptSingleNxsItem(gItem, entry.mCircleId, encrypted_item,status))
if(encryptSingleNxsItem(gItem, entry.mCircleId, entry.mGroupId,encrypted_item,status))
{
itemL.push_back(encrypted_item) ;
delete gItem ;
@ -1323,8 +1321,7 @@ bool RsGxsNetService::locked_createTransactionFromPending(MsgCircleIdsRequestVet
RsNxsItem *encrypted_item = NULL ;
uint32_t status = RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN ;
#warning pass on the group id here so as to know which list to encrypt to (admin/members)
if(encryptSingleNxsItem(mItem,msgPend->mCircleId,encrypted_item,status))
if(encryptSingleNxsItem(mItem,msgPend->mCircleId,msgPend->mGrpId,encrypted_item,status))
{
itemL.push_back(encrypted_item) ;
delete mItem ;
@ -3668,7 +3665,7 @@ bool RsGxsNetService::locked_addTransaction(NxsTransaction* tr)
// Returns false when the keys are not loaded. Question to solve: what do we do if we miss some keys??
// We should probably send anyway.
bool RsGxsNetService::encryptSingleNxsItem(RsNxsItem *item, const RsGxsCircleId& destination_circle, RsNxsItem *&encrypted_item, uint32_t& status)
bool RsGxsNetService::encryptSingleNxsItem(RsNxsItem *item, const RsGxsCircleId& destination_circle, const RsGxsGroupId& destination_group, RsNxsItem *&encrypted_item, uint32_t& status)
{
encrypted_item = NULL ;
status = RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN ;
@ -3681,8 +3678,7 @@ bool RsGxsNetService::encryptSingleNxsItem(RsNxsItem *item, const RsGxsCircleId&
std::list<RsGxsId> recipients ;
#warning recipients should take the destination group ID as parameter and use it to know if is will use the admin list or the members list.
if(!mCircles->recipients(destination_circle,recipients))
if(!mCircles->recipients(destination_circle,destination_group,recipients))
{
std::cerr << " (EE) Cannot encrypt transaction: recipients list not available. Should re-try later." << std::endl;
status = RS_NXS_ITEM_ENCRYPTION_STATUS_CIRCLE_ERROR ;
@ -4051,8 +4047,8 @@ void RsGxsNetService::handleRecvSyncGroup(RsNxsSyncGrpReqItem *item)
#endif
RsNxsItem *encrypted_item = NULL ;
uint32_t status = RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN ;
#warning pass on the group id here so as to know which list to encrypt to (admin/members)
if(encryptSingleNxsItem(gItem, grpMeta->mCircleId, encrypted_item,status))
if(encryptSingleNxsItem(gItem, grpMeta->mCircleId,mit->first, encrypted_item,status))
{
itemL.push_back(encrypted_item) ;
delete gItem ;
@ -4475,8 +4471,7 @@ void RsGxsNetService::handleRecvSyncMessage(RsNxsSyncMsgReqItem *item,bool item_
RsNxsItem *encrypted_item = NULL ;
uint32_t status = RS_NXS_ITEM_ENCRYPTION_STATUS_UNKNOWN ;
#warning pass on the group id here so as to know which list to encrypt to (admin/members)
if(encryptSingleNxsItem(mItem, grpMeta->mCircleId, encrypted_item,status))
if(encryptSingleNxsItem(mItem, grpMeta->mCircleId,m->mGroupId, encrypted_item,status))
{
itemL.push_back(encrypted_item) ;
delete mItem ;
@ -4624,7 +4619,7 @@ bool RsGxsNetService::canSendMsgIds(std::vector<RsGxsMsgMetaData*>& msgMetas, co
++i ;
else
{
if(mCircles->isLoaded(circleId) && mCircles->isRecipient(circleId, msgMetas[i]->mAuthorId))
if(mCircles->isLoaded(circleId) && mCircles->isRecipient(circleId, grpMeta.mGroupId, msgMetas[i]->mAuthorId))
{
++i ;
continue ;

View File

@ -454,7 +454,7 @@ private:
/*!
* encrypts/decrypts the transaction for the destination circle id.
*/
bool encryptSingleNxsItem(RsNxsItem *item, const RsGxsCircleId& destination_circle, RsNxsItem *& encrypted_item, uint32_t &status) ;
bool encryptSingleNxsItem(RsNxsItem *item, const RsGxsCircleId& destination_circle, const RsGxsGroupId &destination_group, RsNxsItem *& encrypted_item, uint32_t &status) ;
bool decryptSingleNxsItem(const RsNxsEncryptedDataItem *encrypted_item, RsNxsItem *&nxsitem, std::vector<RsTlvSecurityKey> *private_keys=NULL);
bool processTransactionForDecryption(NxsTransaction *tr); // return false when the keys are not loaded => need retry later

View File

@ -291,7 +291,7 @@ bool MsgCircleIdsRequestVetting::cleared()
}
for(uint32_t i=0;i<mMsgs.size();)
if(!mCircles->isRecipient(mCircleId,mMsgs[i].mAuthorId))
if(!mCircles->isRecipient(mCircleId,mGrpId,mMsgs[i].mAuthorId))
{
std::cerr << "(WW) MsgCircleIdsRequestVetting::cleared() filtering out message " << mMsgs[i].mMsgId << " because it's signed by author " << mMsgs[i].mAuthorId << " which is not in circle " << mCircleId << std::endl;

View File

@ -61,7 +61,8 @@ static const uint32_t GXS_CIRCLE_TYPE_YOUR_EYES_ONLY = 0x0006 ; // distribute
static const uint32_t GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST = 0x0001 ;// user is validated by circle admin
static const uint32_t GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED = 0x0002 ;// user has subscribed the group
static const uint32_t GXS_EXTERNAL_CIRCLE_FLAGS_ALLOWED = 0x0003 ;// user is allowed. Combines both flags above.
static const uint32_t GXS_EXTERNAL_CIRCLE_FLAGS_KEY_AVAILABLE = 0x0004 ;// key is available, so we can encrypt for this circle
static const uint32_t GXS_EXTERNAL_CIRCLE_FLAGS_ALLOWED = 0x0007 ;// user is allowed. Combines all flags above.
static const uint32_t GXS_CIRCLE_FLAGS_IS_EXTERNAL = 0x0008 ;// user is allowed
@ -111,7 +112,6 @@ class RsGxsCircleDetails
std::string mCircleName;
uint32_t mCircleType;
uint32_t mSubscribeFlags ;
bool mAmIAllowed ;

View File

@ -195,7 +195,7 @@ std::ostream& RsGxsCircleGroupItem::print(std::ostream& out, uint16_t indent)
return out;
}
uint32_t RsGxsCircleSerialiser::sizeGxsCircleSubscriptionRequestItem(RsGxsCircleSubscriptionRequestItem *item)
uint32_t RsGxsCircleSerialiser::sizeGxsCircleSubscriptionRequestItem(RsGxsCircleSubscriptionRequestItem * /* item */)
{
uint32_t s=8 ; // header

View File

@ -32,6 +32,7 @@
#include "util/rsstring.h"
#include "pgp/pgpauxutils.h"
#include "retroshare/rsgxscircles.h"
#include <sstream>
#include <stdio.h>
@ -100,6 +101,7 @@ RsGxsCircles *rsGxsCircles = NULL;
#define GXSID_LOAD_CYCLE 10 // GXSID completes a load in this period.
#define MIN_CIRCLE_LOAD_GAP 5
#define GXS_CIRCLE_DELAY_TO_FORCE_MEMBERSHIP_UPDATE 1200 // re-check every 20 mins. Normally this shouldn't be necessary since notifications inform abotu new messages.
/********************************************************************************/
/******************* Startup / Tick ******************************************/
@ -126,6 +128,13 @@ p3GxsCircles::p3GxsCircles(RsGeneralDataService *gds, RsNetworkExchangeService *
mDummyIdToken = 0;
}
static bool allowedGxsIdFlagTest(uint32_t subscription_flags,bool group_is_self_restricted)
{
if(group_is_self_restricted)
return (subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) && (subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_KEY_AVAILABLE);
else
return (subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) && (subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED) && (subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_KEY_AVAILABLE);
}
const std::string GXS_CIRCLES_APP_NAME = "gxscircle";
const uint16_t GXS_CIRCLES_APP_MAJOR_VERSION = 1;
@ -270,13 +279,19 @@ bool p3GxsCircles:: getCircleDetails(const RsGxsCircleId &id, RsGxsCircleDetails
details.mCircleType = data.mCircleType;
details.mAllowedGxsIds = data.mAllowedGxsIds;
details.mAllowedNodes = data.mAllowedNodes;
details.mSubscriptionFlags.clear();
details.mAllowedGxsIds.clear();
for(std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it(data.mMembershipStatus.begin());it!=data.mMembershipStatus.end();++it)
{
details.mSubscriptionFlags[it->first] = it->second.subscription_flags ;
if(it->second.subscription_flags == GXS_EXTERNAL_CIRCLE_FLAGS_ALLOWED)
details.mAllowedGxsIds.insert(it->first) ;
}
details.mAmIAllowed = data.mAmIAllowed ;
#warning p3GxsCircles::getCircleDetails(): make this right later when the backend is there
details.mSubscribeFlags = GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST ;
return true;
}
}
@ -396,31 +411,34 @@ bool p3GxsCircles::recipients(const RsGxsCircleId &circleId, std::list<RsPgpId>&
return false;
}
bool p3GxsCircles::isRecipient(const RsGxsCircleId &circleId, const RsGxsId& id)
bool p3GxsCircles::isRecipient(const RsGxsCircleId &circleId, const RsGxsGroupId& destination_group, const RsGxsId& id)
{
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
if (mCircleCache.is_cached(circleId))
{
const RsGxsCircleCache &data = mCircleCache.ref(circleId);
return data.isAllowedPeer(id);
return data.isAllowedPeer(id,destination_group);
}
return false;
}
// This function (will) use the destination group for the transaction in order to decide which list of
// This function uses the destination group for the transaction in order to decide which list of
// keys to ecnrypt to. When sending to a self-restricted group, the list of recipients is extended to
// the admin list rather than just the members list.
#warning TODO
bool p3GxsCircles::recipients(const RsGxsCircleId& circleId, std::list<RsGxsId>& gxs_ids)
bool p3GxsCircles::recipients(const RsGxsCircleId& circleId, const RsGxsGroupId& dest_group, std::list<RsGxsId>& gxs_ids)
{
RsGxsCircleDetails details ;
gxs_ids.clear() ;
if(!getCircleDetails(circleId, details))
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
if (!mCircleCache.is_cached(circleId))
return false ;
for(std::set<RsGxsId>::const_iterator it(details.mAllowedGxsIds.begin());it!=details.mAllowedGxsIds.end();++it)
gxs_ids.push_back(*it) ;
const RsGxsCircleCache& cache = mCircleCache.ref(circleId);
for(std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it(cache.mMembershipStatus.begin());it!=cache.mMembershipStatus.end();++it)
if(allowedGxsIdFlagTest(it->second.subscription_flags, RsGxsCircleId(dest_group) == circleId))
gxs_ids.push_back(it->first) ;
return true;
}
@ -535,7 +553,6 @@ RsGxsCircleCache::RsGxsCircleCache()
{
mCircleType = GXS_CIRCLE_TYPE_EXTERNAL;
mIsExternal = true;
mAmIAllowed = false ;
mUpdateTime = 0;
mGroupStatus = 0;
mGroupSubscribeFlags = 0;
@ -556,7 +573,18 @@ bool RsGxsCircleCache::loadBaseCircle(const RsGxsCircleGroup &circle)
mIsExternal = (mCircleType != GXS_CIRCLE_TYPE_LOCAL);
mGroupStatus = circle.mMeta.mGroupStatus;
mGroupSubscribeFlags = circle.mMeta.mSubscribeFlags;
mAmIAllowed = false ; // by default. Will be computed later.
mOriginator = circle.mMeta.mOriginator ;
mAllowedNodes = circle.mLocalFriends ;
mMembershipStatus.clear() ;
for(std::set<RsGxsId>::const_iterator it(circle.mInvitedMembers.begin());it!=circle.mInvitedMembers.end();++it)
{
RsGxsCircleMembershipStatus& s(mMembershipStatus[*it]) ;
s.last_subscription_TS = 0 ;
s.subscription_flags = GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST ;
}
#ifdef DEBUG_CIRCLES
std::cerr << "RsGxsCircleCache::loadBaseCircle(" << mCircleId << ")";
@ -590,16 +618,14 @@ bool RsGxsCircleCache::getAllowedPeersList(std::list<RsPgpId>& friendlist) const
return true;
}
bool RsGxsCircleCache::isAllowedPeer(const RsGxsId& id) const
bool RsGxsCircleCache::isAllowedPeer(const RsGxsId& id,const RsGxsGroupId& destination_group) const
{
std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it = mMembershipStatusMap.find(id) ;
std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it = mMembershipStatus.find(id) ;
if(it == mMembershipStatusMap.end())
if(it == mMembershipStatus.end())
return false ;
// Peers are admitted if in admin list. This is the condition for messages to propagate.
return !!(it->second & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) ;
return allowedGxsIdFlagTest(it->second.subscription_flags, RsGxsGroupId(mCircleId) == destination_group) ;
}
bool RsGxsCircleCache::isAllowedPeer(const RsPgpId &id) const
@ -926,10 +952,15 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
#endif // DEBUG_CIRCLES
std::vector<RsGxsGrpItem*> grpData;
bool ok = RsGenExchange::getGroupData(token, grpData);
if(ok)
if(!RsGenExchange::getGroupData(token, grpData))
{
std::cerr << "p3GxsCircles::cache_load_for_token() ERROR no data";
std::cerr << std::endl;
return false;
}
std::vector<RsGxsGrpItem*>::iterator vit = grpData.begin();
for(; vit != grpData.end(); ++vit)
@ -952,9 +983,12 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
/* should already have a LoadingCache entry */
RsGxsCircleId id = RsGxsCircleId(item->meta.mGroupId.toStdString());
std::map<RsGxsCircleId, RsGxsCircleCache>::iterator it;
it = mLoadingCache.find(id);
RsGxsCircleId id = RsGxsCircleId(item->meta.mGroupId) ;
// (cyril) I'm not sure this logic is needed. The token system would avoid duplicates normally.
std::map<RsGxsCircleId, RsGxsCircleCache>::iterator it = mLoadingCache.find(id);
if (it == mLoadingCache.end())
{
std::cerr << "p3GxsCircles::cache_load_for_token() Load ERROR: ";
@ -966,73 +1000,67 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
}
RsGxsCircleCache& cache = it->second;
cache.loadBaseCircle(group);
cache.mOriginator = item->meta.mOriginator ;
cache.mAmIAllowed = false;
delete item;
if(locked_processLoadingCacheEntry(it->second))
{
std::cerr << " All peers available. Moving to cache..." << std::endl;
mLoadingCache.erase(it);
}
else
{
std::cerr << " Unprocessed peers. Requesting reload..." << std::endl;
bool isComplete = true;
/* schedule event to try reload gxsIds */
RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, id.toStdString());
}
}
return true;
}
// This method parses the cache entry and makes sure that all ids are known. If not, requests the missing ids
// when done, the entry is removed from mLoadingCache
bool p3GxsCircles::locked_processLoadingCacheEntry(RsGxsCircleCache& cache)
{
bool isUnprocessedPeers = false;
if (cache.mIsExternal)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Loading External Circle" << std::endl;
#endif
std::set<RsGxsId> &peers = group.mInvitedMembers;
std::set<RsGxsId>::const_iterator pit;
std::list<RsGxsId> myOwnIds;
if(rsIdentity->getOwnIds(myOwnIds))
{
bool ownIdInCircle = false ;
for(std::list<RsGxsId>::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && !ownIdInCircle;++it)
ownIdInCircle = ownIdInCircle || (peers.find(*it) != peers.end()) ;
#ifdef DEBUG_CIRCLES
std::cerr << " own ID in circle: " << ownIdInCircle << std::endl;
#endif
cache.mAmIAllowed = ownIdInCircle;
}
else
{
std::cerr << " own ids not loaded yet." << std::endl;
isComplete = false;
isUnprocessedPeers = true;
}
// need to trigger the searches.
for(pit = peers.begin(); pit != peers.end(); ++pit)
for(std::map<RsGxsId,RsGxsCircleMembershipStatus>::iterator pit = cache.mMembershipStatus.begin(); pit != cache.mMembershipStatus.end(); ++pit)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Invited Member: " << *pit ;
std::cerr << " Member status: " << pit->first << " : " << pit->second.subscription_flags;
#endif
/* check cache */
if (mIdentities->haveKey(*pit))
cache.mAllowedGxsIds.insert(*pit);
if(!(pit->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_KEY_AVAILABLE))
if(mIdentities->haveKey(pit->first))
{
pit->second.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_KEY_AVAILABLE;
#ifdef DEBUG_CIRCLES
std::cerr << " Key is now available!"<< std::endl;
#endif
}
else
{
#ifdef DEBUG_CIRCLES
std::cerr << " Requesting UnprocessedPeer: " << *pit << std::endl;
std::cerr << " Requesting unknown/unloaded identity: " << *pit << " to originator " << cache.mOriginator << std::endl;
#endif
std::list<PeerId> peers;
peers.push_back(cache.mOriginator) ;
mIdentities->requestKey(*pit, peers);
mIdentities->requestKey(pit->first, peers);
/* store in to_process queue. */
cache.mUnprocessedPeers.insert(*pit);
isComplete = false;
isUnprocessedPeers = true;
}
}
@ -1073,76 +1101,28 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
#endif
#endif
}
else
{
#ifdef DEBUG_CIRCLES
else
std::cerr << " Loading Personal Circle" << std::endl;
#endif
// LOCAL Load.
std::set<RsPgpId> &peers = group.mLocalFriends;
std::set<RsPgpId>::const_iterator pit;
// need to trigger the searches.
for(pit = peers.begin(); pit != peers.end(); ++pit)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Local Friend: " << *pit << std::endl;
#endif
cache.addLocalFriend(*pit);
}
isComplete = true;
isUnprocessedPeers = false;
}
// we can check for self inclusion in the circle right away, since own ids are always loaded.
// We can check for self inclusion in the circle right away, since own ids are always loaded.
// that allows to subscribe/unsubscribe uncomplete circles
checkCircleCacheForAutoSubscribe(cache);
if (isComplete)
{
if(isUnprocessedPeers)
return false ;
/* move straight into the cache */
mCircleCache.store(id, cache);
mCircleCache.store(cache.mCircleId, cache);
mCircleCache.resize();
/* remove from loading queue */
mLoadingCache.erase(it);
std::cerr << " Loading complete." << std::endl;
}
if (isUnprocessedPeers)
{
std::cerr << " Unprocessed peers. Requesting reload..." << std::endl;
/* schedule event to try reload gxsIds */
RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, id.toStdString());
}
}
}
else
{
std::cerr << "p3GxsCircles::cache_load_for_token() ERROR no data";
std::cerr << std::endl;
return false;
}
#ifdef HANDLE_SUBCIRCLES
#if 0
if (!subCirclesToLoad.empty())
{
/* request load of subcircles */
}
#endif
#endif
return true ;
}
bool p3GxsCircles::cache_reloadids(const RsGxsCircleId &circleId)
{
#ifdef DEBUG_CIRCLES
@ -1165,92 +1145,27 @@ bool p3GxsCircles::cache_reloadids(const RsGxsCircleId &circleId)
return false;
}
RsGxsCircleCache &cache = it->second;
std::list<RsGxsId> myOwnIds;
if(rsIdentity->getOwnIds(myOwnIds))
if(locked_processLoadingCacheEntry(it->second))
{
bool ownIdInCircle = false ;
for(std::list<RsGxsId>::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && !ownIdInCircle;++it)
ownIdInCircle = ownIdInCircle || cache.isAllowedPeer(*it) ;
cache.mAmIAllowed = ownIdInCircle ;
}
/* try reload Ids */
for(std::set<RsGxsId>::const_iterator pit = cache.mUnprocessedPeers.begin(); pit != cache.mUnprocessedPeers.end(); ++pit)
{
/* check cache */
if (mIdentities->haveKey(*pit))
{
/* we can process now! */
RsIdentityDetails details;
if (mIdentities->getIdDetails(*pit, details))
{
cache.mAllowedGxsIds.insert(*pit);
#ifdef DEBUG_CIRCLES
std::cerr << "p3GxsCircles::cache_reloadids() AllowedPeer: ";
std::cerr << *pit;
std::cerr << std::endl;
#endif // DEBUG_CIRCLES
}
else
{
// ERROR.
std::cerr << "p3GxsCircles::cache_reloadids() ERROR ";
std::cerr << " Should haveKey for Id: " << *pit;
std::cerr << std::endl;
}
}
else
{
// UNKNOWN ID.
std::cerr << "p3GxsCircles::cache_reloadids() UNKNOWN Id: ";
std::cerr << *pit;
std::cerr << std::endl;
}
}
// clear unprocessed List.
cache.mUnprocessedPeers.clear();
// If sub-circles are complete too.
#ifdef SUBSCIRCLES
if (cache.mUnprocessedCircles.empty())
{
#endif
#ifdef DEBUG_CIRCLES
std::cerr << "p3GxsCircles::cache_reloadids() Adding to cache Id: ";
std::cerr << circleId;
std::cerr << std::endl;
#endif // DEBUG_CIRCLES
checkCircleCacheForAutoSubscribe(cache);
// Push to Cache.
mCircleCache.store(circleId, cache);
mCircleCache.resize();
/* remove from loading queue */
mLoadingCache.erase(it);
#ifdef SUBSCIRCLES
}
else
{
std::cerr << "p3GxsCircles::cache_reloadids() WARNING Incomplete Cache Loading: ";
std::cerr << circleId;
std::cerr << std::endl;
std::cerr << " Unprocessed peers. Requesting reload for circle " << circleId << std::endl;
/* schedule event to try reload gxsIds */
RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, circleId.toStdString());
}
#endif
return true;
}
bool p3GxsCircles::checkCircleCacheForMembershipUpdate(RsGxsCircleCache& cache)
{
if(cache.mLastUpdatedMembership_TS + < now)
time_t now = time(NULL) ;
if(cache.mLastUpdatedMembershipTS + GXS_CIRCLE_DELAY_TO_FORCE_MEMBERSHIP_UPDATE < now)
{
// this should be called regularly
uint32_t token ;
@ -1258,7 +1173,7 @@ bool p3GxsCircles::checkCircleCacheForMembershipUpdate(RsGxsCircleCache& cache)
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
std::list<RsGxsGroupId> grpIds ;
grpIds.push_back(grp) ;
grpIds.push_back(RsGxsGroupId(cache.mCircleId)) ;
RsGenExchange::getTokenService()->requestMsgInfo(token, RS_TOKREQ_ANSTYPE_SUMMARY, opts, grpIds);
GxsTokenQueue::queueRequest(token, CIRCLEREQ_MESSAGE_DATA);
@ -1298,9 +1213,32 @@ bool p3GxsCircles::checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache)
const RsPgpId& ownPgpId = mPgpUtils->getPGPOwnId();
bool am_I_allowed = cache.mAmIAllowed || (cache.mAllowedNodes.find(ownPgpId) != cache.mAllowedNodes.end()) ;
std::list<RsGxsId> myOwnIds;
if(am_I_allowed)
if(!rsIdentity->getOwnIds(myOwnIds))
{
std::cerr << " own ids not loaded yet." << std::endl;
/* schedule event to try reload gxsIds */
RsTickEvent::schedule_in(CIRCLE_EVENT_RELOADIDS, GXSID_LOAD_CYCLE, cache.mCircleId.toStdString());
return false ;
}
bool in_admin_list = false ;
for(std::list<RsGxsId>::const_iterator it(myOwnIds.begin());it!=myOwnIds.end() && !in_admin_list;++it)
{
std::map<RsGxsId,RsGxsCircleMembershipStatus>::const_iterator it2 = cache.mMembershipStatus.find(*it) ;
if(it2 != cache.mMembershipStatus.end())
in_admin_list = in_admin_list || (it2->second.subscription_flags & GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST) ;
}
#ifdef DEBUG_CIRCLES
std::cerr << " own ID in circle: " << ownIdInCircle << std::endl;
#endif
if(in_admin_list)
{
uint32_t token, token2;
@ -1323,7 +1261,7 @@ bool p3GxsCircles::checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache)
return true;
}
else if (cache.mUnprocessedPeers.empty())
else
{
/* we know all the peers - we are not part - we can flag as PROCESSED. */
uint32_t token,token2;
@ -1345,15 +1283,6 @@ bool p3GxsCircles::checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache)
return true ;
}
else
{
#ifdef DEBUG_CIRCLES
std::cerr << " Leaving Unprocessed" << std::endl;
#endif
// Don't clear UNPROCESSED - as we might not know all the peers.
// TODO - work out when we flag as PROCESSED.
}
return false;
}
void p3GxsCircles::addCircleIdToList(const RsGxsCircleId &circleId, uint32_t circleType)
@ -1954,7 +1883,7 @@ bool p3GxsCircles::pushCircleMembershipRequest(const RsGxsId& own_gxsid,const Rs
std::cerr << " AuthorId : " << s->meta.mAuthorId << std::endl;
std::cerr << " ThreadId : " << s->meta.mThreadId << std::endl;
#warning Would be nice to wait for a few seconds before publishing, so that the user can potentially cancel a wrong request before it gets piped into the system
#warning Would be nice to wait for a few seconds before publishing, so that the user can potentially cancel a wrong request before it gets piped into the system, or better look if there is a less recent version that we can keep/remove?
//RsGenExchange::publishMsg(token, msgItem);
return true;
}
@ -1981,7 +1910,7 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
if(!RsGenExchange::getMsgData(token, msgItems))
{
std::cerr << "(EE) Cannot get msg data for circle. Something's weird." << std::endl;
return ;
return false;
}
GxsMsgReq messages_to_delete ;
@ -1991,7 +1920,7 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
std::cerr << "Circle ID: " << it->first << std::endl;
RsGxsCircleId cid = it->first ;
RsGxsCircleId cid ( it->first );
if (!mCircleCache.is_cached(cid))
{
@ -2014,11 +1943,11 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
}
RsGxsCircleSubscriptionStatusInfo& info(data.mSubscriptions[item->meta.mAuthorId]) ;
RsGxsCircleMembershipStatus& info(data.mMembershipStatus[item->meta.mAuthorId]) ;
if(info.subscription_TS < item->time_stamp)
if(info.last_subscription_TS < item->time_stamp)
{
info.subscription_TS = item->time_stamp ;
info.last_subscription_TS = item->time_stamp ;
if(item->subscription_type == RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_SUBSCRIBE)
info.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
@ -2027,7 +1956,7 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
else
std::cerr << "(EE) unknown subscription order type " << item->subscription_type << " for circle " << cid << " by id " << item->meta.mAuthorId << std::endl;
}
else if(info.subscription_TS > item->time_stamp)
else if(info.last_subscription_TS > item->time_stamp)
{
std::cerr << " scheduling for deletion" << std::endl;
messages_to_delete[RsGxsGroupId(cid)].push_back(it->second[i]->meta.mMsgId) ;
@ -2038,7 +1967,10 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
}
RsStackMutex stack(mCircleMtx); /********** STACK LOCKED MTX ******/
mDataStore->removeMsgs(messages_to_delete);
uint32_t token2;
RsGenExchange::deleteMsgs(token2,messages_to_delete);
return true ;
}

View File

@ -127,7 +127,7 @@
class RsGxsCircleMembershipStatus
{
public:
RsGxsCircleMembershipStatus() : subscription_TS(0), subscription_flags(0) {}
RsGxsCircleMembershipStatus() : last_subscription_TS(0), subscription_flags(0) {}
time_t last_subscription_TS ;
uint32_t subscription_flags ; // combination of GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST and GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED
@ -143,7 +143,7 @@ class RsGxsCircleCache
bool getAllowedPeersList(std::list<RsPgpId> &friendlist) const;
bool isAllowedPeer(const RsPgpId &id) const;
bool isAllowedPeer(const RsGxsId &id) const;
bool isAllowedPeer(const RsGxsId &id, const RsGxsGroupId &destination_group) const;
bool addAllowedPeer(const RsPgpId &pgpid);
bool addLocalFriend(const RsPgpId &pgpid);
@ -152,7 +152,6 @@ class RsGxsCircleCache
uint32_t mCircleType;
bool mIsExternal;
bool mAmIAllowed ;
uint32_t mGroupStatus;
uint32_t mGroupSubscribeFlags;
@ -162,8 +161,8 @@ class RsGxsCircleCache
std::set<RsGxsCircleId> mUnprocessedCircles;
std::set<RsGxsCircleId> mProcessedCircles;
#endif
std::map<RsGxsId,RsGxsCircleMembershipStatus> mMembershipStatusMap;
time_t mLastUpdateMembershipTS ; // last time the subscribe messages have been requested. Should be reset when new messages arrive.
std::map<RsGxsId,RsGxsCircleMembershipStatus> mMembershipStatus;
time_t mLastUpdatedMembershipTS ; // last time the subscribe messages have been requested. Should be reset when new messages arrive.
std::set<RsGxsId> mAllowedGxsIds; // IDs that are allowed in the circle and have requested membership. This is the official members list.
std::set<RsPgpId> mAllowedNodes;
@ -192,9 +191,10 @@ virtual RsServiceInfo getServiceInfo();
virtual int canSend(const RsGxsCircleId &circleId, const RsPgpId &id, bool &should_encrypt);
virtual int canReceive(const RsGxsCircleId &circleId, const RsPgpId &id);
virtual bool recipients(const RsGxsCircleId &circleId, std::list<RsPgpId> &friendlist) ;
virtual bool recipients(const RsGxsCircleId &circleId, std::list<RsGxsId> &gxs_ids) ;
virtual bool isRecipient(const RsGxsCircleId &circleId, const RsGxsId& id) ;
virtual bool recipients(const RsGxsCircleId &circleId, const RsGxsGroupId& dest_group, std::list<RsGxsId> &gxs_ids) ;
virtual bool isRecipient(const RsGxsCircleId &circleId, const RsGxsGroupId& destination_group, const RsGxsId& id) ;
virtual bool getGroupData(const uint32_t &token, std::vector<RsGxsCircleGroup> &groups);
@ -249,7 +249,7 @@ virtual RsServiceInfo getServiceInfo();
bool checkCircleCacheForAutoSubscribe(RsGxsCircleCache &cache);
bool checkCircleCacheForMembershipUpdate(RsGxsCircleCache& cache);
bool locked_processLoadingCacheEntry(RsGxsCircleCache &cache);
p3IdService *mIdentities; // Needed for constructing Circle Info,
PgpAuxUtils *mPgpUtils;
@ -290,8 +290,6 @@ virtual RsServiceInfo getServiceInfo();
uint32_t mDummyIdToken;
std::list<RsGxsId> mDummyPgpLinkedIds;
std::list<RsGxsId> mDummyOwnIds;
};
#endif // P3_CIRCLES_SERVICE_HEADER

View File

@ -462,7 +462,8 @@ void IdDialog::loadCircleGroupMeta(const uint32_t &token)
item->setText(CIRCLEGROUP_CIRCLE_COL_GROUPNAME, QString::fromUtf8(vit->mGroupName.c_str()));
item->setText(CIRCLEGROUP_CIRCLE_COL_GROUPID, QString::fromStdString(vit->mGroupId.toStdString()));
item->setData(CIRCLEGROUP_CIRCLE_COL_GROUPFLAGS, Qt::UserRole, QVariant(vit->mSubscribeFlags));
item->setData(CIRCLEGROUP_CIRCLE_COL_SUBSCRIBEFLAGS, Qt::UserRole, QVariant(details.mSubscribeFlags));
#warning TODO
//item->setData(CIRCLEGROUP_CIRCLE_COL_SUBSCRIBEFLAGS, Qt::UserRole, QVariant(details.mSubscribeFlags));
if(am_I_in_circle)
{
@ -489,7 +490,8 @@ void IdDialog::loadCircleGroupMeta(const uint32_t &token)
// just in case.
item->setData(CIRCLEGROUP_CIRCLE_COL_GROUPFLAGS, Qt::UserRole, QVariant(vit->mSubscribeFlags));
item->setData(CIRCLEGROUP_CIRCLE_COL_SUBSCRIBEFLAGS, Qt::UserRole, QVariant(details.mSubscribeFlags));
#warning TODO
//item->setData(CIRCLEGROUP_CIRCLE_COL_SUBSCRIBEFLAGS, Qt::UserRole, QVariant(details.mSubscribeFlags));
if (vit->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN)
{
@ -500,28 +502,29 @@ void IdDialog::loadCircleGroupMeta(const uint32_t &token)
item->setFont(CIRCLEGROUP_CIRCLE_COL_GROUPFLAGS,font) ;
}
switch(details.mSubscribeFlags)
{
case GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED:
item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon(":icons/bullet_yellow_128.png")) ;
item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,tr("Your request to be in this circle is pending. You need to wait for the administrator to validate it.")) ;
break ;
case GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST:
item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon(":icons/bullet_blue_128.png")) ;
item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,tr("You are invited to this circle by the administrator. Right click to join the circle.")) ;
break ;
case GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST | GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED:
item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon(":icons/bullet_green_128.png")) ;
item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,tr("You are a validated member of this circle.")) ;
break ;
default:
item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon()) ;
item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,"") ;
break ;
}
#warning TODO
// switch(details.mSubscribeFlags)
// {
// case GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED:
// item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon(":icons/bullet_yellow_128.png")) ;
// item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,tr("Your request to be in this circle is pending. You need to wait for the administrator to validate it.")) ;
// break ;
//
// case GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST:
// item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon(":icons/bullet_blue_128.png")) ;
// item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,tr("You are invited to this circle by the administrator. Right click to join the circle.")) ;
// break ;
//
// case GXS_EXTERNAL_CIRCLE_FLAGS_IN_ADMIN_LIST | GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED:
// item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon(":icons/bullet_green_128.png")) ;
// item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,tr("You are a validated member of this circle.")) ;
// break ;
//
// default:
// item->setIcon(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,QIcon()) ;
// item->setToolTip(CIRCLEGROUP_CIRCLE_COL_GROUPNAME,"") ;
// break ;
// }
}
}

View File

@ -43,7 +43,7 @@ static bool same_RsGxsCircleDetails(const RsGxsCircleDetails& d1,const RsGxsCirc
return ( d1.mCircleId == d2.mCircleId
&& d1.mCircleName == d2.mCircleName
&& d1.mCircleType == d2.mCircleType
&& d1.mSubscribeFlags == d2.mSubscribeFlags
&& d1.mSubscriptionFlags == d2.mSubscriptionFlags
&& d1.mAllowedGxsIds== d2.mAllowedGxsIds
&& d1.mAllowedNodes == d2.mAllowedNodes
);