WIP Index GXS channels with xapian

Use temporary DB ATM
This commit is contained in:
Gioacchino Mazzurco 2018-06-09 18:06:14 +02:00
parent ce61174d79
commit c0e92ddc6b
No known key found for this signature in database
GPG Key ID: A1FBCA3872E87051
6 changed files with 247 additions and 62 deletions

View File

@ -0,0 +1,106 @@
#pragma once
/*
* RetroShare Content Search and Indexing.
* Copyright (C) 2018 Gioacchino Mazzurco <gio@eigenlab.org>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include <string>

#include <xapian.h>

#include "retroshare/rsgxschannels.h"
struct DeepSearch
{
//DeepSearch(const std::string& dbPath) : mDbPath(dbPath) {}
static void search(/*query*/) { /*return all matching results*/ }
static void indexChannelGroup(const RsGxsChannelGroup& chan)
{
Xapian::WritableDatabase db(mDbPath, Xapian::DB_CREATE_OR_OPEN);
// Set up a TermGenerator that we'll use in indexing.
Xapian::TermGenerator termgenerator;
//termgenerator.set_stemmer(Xapian::Stem("en"));
// We make a document and tell the term generator to use this.
Xapian::Document doc;
termgenerator.set_document(doc);
// Index each field with a suitable prefix.
termgenerator.index_text(chan.mMeta.mGroupName, 1, "G");
termgenerator.index_text(chan.mDescription, 1, "XD");
// Index fields without prefixes for general search.
termgenerator.index_text(chan.mMeta.mGroupName);
termgenerator.increase_termpos();
termgenerator.index_text(chan.mDescription);
// We use the identifier to ensure each object ends up in the
// database only once no matter how many times we run the
// indexer.
std::string idTerm("Qretroshare://channel?id=");
idTerm += chan.mMeta.mGroupId.toStdString();
doc.add_boolean_term(idTerm);
db.replace_document(idTerm, doc);
}
static void removeChannelFromIndex(RsGxsGroupId grpId)
{
std::string idTerm("Qretroshare://channel?id=");
idTerm += grpId.toStdString();
Xapian::WritableDatabase db(mDbPath, Xapian::DB_CREATE_OR_OPEN);
db.delete_document(idTerm);
}
static void indexChannelPost(const RsGxsChannelPost& post)
{
Xapian::WritableDatabase db(mDbPath, Xapian::DB_CREATE_OR_OPEN);
// Set up a TermGenerator that we'll use in indexing.
Xapian::TermGenerator termgenerator;
//termgenerator.set_stemmer(Xapian::Stem("en"));
// We make a document and tell the term generator to use this.
Xapian::Document doc;
termgenerator.set_document(doc);
// Index each field with a suitable prefix.
termgenerator.index_text(post.mMeta.mMsgName, 1, "S");
termgenerator.index_text(post.mMsg, 1, "XD");
// Index fields without prefixes for general search.
termgenerator.index_text(post.mMeta.mMsgName);
termgenerator.increase_termpos();
termgenerator.index_text(post.mMsg);
// We use the identifier to ensure each object ends up in the
// database only once no matter how many times we run the
// indexer.
std::string idTerm("Qretroshare://channel?id=");
idTerm += post.mMeta.mGroupId.toStdString();
idTerm += "&msgid=";
idTerm += post.mMeta.mMsgId.toStdString();
doc.add_boolean_term(idTerm);
db.replace_document(idTerm, doc);
}
static std::string mDbPath;
};
std::string DeepSearch::mDbPath = "/tmp/deep_search_xapian_db";

View File

