made GxsNotify for messages with more granularity. Removed RsGxsCircleMsg class that was not used.

This commit is contained in:
csoler 2020-05-03 23:20:13 +02:00
parent efb26ce9c0
commit ce6abe5d66
No known key found for this signature in database
GPG key ID: 7BCA522266C0804C
19 changed files with 217 additions and 284 deletions

View file

@ -239,7 +239,7 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
#endif
/* iterate through and grab any new messages */
std::list<RsGxsGroupId> unprocessedGroups;
std::set<RsGxsGroupId> unprocessedGroups;
std::vector<RsGxsNotify *>::iterator it;
for(it = changes.begin(); it != changes.end(); ++it)
@ -253,16 +253,12 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
/* message received */
if (rsEvents)
{
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > &msgChangeMap = msgChange->msgChangeMap;
for (auto mit = msgChangeMap.begin(); mit != msgChangeMap.end(); ++mit)
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{
auto ev = std::make_shared<RsGxsChannelEvent>();
ev->mChannelMsgId = *mit1;
ev->mChannelGroupId = mit->first;
ev->mChannelEventCode = RsChannelEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
auto ev = std::make_shared<RsGxsChannelEvent>();
ev->mChannelMsgId = msgChange->mMsgId;
ev->mChannelGroupId = msgChange->mGroupId;
ev->mChannelEventCode = RsChannelEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
}
@ -273,25 +269,21 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
std::cerr << std::endl;
#endif
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > &msgChangeMap = msgChange->msgChangeMap;
for(auto mit = msgChangeMap.begin(); mit != msgChangeMap.end(); ++mit)
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::notifyChanges() Msgs for Group: " << mit->first;
std::cerr << std::endl;
#endif
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::notifyChanges() Msgs for Group: " << mit->first;
std::cerr << "p3GxsChannels::notifyChanges() AutoDownload for Group: " << mit->first;
std::cerr << std::endl;
#endif
bool enabled = false;
if (autoDownloadEnabled(mit->first, enabled) && enabled)
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::notifyChanges() AutoDownload for Group: " << mit->first;
std::cerr << std::endl;
#endif
/* problem is most of these will be comments and votes,
* should make it occasional - every 5mins / 10minutes TODO */
unprocessedGroups.push_back(mit->first);
}
/* problem is most of these will be comments and votes, should make it occasional - every 5mins / 10minutes TODO */
// We do not call if(autoDownLoadEnabled()) here, because it would be too costly when
// many msgs are received from the same group. We back the groupIds and then request one by one.
unprocessedGroups.insert(msgChange->mGroupId);
}
}
}
@ -376,8 +368,16 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
delete *it;
}
if(!unprocessedGroups.empty())
request_SpecificSubscribedGroups(unprocessedGroups);
std::list<RsGxsGroupId> grps;
for(auto& grp_id:unprocessedGroups)
{
bool enabled = false;
if (autoDownloadEnabled(grp_id, enabled) && enabled) // costly call, that's why it's packed down here.
grps.push_back(grp_id);
}
if(!grps.empty())
request_SpecificSubscribedGroups(grps);
}
void p3GxsChannels::service_tick()
@ -705,8 +705,7 @@ void p3GxsChannels::request_AllSubscribedGroups()
}
void p3GxsChannels::request_SpecificSubscribedGroups(
const std::list<RsGxsGroupId> &groups )
void p3GxsChannels::request_SpecificSubscribedGroups( const std::list<RsGxsGroupId> &groups )
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::request_SpecificSubscribedGroups()";
@ -719,8 +718,7 @@ void p3GxsChannels::request_SpecificSubscribedGroups(
uint32_t token = 0;
if(!RsGenExchange::getTokenService()->
requestGroupInfo(token, ansType, opts, groups))
if(!RsGenExchange::getTokenService()-> requestGroupInfo(token, ansType, opts, groups))
{
std::cerr << __PRETTY_FUNCTION__ << " Failed requesting groups info!"
<< std::endl;

View file

@ -535,57 +535,51 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
for(auto it = changes.begin(); it != changes.end(); ++it)
{
RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(*it);
RsGxsNotify *c = *it;
RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(c);
if (msgChange)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Found circle Message Change Notification" << std::endl;
std::cerr << " Found circle Message Change Notification for group " << msgChange->mGroupId << ", msg ID " << msgChange->mMsgId << std::endl;
#endif
for(auto mit = msgChange->msgChangeMap.begin(); mit != msgChange->msgChangeMap.end(); ++mit)
{
#ifdef DEBUG_CIRCLES
std::cerr << " Msgs for Group: " << mit->first << std::endl;
std::cerr << " Msgs for Group: " << mit->first << std::endl;
#endif
RsGxsCircleId circle_id(mit->first);
RsGxsCircleId circle_id(msgChange->mGroupId);
force_cache_reload(circle_id);
if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW))
{
const RsGxsCircleSubscriptionRequestItem *item = dynamic_cast<const RsGxsCircleSubscriptionRequestItem *>(msgChange->mNewMsgItem);
RsGxsCircleDetails details;
getCircleDetails(circle_id,details);
if(item)
{
auto ev = std::make_shared<RsGxsCircleEvent>();
ev->mCircleId = circle_id;
ev->mGxsId = msgChange->mNewMsgItem->meta.mAuthorId;
if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW|| c->getType() == RsGxsNotify::TYPE_PUBLISHED) )
for (auto msgIdIt(mit->second.begin()), end(mit->second.end()); msgIdIt != end; ++msgIdIt)
if (item->subscription_type == RsGxsCircleSubscriptionType::UNSUBSCRIBE)
{
RsGxsCircleMsg msg;
if(getCircleRequest(RsGxsGroupId(circle_id),*msgIdIt,msg))
{
auto ev = std::make_shared<RsGxsCircleEvent>();
ev->mCircleId = circle_id;
ev->mGxsId = msg.mMeta.mAuthorId;
if (msg.stuff == "SUBSCRIPTION_REQUEST_UNSUBSCRIBE")
{
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_LEAVE;
rsEvents->postEvent(ev);
}
else if(msg.stuff == "SUBSCRIPTION_REQUEST_SUBSCRIBE")
{
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_REQUEST;
rsEvents->postEvent(ev);
}
}
else
RsErr()<< __PRETTY_FUNCTION__<<" Cannot request CircleMsg " << *msgIdIt << ". Db not ready?" << std::endl;
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_LEAVE;
rsEvents->postEvent(ev);
}
mCircleCache.erase(circle_id);
mCacheUpdated = true;
else if(item->subscription_type == RsGxsCircleSubscriptionType::SUBSCRIBE)
{
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_REQUEST;
rsEvents->postEvent(ev);
}
else
RsErr() << __PRETTY_FUNCTION__ << " Unknown subscription request type " << static_cast<uint32_t>(item->subscription_type) << " in msg item" << std::endl;
}
else
RsErr() << __PRETTY_FUNCTION__ << ": missing SubscriptionRequestItem in msg notification for msg " << msgChange->mMsgId << std::endl;
}
mCircleCache.erase(circle_id);
mCacheUpdated = true;
}
RsGxsGroupChange *groupChange = dynamic_cast<RsGxsGroupChange *>(*it);
RsGxsGroupChange *groupChange = dynamic_cast<RsGxsGroupChange *>(c);
/* add groups to ExternalIdList (Might get Personal Circles here until NetChecks in place) */
if (groupChange)
@ -697,7 +691,7 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
}
}
delete *it;
delete c;
}
}
@ -930,8 +924,9 @@ bool p3GxsCircles::getMsgData(const uint32_t &token, std::vector<RsGxsCircleMsg>
for(; vit != msgItems.end(); ++vit)
{
#ifdef TO_REMOVE
RsGxsCircleMsgItem* item = dynamic_cast<RsGxsCircleMsgItem*>(*vit);
RsGxsCircleSubscriptionRequestItem* rsItem = dynamic_cast<RsGxsCircleSubscriptionRequestItem*>(*vit);
if(item)
{
RsGxsCircleMsg msg = item->mMsg;
@ -939,22 +934,16 @@ bool p3GxsCircles::getMsgData(const uint32_t &token, std::vector<RsGxsCircleMsg>
msgs.push_back(msg);
delete item;
}
else if (rsItem)
#endif
RsGxsCircleSubscriptionRequestItem* rsItem = dynamic_cast<RsGxsCircleSubscriptionRequestItem*>(*vit);
if (rsItem)
{
RsGxsCircleMsg msg ;//= rsItem->mMsg;
msg.mMeta = rsItem->meta;
switch (rsItem->subscription_type)
{
case RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_UNKNOWN:
msg.stuff.clear();
break;
case RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_SUBSCRIBE:
msg.stuff="SUBSCRIPTION_REQUEST_SUBSCRIBE";
break;
case RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_UNSUBSCRIBE:
msg.stuff="SUBSCRIPTION_REQUEST_UNSUBSCRIBE";
break;
}
msg.mSubscriptionType = rsItem->subscription_type;
msgs.push_back(msg);
delete rsItem;
}
@ -2320,19 +2309,15 @@ void p3GxsCircles::handle_event(uint32_t event_type, const std::string &elabel)
bool p3GxsCircles::pushCircleMembershipRequest(
const RsGxsId& own_gxsid, const RsGxsCircleId& circle_id,
uint32_t request_type )
RsGxsCircleSubscriptionType request_type )
{
Dbg3() << __PRETTY_FUNCTION__ << "own_gxsid = " << own_gxsid
<< ", circle=" << circle_id << ", req type=" << request_type
<< std::endl;
if( request_type !=
RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_SUBSCRIBE &&
request_type !=
RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_UNSUBSCRIBE )
if( request_type != RsGxsCircleSubscriptionType::SUBSCRIBE && request_type != RsGxsCircleSubscriptionType::UNSUBSCRIBE )
{
RsErr() << __PRETTY_FUNCTION__ << " Unknown request type: "
<< request_type << std::endl;
RsErr() << __PRETTY_FUNCTION__ << " Unknown request type: " << static_cast<uint32_t>(request_type) << std::endl;
return false;
}
@ -2368,7 +2353,7 @@ bool p3GxsCircles::pushCircleMembershipRequest(
s->meta.mGroupId = RsGxsGroupId(circle_id) ;
s->meta.mMsgId.clear();
s->meta.mThreadId = RsDirUtil::sha1sum(tmpmem,tmpmem.size()); // make the ID from the hash of the cirle ID and the author ID
s->meta.mThreadId = RsGxsMessageId(RsDirUtil::sha1sum(tmpmem,tmpmem.size())); // make the ID from the hash of the cirle ID and the author ID
s->meta.mAuthorId = own_gxsid;
// msgItem->meta.mParentId = ; // leave these blank
@ -2382,7 +2367,7 @@ bool p3GxsCircles::pushCircleMembershipRequest(
#endif
uint32_t token ;
if(request_type == RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_SUBSCRIBE)
if(request_type == RsGxsCircleSubscriptionType::SUBSCRIBE)
RsGenExchange::subscribeToGroup(token, RsGxsGroupId(circle_id), true);
RsGenExchange::publishMsg(token, s);
@ -2395,11 +2380,11 @@ bool p3GxsCircles::pushCircleMembershipRequest(
bool p3GxsCircles::requestCircleMembership(const RsGxsId& own_gxsid,const RsGxsCircleId& circle_id)
{
return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_SUBSCRIBE) ;
return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionType::SUBSCRIBE) ;
}
bool p3GxsCircles::cancelCircleMembership(const RsGxsId& own_gxsid,const RsGxsCircleId& circle_id)
{
return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_UNSUBSCRIBE) ;
return pushCircleMembershipRequest(own_gxsid,circle_id,RsGxsCircleSubscriptionType::UNSUBSCRIBE) ;
}
@ -2470,12 +2455,12 @@ bool p3GxsCircles::processMembershipRequests(uint32_t token)
{
info.last_subscription_TS = item->time_stamp ;
if(item->subscription_type == RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_SUBSCRIBE)
if(item->subscription_type == RsGxsCircleSubscriptionType::SUBSCRIBE)
info.subscription_flags |= GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
else if(item->subscription_type == RsGxsCircleSubscriptionRequestItem::SUBSCRIPTION_REQUEST_UNSUBSCRIBE)
else if(item->subscription_type == RsGxsCircleSubscriptionType::UNSUBSCRIBE)
info.subscription_flags &= ~GXS_EXTERNAL_CIRCLE_FLAGS_SUBSCRIBED;
else
std::cerr << " (EE) unknown subscription order type: " << item->subscription_type ;
std::cerr << " (EE) unknown subscription order type: " << static_cast<uint32_t>(item->subscription_type) ;
mCacheUpdated = true;
#ifdef DEBUG_CIRCLES

View file

@ -264,7 +264,7 @@ public:
protected:
bool pushCircleMembershipRequest(const RsGxsId& own_gxsid,const RsGxsCircleId& circle_id,uint32_t request_type) ;
bool pushCircleMembershipRequest(const RsGxsId& own_gxsid, const RsGxsCircleId& circle_id, RsGxsCircleSubscriptionType request_type) ;
static uint32_t circleAuthenPolicy();
/** Notifications **/

View file

@ -195,16 +195,11 @@ void p3GxsForums::notifyChanges(std::vector<RsGxsNotify *> &changes)
if (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW || msgChange->getType() == RsGxsNotify::TYPE_PUBLISHED) /* message received */
if (rsEvents)
{
std::map<RsGxsGroupId, std::set<RsGxsMessageId> >& msgChangeMap = msgChange->msgChangeMap;
for (auto mit = msgChangeMap.begin(); mit != msgChangeMap.end(); ++mit)
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{
auto ev = std::make_shared<RsGxsForumEvent>();
ev->mForumMsgId = *mit1;
ev->mForumGroupId = mit->first;
ev->mForumEventCode = RsForumEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
auto ev = std::make_shared<RsGxsForumEvent>();
ev->mForumMsgId = msgChange->mMsgId;
ev->mForumGroupId = msgChange->mGroupId;
ev->mForumEventCode = RsForumEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
#ifdef NOT_USED_YET

View file

@ -606,22 +606,7 @@ void p3IdService::notifyChanges(std::vector<RsGxsNotify *> &changes)
RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(changes[i]);
if (msgChange && !msgChange->metaChange())
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::notifyChanges() Found Message Change Notification";
std::cerr << std::endl;
#endif
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > &msgChangeMap = msgChange->msgChangeMap;
for(auto mit = msgChangeMap.begin(); mit != msgChangeMap.end(); ++mit)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::notifyChanges() Msgs for Group: " << mit->first;
std::cerr << std::endl;
#endif
}
}
RsWarn() << __PRETTY_FUNCTION__ << " Found a Msg data change in p3IdService. This is quite unexpected." << std::endl;
RsGxsGroupChange *groupChange = dynamic_cast<RsGxsGroupChange *>(changes[i]);

View file

@ -98,27 +98,22 @@ void p3PostBase::notifyChanges(std::vector<RsGxsNotify *> &changes)
std::cerr << std::endl;
#endif
std::map<RsGxsGroupId, std::set<RsGxsMessageId> > &msgChangeMap = msgChange->msgChangeMap;
for(auto mit = msgChangeMap.begin(); mit != msgChangeMap.end(); ++mit)
{
#ifdef POSTBASE_DEBUG
std::cerr << "p3PostBase::notifyChanges() Msgs for Group: " << mit->first;
std::cerr << std::endl;
std::cerr << "p3PostBase::notifyChanges() Msgs for Group: " << mit->first;
std::cerr << std::endl;
#endif
// To start with we are just going to trigger updates on these groups.
// FUTURE OPTIMISATION.
// It could be taken a step further and directly request these msgs for an update.
addGroupForProcessing(mit->first);
// To start with we are just going to trigger updates on these groups.
// FUTURE OPTIMISATION.
// It could be taken a step further and directly request these msgs for an update.
addGroupForProcessing(msgChange->mGroupId);
if (rsEvents && (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW || msgChange->getType() == RsGxsNotify::TYPE_PUBLISHED))
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{
auto ev = std::make_shared<RsGxsPostedEvent>();
ev->mPostedMsgId = *mit1;
ev->mPostedGroupId = mit->first;
ev->mPostedEventCode = RsPostedEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
if (rsEvents && (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW || msgChange->getType() == RsGxsNotify::TYPE_PUBLISHED))
{
auto ev = std::make_shared<RsGxsPostedEvent>();
ev->mPostedMsgId = msgChange->mMsgId;
ev->mPostedGroupId = msgChange->mGroupId;
ev->mPostedEventCode = RsPostedEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
}
@ -336,11 +331,7 @@ void p3PostBase::addGroupForProcessing(RsGxsGroupId grpId)
{
RsStackMutex stack(mPostBaseMtx); /********** STACK LOCKED MTX ******/
// no point having multiple lookups queued.
if (mBgGroupList.end() == std::find(mBgGroupList.begin(),
mBgGroupList.end(), grpId))
{
mBgGroupList.push_back(grpId);
}
mBgGroupList.insert(grpId);
}
}
@ -373,8 +364,8 @@ void p3PostBase::background_requestUnprocessedGroup()
return;
}
grpId = mBgGroupList.front();
mBgGroupList.pop_front();
grpId = *mBgGroupList.begin();
mBgGroupList.erase(grpId);
mBgProcessing = true;
}
@ -468,8 +459,6 @@ void p3PostBase::background_loadMsgs(const uint32_t &token, bool unprocessed)
// generate vector of changes to push to the GUI.
std::vector<RsGxsNotify *> changes;
RsGxsMsgChange *msgChanges = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false);
RsGxsGroupId groupId;
std::map<RsGxsGroupId, std::vector<RsGxsMsgItem*> >::iterator mit;
@ -520,7 +509,7 @@ void p3PostBase::background_loadMsgs(const uint32_t &token, bool unprocessed)
#endif
/* but we need to notify GUI about them */
msgChanges->msgChangeMap[mit->first].insert((*vit)->meta.mMsgId);
changes.push_back(new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, mit->first,(*vit)->meta.mMsgId, false));
}
else if (NULL != (commentItem = dynamic_cast<RsGxsCommentItem *>(*vit)))
{
@ -616,22 +605,7 @@ void p3PostBase::background_loadMsgs(const uint32_t &token, bool unprocessed)
}
/* push updates of new Posts */
if (msgChanges->msgChangeMap.size() > 0)
{
#ifdef POSTBASE_DEBUG
std::cerr << "p3PostBase::background_processNewMessages() -> receiveChanges()";
std::cerr << std::endl;
#endif
changes.push_back(msgChanges);
//receiveHelperChanges(changes);
notifyChanges(changes);
}
else
{
delete(msgChanges);
}
notifyChanges(changes);
/* request the summary info from the parents */
uint32_t token_b;
@ -696,7 +670,6 @@ void p3PostBase::background_updateVoteCounts(const uint32_t &token)
// generate vector of changes to push to the GUI.
std::vector<RsGxsNotify *> changes;
RsGxsMsgChange *msgChanges = new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED, false);
for(mit = parentMsgList.begin(); mit != parentMsgList.end(); ++mit)
{
@ -739,7 +712,8 @@ void p3PostBase::background_updateVoteCounts(const uint32_t &token)
#endif
stats.increment(it->second);
msgChanges->msgChangeMap[mit->first].insert(vit->mMsgId);
changes.push_back(new RsGxsMsgChange(RsGxsNotify::TYPE_PROCESSED,mit->first,vit->mMsgId, false));
}
else
{
@ -771,21 +745,7 @@ void p3PostBase::background_updateVoteCounts(const uint32_t &token)
}
}
if (msgChanges->msgChangeMap.size() > 0)
{
#ifdef POSTBASE_DEBUG
std::cerr << "p3PostBase::background_updateVoteCounts() -> receiveChanges()";
std::cerr << std::endl;
#endif
changes.push_back(msgChanges);
//receiveHelperChanges(changes);
notifyChanges(changes);
}
else
{
delete(msgChanges);
}
notifyChanges(changes);
// DONE!.
background_cleanup();

View file

@ -125,7 +125,7 @@ private:
bool mBgProcessing;
bool mBgIncremental;
std::list<RsGxsGroupId> mBgGroupList;
std::set<RsGxsGroupId> mBgGroupList;
std::map<RsGxsMessageId, PostStats> mBgStatsMap;
std::map<RsGxsGroupId,rstime_t> mKnownPosted;