added more debug output

This commit is contained in:
csoler 2021-01-04 21:24:06 +01:00
parent d0dffaa2a4
commit 94afc17629
5 changed files with 324 additions and 237 deletions

View file

@ -135,7 +135,7 @@ public:
* @param withMeta true will also retrieve metadata * @param withMeta true will also retrieve metadata
* @return error code * @return error code
*/ */
int retrieveNxsMsgs(const GxsMsgReq& reqIds, GxsMsgResult& msg, bool withMeta = false); int retrieveNxsMsgs(const GxsMsgReq& reqIds, GxsMsgResult& msg, bool withMeta = false) override;
/*! /*!
* Retrieves groups, if empty, retrieves all grps, if map is not empty * Retrieves groups, if empty, retrieves all grps, if map is not empty
@ -144,14 +144,14 @@ public:
* @param withMeta this initialise the metaData member of the nxsgroups retrieved * @param withMeta this initialise the metaData member of the nxsgroups retrieved
* @return error code * @return error code
*/ */
int retrieveNxsGrps(std::map<RsGxsGroupId, RsNxsGrp*>& grp, bool withMeta); int retrieveNxsGrps(std::map<RsGxsGroupId, RsNxsGrp*>& grp, bool withMeta) override;
/*! /*!
* Retrieves meta data of all groups stored (most current versions only) * Retrieves meta data of all groups stored (most current versions only)
* @param grp output group meta data * @param grp output group meta data
* @return error code * @return error code
*/ */
int retrieveGxsGrpMetaData(std::map<RsGxsGroupId, std::shared_ptr<RsGxsGrpMetaData> > &grp); int retrieveGxsGrpMetaData(std::map<RsGxsGroupId, std::shared_ptr<RsGxsGrpMetaData> > &grp) override;
/*! /*!
* Retrieves meta data of all groups stored (most current versions only) * Retrieves meta data of all groups stored (most current versions only)
@ -159,7 +159,7 @@ public:
* @param msgMeta meta data result as map of grpIds to array of metadata for that grpId * @param msgMeta meta data result as map of grpIds to array of metadata for that grpId
* @return error code * @return error code
*/ */
int retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaResult& msgMeta); int retrieveGxsMsgMetaData(const GxsMsgReq& reqIds, GxsMsgMetaResult& msgMeta) override;
/*! /*!
* remove msgs in data store * remove msgs in data store
@ -167,21 +167,21 @@ public:
* @param msgIds ids of messages to be removed * @param msgIds ids of messages to be removed
* @return error code * @return error code
*/ */
int removeMsgs(const GxsMsgReq& msgIds); int removeMsgs(const GxsMsgReq& msgIds) override;
/*! /*!
* remove groups in data store listed in grpIds param * remove groups in data store listed in grpIds param
* @param grpIds ids of groups to be removed * @param grpIds ids of groups to be removed
* @return error code * @return error code
*/ */
int removeGroups(const std::vector<RsGxsGroupId>& grpIds); int removeGroups(const std::vector<RsGxsGroupId>& grpIds) override;
/*! /*!
* Retrieves all group ids in store * Retrieves all group ids in store
* @param grpIds all grpids in store is inserted into this vector * @param grpIds all grpids in store is inserted into this vector
* @return error code * @return error code
*/ */
int retrieveGroupIds(std::vector<RsGxsGroupId> &grpIds); int retrieveGroupIds(std::vector<RsGxsGroupId> &grpIds) override;
/*! /*!
* Retrives all msg ids in store * Retrives all msg ids in store
@ -189,50 +189,57 @@ public:
* @param msgId msgsids retrieved * @param msgId msgsids retrieved
* @return error code * @return error code
*/ */
int retrieveMsgIds(const RsGxsGroupId& grpId, RsGxsMessageId::std_set& msgId); int retrieveMsgIds(const RsGxsGroupId& grpId, RsGxsMessageId::std_set& msgId) override;
/*! /*!
* @return the cache size set for this RsGeneralDataService in bytes * @return the cache size set for this RsGeneralDataService in bytes
*/ */
uint32_t cacheSize() const; uint32_t cacheSize() const override;
/*!
* \brief serviceType
* \return
* The service type for the current data service.
*/
virtual uint16_t serviceType() const override { return mServType; }
/*! /*!
* @param size size of cache to set in bytes * @param size size of cache to set in bytes
*/ */
int setCacheSize(uint32_t size); int setCacheSize(uint32_t size) override;
/*! /*!
* Stores a list of signed messages into data store * Stores a list of signed messages into data store
* @param msg map of message and decoded meta data information * @param msg map of message and decoded meta data information
* @return error code * @return error code
*/ */
int storeMessage(const std::list<RsNxsMsg*>& msg); int storeMessage(const std::list<RsNxsMsg*>& msg) override;
/*! /*!
* Stores a list of groups in data store * Stores a list of groups in data store
* @param grp map of group and decoded meta data * @param grp map of group and decoded meta data
* @return error code * @return error code
*/ */
int storeGroup(const std::list<RsNxsGrp*>& grp); int storeGroup(const std::list<RsNxsGrp*>& grp) override;
/*! /*!
* Updates group entries in Db * Updates group entries in Db
* @param grp map of group and decoded meta data * @param grp map of group and decoded meta data
* @return error code * @return error code
*/ */
int updateGroup(const std::list<RsNxsGrp*>& grsp); int updateGroup(const std::list<RsNxsGrp*>& grsp) override;
/*! /*!
* @param metaData The meta data item to update * @param metaData The meta data item to update
* @return error code * @return error code
*/ */
int updateMessageMetaData(const MsgLocMetaData& metaData); int updateMessageMetaData(const MsgLocMetaData& metaData) override;
/*! /*!
* @param metaData The meta data item to update * @param metaData The meta data item to update
* @return error code * @return error code
*/ */
int updateGroupMetaData(const GrpLocMetaData &meta); int updateGroupMetaData(const GrpLocMetaData &meta) override;
/*! /*!
* Completely clear out data stored in * Completely clear out data stored in
@ -240,10 +247,10 @@ public:
* as it was when first constructed * as it was when first constructed
* @return error code * @return error code
*/ */
int resetDataStore(); int resetDataStore() override;
bool validSize(RsNxsMsg* msg) const; bool validSize(RsNxsMsg* msg) const override;
bool validSize(RsNxsGrp* grp) const; bool validSize(RsNxsGrp* grp) const override;
/*! /*!
* Convenience function used to only update group keys. This is used when sending * Convenience function used to only update group keys. This is used when sending
@ -251,7 +258,7 @@ public:
* @return SQL error code * @return SQL error code
*/ */
int updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys, uint32_t subscribe_flags) ; int updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys, uint32_t subscribe_flags) override;
void debug_printCacheSize() ; void debug_printCacheSize() ;