@ -31,6 +31,12 @@
#include "pqi/pqihash.h"
#include "gxs/rsgixs.h"
#ifdef RS_DEEP_SEARCH
# include "deep_search/deep_search.h"
# include "services/p3gxschannels.h"
# include "rsitems/rsgxschannelitems.h"
#endif
static const uint32_t MAX_GXS_IDS_REQUESTS_NET = 10 ; // max number of requests from cache/net (avoids killing the system!)
//#define DEBUG_GXSUTIL 1
@ -141,20 +147,28 @@ bool RsGxsMessageCleanUp::clean()
return mGrpMeta.empty();
}
RsGxsIntegrityCheck::RsGxsIntegrityCheck(RsGeneralDataService* const dataService, RsGenExchange *genex, RsGixs *gixs) :
mDs(dataService),mGenExchangeClient(genex), mDone(false), mIntegrityMutex("integrity"),mGixs(gixs)
{ }
RsGxsIntegrityCheck::RsGxsIntegrityCheck(
RsGeneralDataService* const dataService, RsGenExchange* genex,
RsGixs* gixs ) :
mDs(dataService), mGenExchangeClient(genex), mDone(false),
mIntegrityMutex("integrity"), mGixs(gixs) {}
void RsGxsIntegrityCheck::run()
{
check();
RsStackMutex stack(mIntegrityMutex);
mDone = true;
RS_STACK_MUTEX(mIntegrityMutex);
mDone = true;
}
bool RsGxsIntegrityCheck::check()
{
#ifdef RS_DEEP_SEARCH
bool isGxsChannels = dynamic_cast<p3GxsChannels*>(mGenExchangeClient);
std::cout << __PRETTY_FUNCTION__ << " isGxsChannels: " << isGxsChannels
<< std::endl;
#endif
// first take out all the groups
std::map<RsGxsGroupId, RsNxsGrp*> grp;
mDs->retrieveNxsGrps(grp, true, true);
@ -166,67 +180,113 @@ bool RsGxsIntegrityCheck::check()
std::set<RsGxsGroupId> subscribed_groups ;
// compute hash and compare to stored value, if it fails then simply add it
// to list
std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin();
for(; git != grp.end(); ++git)
{
RsNxsGrp* grp = git->second;
RsFileHash currHash;
pqihash pHash;
pHash.addData(grp->grp.bin_data, grp->grp.bin_len);
pHash.Complete(currHash);
// to list
for( std::map<RsGxsGroupId, RsNxsGrp*>::iterator git = grp.begin();
git != grp.end(); ++git )
{
RsNxsGrp* grp = git->second;
RsFileHash currHash;
pqihash pHash;
pHash.addData(grp->grp.bin_data, grp->grp.bin_len);
pHash.Complete(currHash);
if(currHash == grp->metaData->mHash)
{
// get all message ids of group
if (mDs->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1)
{
// store the group for retrieveNxsMsgs
grps[grp->grpId];
if(currHash == grp->metaData->mHash)
{
// get all message ids of group
if (mDs->retrieveMsgIds(grp->grpId, msgIds[grp->grpId]) == 1)
{
// store the group for retrieveNxsMsgs
grps[grp->grpId];
if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{
subscribed_groups.insert(git->first) ;
if(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)
{
subscribed_groups.insert(git->first);
if(!grp->metaData->mAuthorId.isNull())
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl;
#ifdef RS_DEEP_SEARCH
if(isGxsChannels)
{
RsGxsChannelGroup cg;
RsGxsGrpMetaData meta;
meta.deserialise(grp->meta.bin_data, grp->meta.bin_len);
/* TODO: Apparently a copy of the pointer to
* grp.bin_data is stored into grp.bin_data, thus
* breaking the deserialization; skipping the pointer
* (8 bytes on x86_64 debug build) fixes the
* deserialization. Talk to Cyril about how to properly
* fix this. */
RsGenericSerializer::SerializeContext ctx(
static_cast<uint8_t*>(grp->grp.bin_data)+8,
grp->grp.bin_len-8 );
RsGxsChannelGroupItem cgIt;
cgIt.serial_process( RsGenericSerializer::DESERIALIZE,
ctx );
if(ctx.mOk)
{
cgIt.toChannelGroup(cg, false);
cg.mMeta = meta;
DeepSearch::indexChannelGroup(cg);
std::cout << __PRETTY_FUNCTION__ << " ||Channel: "
<< meta.mGroupName << " ||Description: "
<< cg.mDescription << std::endl;
}
else
std::cout << __PRETTY_FUNCTION__ << " ||Group: "
<< meta.mGroupName
<< " ||doesn't seems a channel"
<< " ||ctx.mOk: " << ctx.mOk
<< " ||ctx.mData: " << (void*)ctx.mData
<< " ||ctx.mSize: " << ctx.mSize
<< " ||grp->grp.bin_data: " << grp->grp.bin_data
<< " ||grp->grp.bin_len: " << grp->grp.bin_len
<< std::endl;
}
#endif
if(rsReputations!=NULL && rsReputations->overallReputationLevel(grp->metaData->mAuthorId) > RsReputations::REPUTATION_LOCALLY_NEGATIVE)
used_gxs_ids.insert(std::make_pair(grp->metaData->mAuthorId,RsIdentityUsage(mGenExchangeClient->serviceType(),RsIdentityUsage::GROUP_AUTHOR_KEEP_ALIVE,grp->grpId))) ;
}
}
}
else
{
msgIds.erase(msgIds.find(grp->grpId));
// grpsToDel.push_back(grp->grpId);
}
}
else
{
grpsToDel.push_back(grp->grpId);
}
if(!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) && !(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) && !(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH))
{
RsGroupNetworkStats stats ;
mGenExchangeClient->getGroupNetworkStats(grp->grpId,stats);
if(stats.mSuppliers == 0 && stats.mMaxVisibleCount == 0 && stats.mGrpAutoSync)
{
if(!grp->metaData->mAuthorId.isNull())
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "Scheduling group \"" << grp->metaData->mGroupName << "\" ID=" << grp->grpId << " in service " << std::hex << mGenExchangeClient->serviceType() << std::dec << " for deletion because it has no suppliers not any visible data at friends." << std::endl;
GXSUTIL_DEBUG() << "TimeStamping group authors' key ID " << grp->metaData->mAuthorId << " in group ID " << grp->grpId << std::endl;
#endif
if( rsReputations && rsReputations->overallReputationLevel(grp->metaData->mAuthorId ) > RsReputations::REPUTATION_LOCALLY_NEGATIVE )
used_gxs_ids.insert(std::make_pair(grp->metaData->mAuthorId, RsIdentityUsage(mGenExchangeClient->serviceType(), RsIdentityUsage::GROUP_AUTHOR_KEEP_ALIVE,grp->grpId)));
}
}
}
else msgIds.erase(msgIds.find(grp->grpId));
}
else
{
grpsToDel.push_back(grp->grpId);
#ifdef RS_DEEP_SEARCH
if(isGxsChannels) DeepSearch::removeChannelFromIndex(grp->grpId);
#endif
}
if( !(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED) &&
!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) &&
!(grp->metaData->mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH) )
{
RsGroupNetworkStats stats;
mGenExchangeClient->getGroupNetworkStats(grp->grpId,stats);
if( stats.mSuppliers == 0 && stats.mMaxVisibleCount == 0
&& stats.mGrpAutoSync )
{
#ifdef DEBUG_GXSUTIL
GXSUTIL_DEBUG() << "Scheduling group \"" << grp->metaData->mGroupName << "\" ID=" << grp->grpId << " in service " << std::hex << mGenExchangeClient->serviceType() << std::dec << " for deletion because it has no suppliers not any visible data at friends." << std::endl;
#endif
grpsToDel.push_back(grp->grpId);
}
}
}
}
delete grp;
}
delete grp;
}
mDs->removeGroups(grpsToDel);
@ -299,6 +359,10 @@ bool RsGxsIntegrityCheck::check()
}
}
#ifdef RS_DEEP_SEARCH
// TODO:remove msgsToDel from deep search index too
#endif
mDs->removeMsgs(msgsToDel);
{
@ -373,14 +437,13 @@ bool RsGxsIntegrityCheck::check()
bool RsGxsIntegrityCheck::isDone()
{
RsStackMutex stack(mIntegrityMutex);
RS_STACK_MUTEX(mIntegrityMutex);
return mDone;
}
void RsGxsIntegrityCheck::getDeletedIds(std::list<RsGxsGroupId>& grpIds, std::map<RsGxsGroupId, std::set<RsGxsMessageId> >& msgIds)
{
RsStackMutex stack(mIntegrityMutex);
RS_STACK_MUTEX(mIntegrityMutex);
grpIds = mDeletedGrps;
msgIds = mDeletedMsgs;
}

View File

@ -846,7 +846,9 @@ rs_gxs_trans {
SOURCES += gxstrans/p3gxstransitems.cc gxstrans/p3gxstrans.cc
}
rs_deep_search {
HEADERS += deep_search/deep_search.h
}
###########################################################################################################

View File

@ -293,7 +293,8 @@ public:
virtual void clear();
virtual void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx);
virtual void serial_process( RsGenericSerializer::SerializeJob j,
RsGenericSerializer::SerializeContext& ctx );
RsGxsGroupId grpId; /// group Id, needed to complete version Id (ncvi)
static int refcount;

View File

@ -26,6 +26,10 @@ linux-* {
mLibs += dl
}
rs_deep_search {
mLibs += xapian
}
static {
sLibs *= $$mLibs
} else {

View File

@ -115,6 +115,11 @@ rs_macos10.9:CONFIG -= rs_macos10.11
rs_macos10.10:CONFIG -= rs_macos10.11
rs_macos10.12:CONFIG -= rs_macos10.11
# To disable deep search append the following assignment to the qmake command
# line: "CONFIG+=no_rs_deep_search"
CONFIG *= rs_deep_search
no_rs_deep_search:CONFIG -= rs_deep_search
###########################################################################################################################################################
#
# V07_NON_BACKWARD_COMPATIBLE_CHANGE_001:
@ -313,6 +318,10 @@ rs_chatserver {
DEFINES *= RS_CHATSERVER
}
rs_deep_search {
DEFINES *= RS_DEEP_SEARCH
}
debug {
QMAKE_CXXFLAGS -= -O2 -fomit-frame-pointer
QMAKE_CFLAGS -= -O2 -fomit-frame-pointer