RsGxsDataAccess filtered getters do not return all messages if none matches

This was causing scary error message in RsGxsChannels with autodoanload
  enabled, when all messages where already processed none matched the
  filter so all messages where returned,
  making p3GxsChannels::handleUnprocessedPost furious
  more details into this forum post
  retroshare://forum?name=Scary%20message%2C%20but%20which%20doesn%27t%20seem%20to%20be%20the%20source%20of%20the%20problems%20Was%3A%20More%20GXS%20strange%20messages&id=8fd22bd8f99754461e7ba1ca8a727995&msgid=04f10ff97f761c6840c33f1610cb050f0f73da8d
This commit is contained in:
Gioacchino Mazzurco 2018-11-09 21:24:31 +01:00
parent 2ed6904d06
commit 90bacf12bd
No known key found for this signature in database
GPG Key ID: A1FBCA3872E87051
6 changed files with 155 additions and 134 deletions

View File

@ -1198,16 +1198,16 @@ void RsDataService::locked_retrieveGroups(RetroCursor* c, std::vector<RsNxsGrp*>
} }
} }
int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, bool /* cache */, bool withMeta) int RsDataService::retrieveNxsMsgs(
const GxsMsgReq &reqIds, GxsMsgResult &msg, bool /* cache */,
bool withMeta )
{ {
#ifdef RS_DATA_SERVICE_DEBUG_TIME #ifdef RS_DATA_SERVICE_DEBUG_TIME
rstime::RsScopeTimer timer(""); rstime::RsScopeTimer timer("");
int resultCount = 0; int resultCount = 0;
#endif #endif
GxsMsgReq::const_iterator mit = reqIds.begin(); for(auto mit = reqIds.begin(); mit != reqIds.end(); ++mit)
for(; mit != reqIds.end(); ++mit)
{ {
const RsGxsGroupId& grpId = mit->first; const RsGxsGroupId& grpId = mit->first;
@ -1216,9 +1216,9 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, b
const std::set<RsGxsMessageId>& msgIdV = mit->second; const std::set<RsGxsMessageId>& msgIdV = mit->second;
std::vector<RsNxsMsg*> msgSet; std::vector<RsNxsMsg*> msgSet;
if(msgIdV.empty()){ if(msgIdV.empty())
{
RsStackMutex stack(mDbMutex); RS_STACK_MUTEX(mDbMutex);
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, withMeta ? mMsgColumnsWithMeta : mMsgColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", ""); RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, withMeta ? mMsgColumnsWithMeta : mMsgColumns, KEY_GRP_ID+ "='" + grpId.toStdString() + "'", "");
@ -1228,16 +1228,17 @@ int RsDataService::retrieveNxsMsgs(const GxsMsgReq &reqIds, GxsMsgResult &msg, b
} }
delete c; delete c;
}else{ }
else
{
RS_STACK_MUTEX(mDbMutex);
// request each grp // request each grp
std::set<RsGxsMessageId>::const_iterator sit = msgIdV.begin(); for( std::set<RsGxsMessageId>::const_iterator sit = msgIdV.begin();
sit!=msgIdV.end();++sit )
for(; sit!=msgIdV.end();++sit){ {
const RsGxsMessageId& msgId = *sit; const RsGxsMessageId& msgId = *sit;
RsStackMutex stack(mDbMutex);
RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, withMeta ? mMsgColumnsWithMeta : mMsgColumns, KEY_GRP_ID+ "='" + grpId.toStdString() RetroCursor* c = mDb->sqlQuery(MSG_TABLE_NAME, withMeta ? mMsgColumnsWithMeta : mMsgColumns, KEY_GRP_ID+ "='" + grpId.toStdString()
+ "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", ""); + "' AND " + KEY_MSG_ID + "='" + msgId.toStdString() + "'", "");

View File

@ -47,10 +47,14 @@ public:
* Retrieves all msgs * Retrieves all msgs
* @param reqIds requested msg ids (grpId,msgId), leave msg list empty to get all msgs for the grp * @param reqIds requested msg ids (grpId,msgId), leave msg list empty to get all msgs for the grp
* @param msg result of msg retrieval * @param msg result of msg retrieval
* @param cache whether to store results of this retrieval in memory for faster later retrieval * @param cache IGNORED whether to store results of this retrieval in memory
* for faster later retrieval
* @param strictFilter if true do not request any message if reqIds is empty
* @return error code * @return error code
*/ */
int retrieveNxsMsgs(const GxsMsgReq& reqIds, GxsMsgResult& msg, bool cache, bool withMeta = false); int retrieveNxsMsgs(
const GxsMsgReq& reqIds, GxsMsgResult& msg, bool cache,
bool withMeta = false );
/*! /*!
* Retrieves groups, if empty, retrieves all grps, if map is not empty * Retrieves groups, if empty, retrieves all grps, if map is not empty

View File

@ -137,16 +137,19 @@ public:
typedef std::map<RsNxsMsg*, RsGxsMsgMetaData*> MsgStoreMap; typedef std::map<RsNxsMsg*, RsGxsMsgMetaData*> MsgStoreMap;
RsGeneralDataService(){} RsGeneralDataService(){}
virtual ~RsGeneralDataService(){return;} virtual ~RsGeneralDataService(){}
/*! /*!
* Retrieves all msgs * Retrieves all msgs
* @param reqIds requested msg ids (grpId,msgId), leave msg list empty to get all msgs for the grp * @param reqIds requested msg ids (grpId,msgId), leave msg list empty to get all msgs for the grp
* @param msg result of msg retrieval * @param msg result of msg retrieval
* @param cache whether to store results of this retrieval in memory for faster later retrieval * @param cache whether to store results of this retrieval in memory for faster later retrieval
* @param strictFilter if true do not request any message if reqIds is empty
* @return error code * @return error code
*/ */
virtual int retrieveNxsMsgs(const GxsMsgReq& reqIds, GxsMsgResult& msg, bool cache, bool withMeta=false) = 0; virtual int retrieveNxsMsgs(
const GxsMsgReq& reqIds, GxsMsgResult& msg, bool cache,
bool withMeta = false ) = 0;
/*! /*!
* Retrieves all groups stored * Retrieves all groups stored

View File

@ -1036,29 +1036,42 @@ bool RsGxsDataAccess::getMsgData(MsgDataReq* req)
{ {
GxsMsgReq msgIdOut; GxsMsgReq msgIdOut;
const RsTokReqOptions& opts(req->Options);
// filter based on options // filter based on options
getMsgList(req->mMsgIds, req->Options, msgIdOut); getMsgList(req->mMsgIds, opts, msgIdOut);
// If the list is empty because of filtering do not retrieve from DB
if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
return true;
mDataStore->retrieveNxsMsgs(msgIdOut, req->mMsgData, true, true); mDataStore->retrieveNxsMsgs(msgIdOut, req->mMsgData, true, true);
return true; return true;
} }
bool RsGxsDataAccess::getMsgSummary(MsgMetaReq* req) bool RsGxsDataAccess::getMsgSummary(MsgMetaReq* req)
{ {
GxsMsgReq msgIdOut; GxsMsgReq msgIdOut;
// filter based on options const RsTokReqOptions& opts(req->Options);
getMsgList(req->mMsgIds, req->Options, msgIdOut);
mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData); // filter based on options
getMsgList(req->mMsgIds, opts, msgIdOut);
// If the list is empty because of filtering do not retrieve from DB
if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
return true;
mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData);
return true; return true;
} }
bool RsGxsDataAccess::getMsgList(const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut) bool RsGxsDataAccess::getMsgList(
const GxsMsgReq& msgIds, const RsTokReqOptions& opts,
GxsMsgReq& msgIdsOut )
{ {
GxsMsgMetaResult result; GxsMsgMetaResult result;
@ -1689,41 +1702,45 @@ void RsGxsDataAccess::cleanseMsgMetaMap(GxsMsgMetaResult& result)
return; return;
} }
void RsGxsDataAccess::filterMsgList(GxsMsgIdResult& msgIds, const RsTokReqOptions& opts, void RsGxsDataAccess::filterMsgList(
const MsgMetaFilter& msgMetas) const GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts,
const MsgMetaFilter& msgMetas ) const
{ {
for( GxsMsgIdResult::iterator grpIt = resultsMap.begin();
GxsMsgIdResult::iterator mit = msgIds.begin(); grpIt != resultsMap.end(); ++grpIt )
for(;mit != msgIds.end(); ++mit)
{ {
const RsGxsGroupId& groupId(grpIt->first);
std::set<RsGxsMessageId>& msgsIdSet(grpIt->second);
MsgMetaFilter::const_iterator cit = msgMetas.find(mit->first); MsgMetaFilter::const_iterator cit = msgMetas.find(groupId);
if(cit == msgMetas.end()) continue;
if(cit == msgMetas.end()) std::cerr << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
continue; << " for group: " << groupId << " before filtering"
<< std::endl;
std::set<RsGxsMessageId>& msgs = mit->second; for( std::set<RsGxsMessageId>::iterator msgIdIt = msgsIdSet.begin();
std::set<RsGxsMessageId>::iterator vit = msgs.begin(); msgIdIt != msgsIdSet.end(); )
const std::map<RsGxsMessageId, RsGxsMsgMetaData*>& meta = cit->second;
std::map<RsGxsMessageId, RsGxsMsgMetaData*>::const_iterator cit2;
for(; vit != msgs.end();)
{ {
const RsGxsMessageId& msgId(*msgIdIt);
const std::map<RsGxsMessageId, RsGxsMsgMetaData*>& msgsMetaMap =
cit->second;
bool keep = false; bool keep = false;
if( (cit2 = meta.find(*vit)) != meta.end() ) std::map<RsGxsMessageId, RsGxsMsgMetaData*>::const_iterator msgsMetaMapIt;
if( (msgsMetaMapIt = msgsMetaMap.find(msgId)) != msgsMetaMap.end() )
{ {
keep = checkMsgFilter(opts, cit2->second); keep = checkMsgFilter(opts, msgsMetaMapIt->second);
} }
if(keep) if(keep) ++msgIdIt;
{ else msgIdIt = msgsIdSet.erase(msgIdIt);
++vit;
}else
{
vit = msgs.erase(vit);
}
} }
std::cerr << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
<< " for group: " << groupId << " after filtering"
<< std::endl;
} }
} }
@ -1901,62 +1918,87 @@ bool RsGxsDataAccess::checkGrpFilter(const RsTokReqOptions &opts, const RsGxsGrp
return subscribeMatch; return subscribeMatch;
} }
bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const RsGxsMsgMetaData* meta) const bool RsGxsDataAccess::checkMsgFilter(
const RsTokReqOptions& opts, const RsGxsMsgMetaData* meta ) const
{ {
bool statusMatch = false; if (opts.mStatusMask)
if (opts.mStatusMask)
{ {
// Exact Flags match required. // Exact Flags match required.
if ((opts.mStatusMask & opts.mStatusFilter) == (opts.mStatusMask & meta->mMsgStatus)) if ( (opts.mStatusMask & opts.mStatusFilter) ==
(opts.mStatusMask & meta->mMsgStatus) )
{ {
std::cerr << "checkMsgFilter() Accepting Msg as StatusMatches: "; #ifdef DATA_DEBUG
std::cerr << " Mask: " << opts.mStatusMask << " StatusFilter: " << opts.mStatusFilter; std::cerr << __PRETTY_FUNCTION__
std::cerr << " MsgStatus: " << meta->mMsgStatus << " MsgId: " << meta->mMsgId; << " Continue checking Msg as StatusMatches: "
std::cerr << std::endl; << " Mask: " << opts.mStatusMask
<< " StatusFilter: " << opts.mStatusFilter
statusMatch = true; << " MsgStatus: " << meta->mMsgStatus
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
} }
else else
{ {
std::cerr << "checkMsgFilter() Dropping Msg due to !StatusMatch "; #ifdef DATA_DEBUG
std::cerr << " Mask: " << opts.mStatusMask << " StatusFilter: " << opts.mStatusFilter; std::cerr << __PRETTY_FUNCTION__
std::cerr << " MsgStatus: " << meta->mMsgStatus << " MsgId: " << meta->mMsgId; << " Dropping Msg due to !StatusMatch "
std::cerr << std::endl; << " Mask: " << opts.mStatusMask
<< " StatusFilter: " << opts.mStatusFilter
<< " MsgStatus: " << meta->mMsgStatus
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
return false;
} }
} }
else else
{ {
// no status comparision, #ifdef DATA_DEBUG
statusMatch = true; std::cerr << __PRETTY_FUNCTION__
<< " Status check not requested"
<< " mStatusMask: " << opts.mStatusMask
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
} }
bool flagMatch = false; if(opts.mMsgFlagMask)
{
// Exact Flags match required.
if ( (opts.mMsgFlagMask & opts.mMsgFlagFilter) ==
(opts.mMsgFlagMask & meta->mMsgFlags) )
{
#ifdef DATA_DEBUG
std::cerr << __PRETTY_FUNCTION__
<< " Accepting Msg as FlagMatches: "
<< " Mask: " << opts.mMsgFlagMask
<< " FlagFilter: " << opts.mMsgFlagFilter
<< " MsgFlag: " << meta->mMsgFlags
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
}
else
{
#ifdef DATA_DEBUG
std::cerr << __PRETTY_FUNCTION__
<< " Dropping Msg due to !FlagMatch "
<< " Mask: " << opts.mMsgFlagMask
<< " FlagFilter: " << opts.mMsgFlagFilter
<< " MsgFlag: " << meta->mMsgFlags
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
if(opts.mMsgFlagMask) return false;
{ }
// Exact Flags match required. }
if ((opts.mMsgFlagMask & opts.mMsgFlagFilter) == (opts.mMsgFlagMask & meta->mMsgFlags)) else
{ {
std::cerr << "checkMsgFilter() Accepting Msg as FlagMatches: "; #ifdef DATA_DEBUG
std::cerr << " Mask: " << opts.mMsgFlagMask << " FlagFilter: " << opts.mMsgFlagFilter; std::cerr << __PRETTY_FUNCTION__
std::cerr << " MsgFlag: " << meta->mMsgFlags << " MsgId: " << meta->mMsgId; << " Flags check not requested"
std::cerr << std::endl; << " mMsgFlagMask: " << opts.mMsgFlagMask
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
}
flagMatch = true; return true;
}
else
{
std::cerr << "checkMsgFilter() Dropping Msg due to !FlagMatch ";
std::cerr << " Mask: " << opts.mMsgFlagMask << " FlagFilter: " << opts.mMsgFlagFilter;
std::cerr << " MsgFlag: " << meta->mMsgFlags << " MsgId: " << meta->mMsgId;
std::cerr << std::endl;
flagMatch = false;
}
}else{
flagMatch = true;
}
return statusMatch && flagMatch;
} }

View File

@ -858,7 +858,7 @@ void p3GxsChannels::request_GroupUnprocessedPosts(const std::list<RsGxsGroupId>
} }
void p3GxsChannels::load_SpecificUnprocessedPosts(uint32_t token) void p3GxsChannels::load_unprocessedPosts(uint32_t token)
{ {
#ifdef GXSCHANNELS_DEBUG #ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::load_SpecificUnprocessedPosts" << std::endl; std::cerr << "p3GxsChannels::load_SpecificUnprocessedPosts" << std::endl;
@ -880,32 +880,6 @@ void p3GxsChannels::load_SpecificUnprocessedPosts(uint32_t token)
} }
} }
void p3GxsChannels::load_GroupUnprocessedPosts(const uint32_t &token)
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::load_GroupUnprocessedPosts";
std::cerr << std::endl;
#endif
std::vector<RsGxsChannelPost> posts;
if (!getPostData(token, posts))
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::load_GroupUnprocessedPosts ERROR";
std::cerr << std::endl;
#endif
return;
}
std::vector<RsGxsChannelPost>::iterator it;
for(it = posts.begin(); it != posts.end(); ++it)
{
handleUnprocessedPost(*it);
}
}
void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg) void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)
{ {
#ifdef GXSCHANNELS_DEBUG #ifdef GXSCHANNELS_DEBUG
@ -915,8 +889,8 @@ void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)
if (!IS_MSG_UNPROCESSED(msg.mMeta.mMsgStatus)) if (!IS_MSG_UNPROCESSED(msg.mMeta.mMsgStatus))
{ {
std::cerr << __PRETTY_FUNCTION__ << " ERROR Msg already Processed!" std::cerr << __PRETTY_FUNCTION__ << " ERROR Msg already Processed! "
<< std::endl; << "mMsgId: " << msg.mMeta.mMsgId << std::endl;
return; return;
} }
@ -999,19 +973,18 @@ void p3GxsChannels::handleResponse(uint32_t token, uint32_t req_type)
load_SubscribedGroups(token); load_SubscribedGroups(token);
break; break;
case GXSCHANNELS_UNPROCESSED_SPECIFIC: case GXSCHANNELS_UNPROCESSED_SPECIFIC:
load_SpecificUnprocessedPosts(token); load_unprocessedPosts(token);
break; break;
case GXSCHANNELS_UNPROCESSED_GENERIC: case GXSCHANNELS_UNPROCESSED_GENERIC:
load_SpecificUnprocessedPosts(token); load_unprocessedPosts(token);
break; break;
default: default:
/* error */ std::cerr << __PRETTY_FUNCTION__ << "ERROR Unknown Request Type: "
std::cerr << "p3GxsService::handleResponse() Unknown Request Type: " << req_type; << req_type << std::endl;
std::cerr << std::endl; break;
break;
} }
} }

View File

@ -220,10 +220,8 @@ static uint32_t channelsAuthenPolicy();
void load_SubscribedGroups(const uint32_t &token); void load_SubscribedGroups(const uint32_t &token);
void request_SpecificUnprocessedPosts(std::list<std::pair<RsGxsGroupId, RsGxsMessageId> > &ids); void request_SpecificUnprocessedPosts(std::list<std::pair<RsGxsGroupId, RsGxsMessageId> > &ids);
void load_SpecificUnprocessedPosts(uint32_t token);
void request_GroupUnprocessedPosts(const std::list<RsGxsGroupId> &grouplist); void request_GroupUnprocessedPosts(const std::list<RsGxsGroupId> &grouplist);
void load_GroupUnprocessedPosts(const uint32_t &token); void load_unprocessedPosts(uint32_t token);
void handleUnprocessedPost(const RsGxsChannelPost &msg); void handleUnprocessedPost(const RsGxsChannelPost &msg);