View file

@ -206,6 +206,13 @@ public:
*/ */
virtual uint32_t cacheSize() const = 0; virtual uint32_t cacheSize() const = 0;
/*!
* \brief serviceType
* \return
* The service type for the current data service.
*/
virtual uint16_t serviceType() const = 0 ;
/*! /*!
* @param size size of cache to set in bytes * @param size size of cache to set in bytes
*/ */

View file

@ -59,8 +59,8 @@ static const uint32_t INDEX_AUTHEN_IDENTITY = 0x00000010; // identity
static const uint32_t INDEX_AUTHEN_PUBLISH = 0x00000020; // publish key static const uint32_t INDEX_AUTHEN_PUBLISH = 0x00000020; // publish key
static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key static const uint32_t INDEX_AUTHEN_ADMIN = 0x00000040; // admin key
static const uint32_t MSG_CLEANUP_PERIOD = 60*59; // 59 minutes static const uint32_t MSG_CLEANUP_PERIOD = 60*5; // 59 minutes
static const uint32_t INTEGRITY_CHECK_PERIOD = 60*31; // 31 minutes static const uint32_t INTEGRITY_CHECK_PERIOD = 60*3; // 31 minutes
#define GXS_MASK "GXS_MASK_HACK" #define GXS_MASK "GXS_MASK_HACK"

View file

