From 02b2c72f61fc02e07716c123d99c9c19b18f1738 Mon Sep 17 00:00:00 2001 From: drbob Date: Fri, 27 Jul 2012 12:45:52 +0000 Subject: [PATCH] * Added a bunch of unfinished code for Id reputation management (#ifdef'd out). * Disabled GXS services from compilation, or running as services. * moved retrodb.cc into newcache group, as it introduces new sqlite dependancy git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-new_cache_system@5333 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/libretroshare.pro | 12 +- libretroshare/src/rsserver/rsinit.cc | 6 + libretroshare/src/services/p3idservice.cc | 579 ++++++++++++++++++++++ 3 files changed, 591 insertions(+), 6 deletions(-) diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index c7d5a3342..709da784c 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -1,8 +1,8 @@ TEMPLATE = lib #CONFIG += staticlib release #CONFIG += staticlib testnetwork -CONFIG += staticlib bitdht debug newservices -#newcache +CONFIG += staticlib bitdht debug +#newcache newservices # CONFIG -= qt TARGET = retroshare @@ -482,7 +482,6 @@ HEADERS += util/folderiterator.h \ util/rswin.h \ util/rsrandom.h \ util/pugiconfig.h \ - util/retrodb.h SOURCES += dbase/cachestrapper.cc \ dbase/fimonitor.cc \ @@ -675,8 +674,8 @@ HEADERS += serialiser/rsnxsitems.h \ gxs/rstokenservice.h \ gxs/rsgxsdataaccess.h \ retroshare/rsgxsservice.h \ - serialiser/rsgxsitems.h - + serialiser/rsgxsitems.h \ + util/retrodb.h SOURCES += serialiser/rsnxsitems.cc \ gxs/rsdataservice.cc \ @@ -684,7 +683,8 @@ SOURCES += serialiser/rsnxsitems.cc \ gxs/rsgxsnetservice.cc \ gxs/rsgxsdata.cc \ services/p3photoserviceV2.cc \ - gxs/rsgxsdataaccess.cc + gxs/rsgxsdataaccess.cc \ + util/retrodb.cc } diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index f1ba13b8c..e345433be 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1732,12 +1732,14 @@ RsTurtle *rsTurtle = NULL ; #include "services/p3blogs.h" #include "turtle/p3turtle.h" +#ifdef ENABLE_GXS_SERVICES #include "services/p3photoservice.h" #include "services/p3wikiservice.h" #include "services/p3wire.h" #include "services/p3idservice.h" #include "services/p3forumsv2.h" #include "services/p3posted.h" +#endif #ifndef PQI_DISABLE_TUNNEL #include "services/p3tunnel.h" @@ -2136,6 +2138,7 @@ int RsServer::StartupRetroShare() mPluginsManager->registerClientServices(pqih) ; mPluginsManager->registerCacheServices() ; +#ifdef ENABLE_GXS_SERVICES // Testing New Cache Services. p3PhotoService *mPhotos = new p3PhotoService(RS_SERVICE_TYPE_PHOTO); pqih -> addService(mPhotos); @@ -2159,6 +2162,7 @@ int RsServer::StartupRetroShare() // Testing New Cache Services. p3PostedService *mPosted = new p3PostedService(RS_SERVICE_TYPE_POSTED); pqih -> addService(mPosted); +#endif // ENABLE_GXS_SERVICES #ifndef RS_RELEASE p3GameLauncher *gameLauncher = new p3GameLauncher(mLinkMgr); @@ -2426,6 +2430,7 @@ int RsServer::StartupRetroShare() rsForums = mForums; rsChannels = mChannels; +#ifdef ENABLE_GXS_SERVICES // Testing of new cache system interfaces. rsIdentity = mIdentity; rsPhoto = mPhotos; @@ -2433,6 +2438,7 @@ int RsServer::StartupRetroShare() rsWire = mWire; rsForumsV2 = mForumsV2; rsPosted = mPosted; +#endif // ENABLE_GXS_SERVICES #ifdef RS_USE_BLOGS rsBlogs = mBlogs; diff --git a/libretroshare/src/services/p3idservice.cc b/libretroshare/src/services/p3idservice.cc index 69db3d537..32c7b8492 100644 --- a/libretroshare/src/services/p3idservice.cc +++ b/libretroshare/src/services/p3idservice.cc @@ -870,3 +870,582 @@ std::string rsIdTypeToString(uint32_t idtype) +/************************************************************************************/ +/************************************************************************************/ +/************************************************************************************/ +/************************************************************************************/ + + +/* here we are running a background process that calculates the reputation scores + * for each of the IDs.... + * + * As this class will be extensively used by many other threads... it is best + * that we don't block at all. This should be in a background thread. + * Perhaps a generic method to handle this will be advisable.... but we do that later. + * + * To start with we will work from the Posted service. + * + * + * + * So Reputation.... + * Three components: + * 1) Your Opinion: Should override everything else. + * 2) Implicit Factors: Know the associated GPG Key. + * 3) Your Friends Opinions: + * 4) Your Friends Calculated Reputation Scores. + * + * Must make sure that there is no Feedback loop in the Reputation calculation. + * + * So: Our Score + Friends Scores => Local Reputation. + * Local Reputation + Friends Reputations => Final Reputation? + * + * Do we need to 'ignore' Non-scores? + * ---> This becomes like the "Best Comment" algorithm from Reddit... + * Use a statistical mechanism to work out a lower bound on Reputation. + * + * But what if your opinion is wrong?.... well likely your friends will + * get their messages and reply... you'll see the missing message - request it - check reputation etc. + * + * So we are going to have three different scores (Own, Peers, (the neighbour) Hood)... + * + * So next question, when do we need to incrementally calculate the score? + * .... how often do we need to recalculate everything -> this could lead to a flux of messages. + */ + +/************************************************************************************/ +/* + * Processing Algorithm: + * - Grab all Groups which have received messages. + * (opt 1)-> grab latest msgs for each of these and process => score. + * (opt 2)-> try incremental system (people probably won't change opinions often -> just set them once) + * --> if not possible, fallback to full calculation. + * + * + */ + +#if 0 // DISABLED FOR MERGE + + +#define ID_BACKGROUND_PERIOD 60 + +int p3IdService::background_tick() +{ + std::cerr << "p3IdService::background_tick()"; + std::cerr << std::endl; + + // Run Background Stuff. + background_checkTokenRequest(); + + /* every minute - run a background check */ + time_t now = time(NULL); + bool doCheck = false; + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + if (now - mLastBgCheck > ID_BACKGROUND_PERIOD) + { + doCheck = true; + mLastBgCheck = now; + } + } + + if (doCheck) + { + addExtraDummyData(); + background_requestGroups(); + } + + + + // Add in new votes + comments. + return 0; +} + + + + +/***** Background Processing **** + * + * Process Each Message - as it arrives. + * + * Update + * + */ + +#define ID_BG_REQUEST_GROUPS 1 +#define ID_BG_REQUEST_UNPROCESSED 2 +#define ID_BG_REQUEST_PARENTS 3 +#define ID_BG_PROCESS_VOTES 4 + +bool p3IdService::background_checkTokenRequest() +{ + uint32_t token = 0; + uint32_t phase = 0; + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + if (!mBgProcessing) + { + return false; + } + + token = mBgToken; + phase = mBgPhase; + } + + + uint32_t status; + uint32_t reqtype; + uint32_t anstype; + time_t ts; + checkRequestStatus(token, status, reqtype, anstype, ts); + + if (status == GXS_REQUEST_STATUS_COMPLETE) + { + switch(phase) + { + case ID_BG_REQUEST_GROUPS: + background_requestNewMessages(); + break; + case ID_BG_REQUEST_UNPROCESSED: + background_processNewMessages(); + break; + case ID_BG_REQUEST_PARENTS: + background_updateVoteCounts(); + break; + default: + break; + } + } + return true; +} + + +bool p3IdService::background_requestGroups() +{ + std::cerr << "p3IdService::background_requestGroups()"; + std::cerr << std::endl; + + // grab all the subscribed groups. + uint32_t token = 0; + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + + if (mBgProcessing) + { + std::cerr << "p3IdService::background_requestGroups() ERROR Already processing, Skip this cycle"; + std::cerr << std::endl; + return false; + } + + mBgProcessing = true; + mBgPhase = ID_BG_REQUEST_GROUPS; + mBgToken = 0; + } + + uint32_t ansType = RS_TOKREQ_ANSTYPE_SUMMARY; + RsTokReqOptions opts; + std::list groupIds; + + opts.mStatusFilter = RSGXS_GROUP_STATUS_NEWMSG; + opts.mStatusMask = RSGXS_GROUP_STATUS_NEWMSG; + + requestGroupInfo(token, ansType, opts, groupIds); + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + mBgToken = token; + } + + return true; +} + + + +bool p3IdService::background_requestNewMessages() +{ + std::cerr << "p3IdService::background_requestNewMessages()"; + std::cerr << std::endl; + + std::list modGroupList; + std::list groupIds; + uint32_t token = 0; + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + token = mBgToken; + } + + if (!getGroupSummary(token, modGroupList)) + { + std::cerr << "p3IdService::background_requestNewMessages() ERROR No Group List"; + std::cerr << std::endl; + background_cleanup(); + return false; + } + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + mBgPhase = ID_BG_REQUEST_UNPROCESSED; + mBgToken = 0; + + /* now we process the modGroupList -> a map so we can use it easily later, and create id list too */ + for(it = modGroupList.begin(); it != modGroupList.end(); it++) + { + setGroupStatus(it->mMsgId, 0, RSGXS_GROUP_STATUS_NEWMSG); + + mBgGroupMap[it->mGroupId] = *it; + groupIds.push_back(*it); + } + } + + uint32_t ansType = RS_TOKREQ_ANSTYPE_SUMMARY; + RsTokReqOptions opts; + token = 0; + + opts.mStatusFilter = RSGXS_MSG_STATUS_UNPROCESSED; + opts.mStatusMask = RSGXS_MSG_STATUS_UNPROCESSED; + + requestMsgInfo(token, ansType, opts, groupIds); + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + mBgToken = token; + } + return true; +} + + +bool p3IdService::background_processNewMessages() +{ + std::cerr << "p3IdService::background_processNewMessages()"; + std::cerr << std::endl; + + std::list newMsgList; + std::list::iterator it; + uint32_t token = 0; + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + token = mBgToken; + } + + if (!getMsgSummary(token, newMsgList)) + { + std::cerr << "p3IdService::background_processNewMessages() ERROR No New Msgs"; + std::cerr << std::endl; + background_cleanup(); + return false; + } + + + /* iterate through the msgs.. update the mBgGroupMap with new data, + * and flag these items as modified - so we rewrite them to the db later. + * + * If a message is not an original -> store groupId for requiring full analysis later. + */ + + for(it = newMsgList.begin(); it != newMsgList.end(); it++) + { + /* flag each new vote as processed */ + setMessageStatus(it->mMsgId, 0, RSGXS_MSG_STATUS_UNPROCESSED); + + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + + mit = mBgGroupMap.find(it->mGroupId); + if (mit == mBgGroupMap.end()) + { + /* error */ + continue; + } + + if (mit->mStatusFlags & FULL_CALC_FLAG) + { + /* already marked */ + continue; + } + + if (it->mMsgId != it->mOrigMsgId) + { + /* + * not original -> hard, redo calc (alt: could substract previous score) + */ + mit->mStatusFlags |= FULL_CALC_FLAG; + } + else + { + /* + * Try incremental calculation. + * - extract parameters from group. + * - increment, & save back. + * - flag group as modified. + */ + + mit->mStatusFlags |= INCREMENTAL_CALC_FLAG; + + if (!extractIdGroupCache(const std::string &str, uint32_t &votes, uint32_t &comments)) + { + /* error */ + } + + /* do calcs */ + + /* store it back in */ + + if (!encodeIdGroupCache(std::string &str, uint32_t votes, uint32_t comments)) + { + /* error */ + } + + } + } + + + /* now iterate through groups again + * -> update status as we go + * -> record one requiring a full analyssis + */ + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + + for(mit = mBgGroupMap.begin(); mit != mBgGroupMap.end(); mit++) + { + if (mit->mStatusFlags & FULL_CALC_FLAG) + { + mBgFullCalcGroups.push_back(mit->mGroupId); + } + else if (mit->mStatusFlags & INCREMENTAL_CALC_FLAG) + { + /* set Cache */ + setGroupServiceString(mit->mGroupId, mit->ServiceString); + } + else + { + /* why is it here? error. */ + + } + } + } + + return backgroundFullCalcRequest(); +} + +class RepCumulScore +{ + public: + uint32_t count; + uint32_t nullcount; + double sum; + double sumsq; + + // derived parameters: + + +}; + +bool p3IdService::encodeIdGroupCache(std::string &str, uint32_t ownScore, RepCumulScore &opinion, RepCumulScore &rep) +{ + char line[RSGXS_MAX_SERVICE_STRING]; + + snprintf(line, RSGXS_MAX_SERVICE_STRING, "v1 Y:%d O:%d %d %f %f R:%d %d %f %f", ownScore, + opinion.count, opinion.nullcount, opinion.sum, opinion.sumsq, + rep.count, rep.nullcount, rep.sum, rep.sumsq); + + str = line; + return true; +} + + +bool p3IdService::extractIdGroupCache(std::string &str, uint32_t &ownScore, RepCumulScore &opinion, RepCumulScore &rep) +{ + uint32_t iOwnScore; + RepCumulScore iOpin; + RepCumulScore iRep; + + if (9 == sscanf(str.c_str(), "v1 Y:%d O:%d %d %f %f R:%d %d %f %f", &iOwnScore, + &(iOpin.count), &(iOpin.nullcount), &(iOpin.sum), &(iOpin.sumsq), + &(iRep.count), &(iRep.nullcount), &(iRep.sum), &(iRep.sumsq))); + { + ownScore = iOwnScore; + opinion = iOpin; + rep = iRep; + return true; + } + + return false; +} + + + +bool p3IdService::background_FullCalcRequest() +{ + /* + * grab an GroupId from List. + * - If empty, we are finished. + * - request all latest mesgs + */ + + std::list groupIds; + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + mBgPhase = ID_BG_FULLCALC_REQUEST; + mBgToken = 0; + mBgGroupMap.clear(); + + if (mBgFullCalcGroups.empty()) + { + /* finished! */ + return; + + } + + groupIds.push_back(mBgFullCalcGroups.front()); + mBgFullCalcGroups.pop_front(); + + } + + /* request the summary info from the parents */ + uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA; + token = 0; + RsTokReqOptions opts; + opt.mOptions = RS_TOKREQOPT_MSG_LATEST; + + requestMsgInfo(token, ansType, opts, groupIds); + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + mBgToken = token; + } + return true; +} + + + + +bool p3IdService::background_processFullCalc() +{ + std::cerr << "p3IdService::background_processFullCalc()"; + std::cerr << std::endl; + + std::list msgList; + std::list::iterator it; + + RsIdOpinion msg; + + bool validmsgs = false; + + /* calc variables */ + uint32_t opinion_count = 0; + uint32_t opinion_nullcount = 0; + double opinion_sum = 0; + double opinion_sumsq = 0; + + uint32_t rep_count = 0; + uint32_t rep_nullcount = 0; + double rep_sum = 0; + double rep_sumsq = 0; + + while(getMsgData(mBgToken, msg)) + { + std::cerr << "p3IdService::background_processFullCalc() Msg:"; + std::cerr << msg; + std::cerr << std::endl; + + validmsgs = true; + + /* for each msg ... extract score, and reputation */ + if (msg.mOpinion != 0) + { + opinion_count++; + opinion_sum += msg.mOpinion; + opinion_sum += (msg.mOpinion * msg.mOpinion); + } + else + { + opinion_nullcount++; + } + + + /* for each msg ... extract score, and reputation */ + if (msg.mReputation != 0) + { + rep_nullcount++; + rep_sum += msg.mReputation; + rep_sum += (msg.mReputation * msg.mReputation); + } + else + { + rep_nullcount++; + } + } + + double opinion_avg = 0; + double opinion_var = 0; + double opinion_frac = 0; + + double rep_avg = 0; + double rep_var = 0; + double rep_frac = 0; + + + if (opinion_count) + { + opinion_avg = opinion_sum / opinion_count; + opinion_var = (opinion_sumsq - opinion_count * opinion_avg * opinion_avg) / opinion_count; + opinion_frac = opinion_count / ((float) (opinion_count + opinion_nullcount)); + } + + if (rep_count) + { + rep_avg = rep_sum / rep_count; + rep_var = (rep_sumsq - rep_count * rep_avg * rep_avg) / rep_count; + rep_frac = rep_count / ((float) (rep_count + rep_nullcount)); + } + + + if (validmsgs) + { + std::string groupId = msg.mMeta.mGroupId; + + std::string str; + if (!encodePostedCache(str, votes, comments)) + { + std::cerr << "p3IdService::background_updateVoteCounts() Failed to encode Votes"; + std::cerr << std::endl; + } + else + { + std::cerr << "p3IdService::background_updateVoteCounts() Encoded String: " << str; + std::cerr << std::endl; + /* store new result */ + setMessageServiceString(it->mMsgId, str); + } + } + + { + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + mBgPhase = ID_BG_PROCESS_VOTES; + mBgToken = 0; + } + + return background_FullCalcRequest(); +} + + +bool p3IdService::background_cleanup() +{ + std::cerr << "p3IdService::background_cleanup()"; + std::cerr << std::endl; + + RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/ + + // Cleanup. + mBgVoteMap.clear(); + mBgCommentMap.clear(); + mBgProcessing = false; + mBgToken = 0; + + return true; +} + + +#endif // DISABLED FOR MERGE.