@ -210,245 +210,80 @@ void RsGxsIntegrityCheck::run()
std::vector<RsGxsGroupId> grps_to_delete; std::vector<RsGxsGroupId> grps_to_delete;
GxsMsgReq msgs_to_delete; GxsMsgReq msgs_to_delete;
check(mGenExchangeClient->serviceType(), mGixs, mDs check(mGenExchangeClient->serviceType(), mGixs, mDs);
#ifdef RS_DEEP_CHANNEL_INDEX
, mGenExchangeClient, mSerializer
#endif
, mDeletedGrps, mDeletedMsgs);
RS_STACK_MUTEX(mIntegrityMutex); RS_STACK_MUTEX(mIntegrityMutex);
mDone = true; mDone = true;
} }
bool RsGxsIntegrityCheck::check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds bool RsGxsIntegrityCheck::check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds)
#ifdef RS_DEEP_CHANNEL_INDEX
, RsGenExchange* mGenExchangeClient, RsSerialType& mSerializer
#endif
, std::vector<RsGxsGroupId>& grpsToDel, GxsMsgReq& msgsToDel)
{ {
#ifdef RS_DEEP_CHANNEL_INDEX #ifdef DEBUG_GXSUTIL
bool isGxsChannels = mGenExchangeClient->serviceType() == RS_SERVICE_GXS_TYPE_CHANNELS; GXSUTIL_DEBUG() << "Parsing all groups and messages MetaData in service " << std::hex << mds->serviceType() << std::endl;
std::set<RsGxsGroupId> indexedGroups;
#endif #endif
// first take out all the groups // first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp; std::map<RsGxsGroupId, std::shared_ptr<RsGxsGrpMetaData> > grp;
mds->retrieveNxsGrps(grp, true);
mds->retrieveGxsGrpMetaData(grp);
GxsMsgReq msgIds; GxsMsgReq msgIds;
GxsMsgReq grps;
std::map<RsGxsId,RsIdentityUsage> used_gxs_ids ; std::map<RsGxsId,RsIdentityUsage> used_gxs_ids ;
std::set<RsGxsGroupId> subscribed_groups ; std::set<RsGxsGroupId> subscribed_groups ;
// compute hash and compare to stored value, if it fails then simply add it // Check that message ids...
// to list
for( std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin(); git != grp.end(); ++git )
{
RsNxsGrp* grp = git->second;
RsFileHash currHash;
pqihash pHash;
pHash.addData(grp->grp.bin_data, grp->grp.bin_len);
pHash.Complete(currHash);
if(currHash == grp->metaData->mHash) for( auto git = grp.begin(); git != grp.end(); ++git )
{
// Get all message ids of group, store them in msgIds, creating the grp entry at the same time.
if (mds->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1)
{
// store the group for retrieveNxsMsgs
grps[grp->grpId];
if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{
subscribed_groups.insert(git->first);
if(!grp->metaData->mAuthorId.isNull())
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl;
#endif
if( rsReputations &&
rsReputations->overallReputationLevel(
grp->metaData->mAuthorId ) >
RsReputationLevel::LOCALLY_NEGATIVE )
used_gxs_ids.insert(std::make_pair(grp->metaData->mAuthorId, RsIdentityUsage(RsServiceType(service_type), RsIdentityUsage::GROUP_AUTHOR_KEEP_ALIVE,grp->grpId)));
}
}
}
else
msgIds.erase(msgIds.find(grp->grpId)); // could not get them, so group is removed from list.
#ifdef RS_DEEP_CHANNEL_INDEX
// This should be moved to p3gxschannels. It is really not the place for this here!
if( isGxsChannels
&& grp->metaData->mCircleType == GXS_CIRCLE_TYPE_PUBLIC
&& grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
{
RsGxsGrpMetaData meta;
meta.deserialise(grp->meta.bin_data, grp->meta.bin_len);
uint32_t blz = grp->grp.bin_len;
RsItem* rIt = mSerializer.deserialise(grp->grp.bin_data,
&blz);
if( RsGxsChannelGroupItem* cgIt =
dynamic_cast<RsGxsChannelGroupItem*>(rIt) )
{
RsGxsChannelGroup cg;
cgIt->toChannelGroup(cg, false);
cg.mMeta = meta;
indexedGroups.insert(grp->grpId);
DeepChannelsIndex::indexChannelGroup(cg);
}
else
{
std::cerr << __PRETTY_FUNCTION__ << " Group: "
<< meta.mGroupId.toStdString() << " "
<< meta.mGroupName
<< " doesn't seems a channel, please "
<< "report to developers"
<< std::endl;
print_stacktrace();
}
delete rIt;
}
#endif // def RS_DEEP_CHANNEL_INDEX
}
else
{
std::cerr << __PRETTY_FUNCTION__ <<" (EE) deleting group " << grp->grpId << " with wrong hash or null/corrupted meta data. meta=" << grp->metaData << std::endl;
grpsToDel.push_back(grp->grpId);
#ifdef RS_DEEP_CHANNEL_INDEX
if(isGxsChannels)
DeepChannelsIndex::removeChannelFromIndex(grp->grpId);
#endif // def RS_DEEP_CHANNEL_INDEX
}
delete grp;
}
// now messages
GxsMsgResult msgs;
mds->retrieveNxsMsgs(grps, msgs, true);
// Check msg ids and messages. Go through all message IDs referred to by the db call
// and verify that the message belongs to the nxs msg data that was just retrieved.
for(auto& msgIdsIt:msgIds)
{ {
const RsGxsGroupId& grpId = msgIdsIt.first; const auto& grpMeta = git->second;
std::set<RsGxsMessageId>& msgIdV = msgIdsIt.second;
std::set<RsGxsMessageId> nxsMsgS; if (mds->retrieveMsgIds(grpMeta->mGroupId, msgIds[grpMeta->mGroupId]) == 1)
std::vector<RsNxsMsg*>& nxsMsgV = msgs[grpId]; {
if(grpMeta->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{
subscribed_groups.insert(git->first);
// To make the search efficient, we first build a set of msgIds to search in. if(!grpMeta->mAuthorId.isNull())
// Set build and search are both O(n log(n)). {
#ifdef DEBUG_GXSUTIL
for(auto& nxsMsg:nxsMsgV) GXSUTIL_DEBUG() << "TimeStamping group authors' key ID " << grpMeta->mAuthorId << " in group ID " << grpMeta->mGroupId << std::endl;
if(nxsMsg) #endif
nxsMsgS.insert(nxsMsg->msgId); if( rsReputations && rsReputations->overallReputationLevel( grpMeta->mAuthorId ) > RsReputationLevel::LOCALLY_NEGATIVE )
used_gxs_ids.insert(std::make_pair(grpMeta->mAuthorId, RsIdentityUsage(RsServiceType(service_type), RsIdentityUsage::GROUP_AUTHOR_KEEP_ALIVE,grpMeta->mGroupId)));
for (auto& msgId:msgIdV) }
if(nxsMsgS.find(msgId) == nxsMsgS.end()) }
{ }
msgsToDel[grpId].insert(msgId); else
#ifdef RS_DEEP_CHANNEL_INDEX msgIds.erase(msgIds.find(grpMeta->mGroupId)); // could not get them, so group is removed from list.
if(isGxsChannels)
DeepChannelsIndex::removeChannelPostFromIndex(grpId, msgId);
#endif // def RS_DEEP_CHANNEL_INDEX
}
} }
GxsMsgResult::iterator mit = msgs.begin(); // now messages
for(; mit != msgs.end(); ++mit) GxsMsgMetaResult msgMetas;
mds->retrieveGxsMsgMetaData(msgIds, msgMetas);
for(auto mit=msgMetas.begin(); mit != msgMetas.end(); ++mit)
{ {
std::vector<RsNxsMsg*>& msgV = mit->second; const auto& msgM = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); ++vit) for(auto vit=msgM.begin(); vit != msgM.end(); ++vit)
{ {
RsNxsMsg* msg = *vit; const auto& meta = *vit;
RsFileHash currHash;
pqihash pHash;
pHash.addData(msg->msg.bin_data, msg->msg.bin_len);
pHash.Complete(currHash);
if(msg->metaData == NULL || currHash != msg->metaData->mHash) if (subscribed_groups.count(meta->mGroupId))
{ if(meta->mAuthorId.isNull())
std::cerr << __PRETTY_FUNCTION__ <<" (EE) deleting message " << msg->msgId << " in group " << msg->grpId << " with wrong hash or null/corrupted meta data. meta=" << (void*)msg->metaData << std::endl;
msgsToDel[msg->grpId].insert(msg->msgId);
#ifdef RS_DEEP_CHANNEL_INDEX
if(isGxsChannels)
DeepChannelsIndex::removeChannelPostFromIndex(
msg->grpId, msg->msgId );
#endif // def RS_DEEP_CHANNEL_INDEX
}
else if (subscribed_groups.count(msg->metaData->mGroupId))
{
#ifdef RS_DEEP_CHANNEL_INDEX
// This should be moved to p3gxschannels. It is really not the place for this here!
if( isGxsChannels && indexedGroups.count(msg->metaData->mGroupId) )
{
RsGxsMsgMetaData meta;
meta.deserialise(msg->meta.bin_data, &msg->meta.bin_len);
uint32_t blz = msg->msg.bin_len;
RsItem* rIt = mSerializer.deserialise(msg->msg.bin_data,
&blz);
if( RsGxsChannelPostItem* cgIt =
dynamic_cast<RsGxsChannelPostItem*>(rIt) )
{
RsGxsChannelPost cg;
cgIt->toChannelPost(cg, false);
cg.mMeta = meta;
DeepChannelsIndex::indexChannelPost(cg);
}
else if(dynamic_cast<RsGxsCommentItem*>(rIt)) {}
else if(dynamic_cast<RsGxsVoteItem*>(rIt)) {}
else
{
std::cerr << __PRETTY_FUNCTION__ << " Message: "
<< meta.mMsgId.toStdString()
<< " in group: "
<< meta.mGroupId.toStdString() << " "
<< " doesn't seems a channel post, please "
<< "report to developers"
<< std::endl;
print_stacktrace();
}
delete rIt;
}
#endif // def RS_DEEP_CHANNEL_INDEX
if(!msg->metaData->mAuthorId.isNull())
{ {
#ifdef DEBUG_GXSUTIL #ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "TimeStamping message authors' key ID " << msg->metaData->mAuthorId << " in message " << msg->msgId << ", group ID " << msg->grpId<< std::endl; GXSUTIL_DEBUG() << "TimeStamping message authors' key ID " << meta->mAuthorId << " in message " << meta->mMsgId << ", group ID " << meta->mGroupId<< std::endl;
#endif #endif
if( rsReputations && if( rsReputations && rsReputations->overallReputationLevel( meta->mAuthorId ) > RsReputationLevel::LOCALLY_NEGATIVE )
rsReputations->overallReputationLevel( used_gxs_ids.insert(std::make_pair(meta->mAuthorId,RsIdentityUsage(RsServiceType(service_type),
msg->metaData->mAuthorId ) >
RsReputationLevel::LOCALLY_NEGATIVE )
used_gxs_ids.insert(std::make_pair(msg->metaData->mAuthorId,RsIdentityUsage(RsServiceType(service_type),
RsIdentityUsage::MESSAGE_AUTHOR_KEEP_ALIVE, RsIdentityUsage::MESSAGE_AUTHOR_KEEP_ALIVE,
msg->metaData->mGroupId, meta->mGroupId,
msg->metaData->mMsgId, meta->mMsgId,
msg->metaData->mParentId, meta->mParentId,
msg->metaData->mThreadId))) ; meta->mThreadId))) ;
} }
}
delete msg;
} }
} }
@ -513,6 +348,211 @@ bool RsGxsIntegrityCheck::check(uint16_t service_type, RsGixs *mgixs, RsGeneralD
return true; return true;
} }
bool RsGxsSinglePassIntegrityCheck::check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds
#ifdef RS_DEEP_CHANNEL_INDEX
, RsGenExchange* mGenExchangeClient, RsSerialType& mSerializer
#endif
, std::vector<RsGxsGroupId>& grpsToDel, GxsMsgReq& msgsToDel)
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "Parsing all groups and messages data in service " << std::hex << mds->serviceType() << " for integrity check. Could take a while..." << std::endl;
#endif
#ifdef RS_DEEP_CHANNEL_INDEX
bool isGxsChannels = mGenExchangeClient->serviceType() == RS_SERVICE_GXS_TYPE_CHANNELS;
std::set<RsGxsGroupId> indexedGroups;
#endif
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mds->retrieveNxsGrps(grp, true);
GxsMsgReq msgIds;
GxsMsgReq grps;
std::map<RsGxsId,RsIdentityUsage> used_gxs_ids ;
std::set<RsGxsGroupId> subscribed_groups ;
// compute hash and compare to stored value, if it fails then simply add it
// to list
for( std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin(); git != grp.end(); ++git )
{
RsNxsGrp* grp = git->second;
RsFileHash currHash;
pqihash pHash;
pHash.addData(grp->grp.bin_data, grp->grp.bin_len);
pHash.Complete(currHash);
if(currHash == grp->metaData->mHash)
{
// Get all message ids of group, store them in msgIds, creating the grp entry at the same time.
if (mds->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1)
{
// store the group for retrieveNxsMsgs
grps[grp->grpId];
if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
subscribed_groups.insert(git->first);
}
else
msgIds.erase(msgIds.find(grp->grpId)); // could not get them, so group is removed from list.
#ifdef RS_DEEP_CHANNEL_INDEX
// This should be moved to p3gxschannels. It is really not the place for this here!
if( isGxsChannels
&& grp->metaData->mCircleType == GXS_CIRCLE_TYPE_PUBLIC
&& grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED )
{
RsGxsGrpMetaData meta;
meta.deserialise(grp->meta.bin_data, grp->meta.bin_len);
uint32_t blz = grp->grp.bin_len;
RsItem* rIt = mSerializer.deserialise(grp->grp.bin_data,
&blz);
if( RsGxsChannelGroupItem* cgIt =
dynamic_cast<RsGxsChannelGroupItem*>(rIt) )
{
RsGxsChannelGroup cg;
cgIt->toChannelGroup(cg, false);
cg.mMeta = meta;
indexedGroups.insert(grp->grpId);
DeepChannelsIndex::indexChannelGroup(cg);
}
else
{
std::cerr << __PRETTY_FUNCTION__ << " Group: "
<< meta.mGroupId.toStdString() << " "
<< meta.mGroupName
<< " doesn't seems a channel, please "
<< "report to developers"
<< std::endl;
print_stacktrace();
}
delete rIt;
}
#endif // def RS_DEEP_CHANNEL_INDEX
}
else
{
std::cerr << __PRETTY_FUNCTION__ <<" (EE) deleting group " << grp->grpId << " with wrong hash or null/corrupted meta data. meta=" << grp->metaData << std::endl;
grpsToDel.push_back(grp->grpId);
#ifdef RS_DEEP_CHANNEL_INDEX
if(isGxsChannels)
DeepChannelsIndex::removeChannelFromIndex(grp->grpId);
#endif // def RS_DEEP_CHANNEL_INDEX
}
delete grp;
}
// now messages
GxsMsgResult msgs;
mds->retrieveNxsMsgs(grps, msgs, true);
// Check msg ids and messages. Go through all message IDs referred to by the db call
// and verify that the message belongs to the nxs msg data that was just retrieved.
for(auto& msgIdsIt:msgIds)
{
const RsGxsGroupId& grpId = msgIdsIt.first;
std::set<RsGxsMessageId>& msgIdV = msgIdsIt.second;
std::set<RsGxsMessageId> nxsMsgS;
std::vector<RsNxsMsg*>& nxsMsgV = msgs[grpId];
// To make the search efficient, we first build a set of msgIds to search in.
// Set build and search are both O(n log(n)).
for(auto& nxsMsg:nxsMsgV)
if(nxsMsg)
nxsMsgS.insert(nxsMsg->msgId);
for (auto& msgId:msgIdV)
if(nxsMsgS.find(msgId) == nxsMsgS.end())
{
msgsToDel[grpId].insert(msgId);
#ifdef RS_DEEP_CHANNEL_INDEX
if(isGxsChannels)
DeepChannelsIndex::removeChannelPostFromIndex(grpId, msgId);
#endif // def RS_DEEP_CHANNEL_INDEX
}
}
for(auto mit = msgs.begin(); mit != msgs.end(); ++mit)
{
std::vector<RsNxsMsg*>& msgV = mit->second;
std::vector<RsNxsMsg*>::iterator vit = msgV.begin();
for(; vit != msgV.end(); ++vit)
{
RsNxsMsg* msg = *vit;
RsFileHash currHash;
pqihash pHash;
pHash.addData(msg->msg.bin_data, msg->msg.bin_len);
pHash.Complete(currHash);
if(msg->metaData == NULL || currHash != msg->metaData->mHash)
{
std::cerr << __PRETTY_FUNCTION__ <<" (EE) deleting message " << msg->msgId << " in group " << msg->grpId << " with wrong hash or null/corrupted meta data. meta=" << (void*)msg->metaData << std::endl;
msgsToDel[msg->grpId].insert(msg->msgId);
#ifdef RS_DEEP_CHANNEL_INDEX
if(isGxsChannels)
DeepChannelsIndex::removeChannelPostFromIndex(
msg->grpId, msg->msgId );
#endif // def RS_DEEP_CHANNEL_INDEX
}
else if (subscribed_groups.count(msg->metaData->mGroupId))
{
#ifdef RS_DEEP_CHANNEL_INDEX
// This should be moved to p3gxschannels. It is really not the place for this here!
if( isGxsChannels && indexedGroups.count(msg->metaData->mGroupId) )
{
RsGxsMsgMetaData meta;
meta.deserialise(msg->meta.bin_data, &msg->meta.bin_len);
uint32_t blz = msg->msg.bin_len;
RsItem* rIt = mSerializer.deserialise(msg->msg.bin_data,
&blz);
if( RsGxsChannelPostItem* cgIt =
dynamic_cast<RsGxsChannelPostItem*>(rIt) )
{
RsGxsChannelPost cg;
cgIt->toChannelPost(cg, false);
cg.mMeta = meta;
DeepChannelsIndex::indexChannelPost(cg);
}
else if(dynamic_cast<RsGxsCommentItem*>(rIt)) {}
else if(dynamic_cast<RsGxsVoteItem*>(rIt)) {}
else
{
std::cerr << __PRETTY_FUNCTION__ << " Message: "
<< meta.mMsgId.toStdString()
<< " in group: "
<< meta.mGroupId.toStdString() << " "
<< " doesn't seems a channel post, please "
<< "report to developers"
<< std::endl;
print_stacktrace();
}
delete rIt;
}
#endif // def RS_DEEP_CHANNEL_INDEX
}
delete msg;
}
}
return true;
}
bool RsGxsIntegrityCheck::isDone() bool RsGxsIntegrityCheck::isDone()
{ {
RS_STACK_MUTEX(mIntegrityMutex); RS_STACK_MUTEX(mIntegrityMutex);

View file

@ -175,11 +175,7 @@ public:
RsGenExchange *genex, RsSerialType& gxsSerialiser, RsGenExchange *genex, RsSerialType& gxsSerialiser,
RsGixs *gixs); RsGixs *gixs);
static bool check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds static bool check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds);
#ifdef RS_DEEP_CHANNEL_INDEX
, RsGenExchange* mGenExchangeClient, RsSerialType& mSerializer
#endif
, std::vector<RsGxsGroupId>& grpsToDel, GxsMsgReq& msgsToDel);
bool isDone(); bool isDone();
void run(); void run();
@ -190,6 +186,43 @@ private:
RsGeneralDataService* const mDs; RsGeneralDataService* const mDs;
RsGenExchange *mGenExchangeClient; RsGenExchange *mGenExchangeClient;
bool mDone;
RsMutex mIntegrityMutex;
std::vector<RsGxsGroupId> mDeletedGrps;
GxsMsgReq mDeletedMsgs;
RsGixs* mGixs;
};
/*!
* Checks the integrity message and groups
* in rsDataService using computed hash
*/
class RsGxsSinglePassIntegrityCheck
{
public:
/*!
*
* @param dataService
* @param mGroupTS
* @param chunkSize
* @param sleepPeriod
*/
RsGxsSinglePassIntegrityCheck( RsGeneralDataService* const dataService,
RsGenExchange *genex, RsSerialType& gxsSerialiser,
RsGixs *gixs);
static bool check(uint16_t service_type, RsGixs *mgixs, RsGeneralDataService *mds
#ifdef RS_DEEP_CHANNEL_INDEX
, RsGenExchange* mGenExchangeClient, RsSerialType& mSerializer
#endif
, std::vector<RsGxsGroupId>& grpsToDel, GxsMsgReq& msgsToDel);
private:
RsGeneralDataService *const mDs;
RsGenExchange *mGenExchangeClient;
#ifdef RS_DEEP_CHANNEL_INDEX #ifdef RS_DEEP_CHANNEL_INDEX
RsSerialType& mSerializer; RsSerialType& mSerializer;
#endif #endif