Improved the Id Reputation System to the point of compilation.

-> Its not tested, or running with the checkin.
 * Added NEWMSG Flag to groups in test backend.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@5348 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-07-29 13:09:36 +00:00
parent ceb4298208
commit 6a24bdc62c
4 changed files with 271 additions and 88 deletions

View File

@ -186,6 +186,7 @@ class RsTokReqOptions
#define RSGXS_GROUP_STATUS_MASK 0x0000000f
#define RSGXS_GROUP_STATUS_UPDATED 0x00000001
#define RSGXS_GROUP_STATUS_NEWGROUP 0x00000002
#define RSGXS_GROUP_STATUS_NEWMSG 0x00000004
#define RSGXS_GROUP_STATUS_SERVICE_MASK 0xffff0000
@ -384,6 +385,8 @@ class RsIdGroup
std::string mGpgIdHash; // SHA(KeyId + Gpg Fingerprint) -> can only be IDed if GPG known.
// NOTE: These cannot be transmitted as part of underlying messages....
// Must use ServiceString.
bool mGpgIdKnown; // if GpgIdHash has been identified.
std::string mGpgId; // if known.
std::string mGpgName; // if known.
@ -402,13 +405,19 @@ class RsIdMsg
//std::string mKeyId; (mGroupId)
//std::string mPeerId; (mAuthorId) ???
int mRating;
int mPeersRating;
std::string mComment;
int mOpinion;
double mReputation;
//int mRating;
//int mPeersRating;
//std::string mComment;
};
std::ostream &operator<<(std::ostream &out, const RsIdGroup &meta);
std::ostream &operator<<(std::ostream &out, const RsIdMsg &meta);
#if 0
class RsIdReputation
@ -465,9 +474,30 @@ virtual bool createMsg(uint32_t &token, RsIdMsg &msg, bool isNew) = 0;
* Below is the additional interface to look at reputation.
*/
/* So we will want to cache much of the identity stuff, so that we have quick access to the results.
* The following bits of data will not use the request/response interface, and should be available immediately.
*
* ID => Nickname, knownGPG, reputation.
*
* This will require quite a bit of data...
* 20 Bytes + 50 + 1 + 4 Bytes? (< 100 Bytes).
* x 10,000 IDs. => ~1 MB of cache (Good).
* x 100,000 IDs. => ~10 MB of cache (Good).
* x 1,000,000 IDs. => ~100 MB of cache (Too Big).
*
* We also need to store quick access to your OwnIds.
*/
//virtual uint32_t getIdDetails(const std::string &id, std::string &nickname, bool &isGpgKnown,
uint32_t &ownOpinion, float &reputation);
//virtual uint32_t getOwnIds(std::list<std::string> &ownIds);
//virtual bool setOpinion(const std::string &id, uint32_t opinion);
virtual void generateDummyData() = 0;
#if 0
/* Data Requests */
virtual bool requestIdentityList(uint32_t &token) = 0;
virtual bool requestIdentities(uint32_t &token, const std::list<std::string> &ids) = 0;

View File

@ -891,7 +891,7 @@ bool GxsDataProxy::createMsg(void *msgData)
/* find the group */
std::map<std::string, RsGroupMetaData>::iterator git;
std::map<std::string, RsGroupMetaData>::iterator git;
git = mGroupMetaData.find(meta.mGroupId);
if (git == mGroupMetaData.end())
{
@ -901,7 +901,7 @@ bool GxsDataProxy::createMsg(void *msgData)
}
/* flag the group as changed */
git->second.mGroupStatus |= RSGXS_GROUP_STATUS_UPDATED;
git->second.mGroupStatus |= (RSGXS_GROUP_STATUS_UPDATED | RSGXS_GROUP_STATUS_NEWMSG);
/* Set the Msg Status Flags */
meta.mMsgStatus |= (RSGXS_MSG_STATUS_UNREAD_BY_USER | RSGXS_MSG_STATUS_UNPROCESSED);
@ -909,8 +909,8 @@ bool GxsDataProxy::createMsg(void *msgData)
/* Set the Msg->GroupId Status Flags */
/* push into maps */
mMsgData[meta.mMsgId] = msgData;
mMsgMetaData[meta.mMsgId] = meta;
mMsgData[meta.mMsgId] = msgData;
mMsgMetaData[meta.mMsgId] = meta;
return true;
}

View File

@ -61,7 +61,9 @@ int p3IdService::tick()
std::cerr << std::endl;
fakeprocessrequests();
// Disable for now.
// background_tick();
return 0;
}
@ -910,6 +912,16 @@ std::string rsIdTypeToString(uint32_t idtype)
*
* So next question, when do we need to incrementally calculate the score?
* .... how often do we need to recalculate everything -> this could lead to a flux of messages.
*
*
*
* MORE NOTES:
*
* The Opinion Messages will have to be signed by PGP or SSL Keys, to guarantee that we don't
* multiple votes per person... As the message system doesn't handle uniqueness in this respect,
* we might have to do FULL_CALC for everything - This bit TODO.
*
* This will make IdService quite different to the other GXS services.
*/
/************************************************************************************/
@ -923,8 +935,6 @@ std::string rsIdTypeToString(uint32_t idtype)
*
*/
#if 0 // DISABLED FOR MERGE
#define ID_BACKGROUND_PERIOD 60
@ -940,7 +950,7 @@ int p3IdService::background_tick()
time_t now = time(NULL);
bool doCheck = false;
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (now - mLastBgCheck > ID_BACKGROUND_PERIOD)
{
doCheck = true;
@ -950,7 +960,7 @@ int p3IdService::background_tick()
if (doCheck)
{
addExtraDummyData();
//addExtraDummyData();
background_requestGroups();
}
@ -970,18 +980,17 @@ int p3IdService::background_tick()
* Update
*
*/
#define ID_BG_REQUEST_GROUPS 1
#define ID_BG_IDLE 0
#define ID_BG_REQUEST_GROUPS 1
#define ID_BG_REQUEST_UNPROCESSED 2
#define ID_BG_REQUEST_PARENTS 3
#define ID_BG_PROCESS_VOTES 4
#define ID_BG_REQUEST_FULLCALC 3
bool p3IdService::background_checkTokenRequest()
{
uint32_t token = 0;
uint32_t phase = 0;
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (!mBgProcessing)
{
return false;
@ -1008,8 +1017,8 @@ bool p3IdService::background_checkTokenRequest()
case ID_BG_REQUEST_UNPROCESSED:
background_processNewMessages();
break;
case ID_BG_REQUEST_PARENTS:
background_updateVoteCounts();
case ID_BG_REQUEST_FULLCALC:
background_processFullCalc();
break;
default:
break;
@ -1028,7 +1037,7 @@ bool p3IdService::background_requestGroups()
uint32_t token = 0;
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (mBgProcessing)
{
@ -1051,7 +1060,7 @@ bool p3IdService::background_requestGroups()
requestGroupInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
@ -1065,12 +1074,14 @@ bool p3IdService::background_requestNewMessages()
std::cerr << "p3IdService::background_requestNewMessages()";
std::cerr << std::endl;
std::list<RsMsgMetaData> modGroupList;
std::list<RsGroupMetaData> modGroupList;
std::list<RsGroupMetaData>::iterator it;
std::list<std::string> groupIds;
uint32_t token = 0;
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
token = mBgToken;
}
@ -1083,17 +1094,17 @@ bool p3IdService::background_requestNewMessages()
}
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_REQUEST_UNPROCESSED;
mBgToken = 0;
/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
for(it = modGroupList.begin(); it != modGroupList.end(); it++)
{
setGroupStatus(it->mMsgId, 0, RSGXS_GROUP_STATUS_NEWMSG);
setGroupStatus(it->mGroupId, 0, RSGXS_GROUP_STATUS_NEWMSG);
mBgGroupMap[it->mGroupId] = *it;
groupIds.push_back(*it);
groupIds.push_back(it->mGroupId);
}
}
@ -1107,7 +1118,7 @@ bool p3IdService::background_requestNewMessages()
requestMsgInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
@ -1124,7 +1135,7 @@ bool p3IdService::background_processNewMessages()
uint32_t token = 0;
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
token = mBgToken;
}
@ -1143,22 +1154,33 @@ bool p3IdService::background_processNewMessages()
* If a message is not an original -> store groupId for requiring full analysis later.
*/
std::map<std::string, RsGroupMetaData>::iterator mit;
for(it = newMsgList.begin(); it != newMsgList.end(); it++)
{
std::cerr << "p3IdService::background_processNewMessages() new MsgId: " << it->mMsgId;
std::cerr << std::endl;
/* flag each new vote as processed */
setMessageStatus(it->mMsgId, 0, RSGXS_MSG_STATUS_UNPROCESSED);
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mit = mBgGroupMap.find(it->mGroupId);
if (mit == mBgGroupMap.end())
{
std::cerr << "p3IdService::background_processNewMessages() ERROR missing GroupId: ";
std::cerr << it->mGroupId;
std::cerr << std::endl;
/* error */
continue;
}
if (mit->mStatusFlags & FULL_CALC_FLAG)
if (mit->second.mGroupStatus & ID_LOCAL_STATUS_FULL_CALC_FLAG)
{
std::cerr << "p3IdService::background_processNewMessages() Group Already marked FULL_CALC";
std::cerr << std::endl;
/* already marked */
continue;
}
@ -1168,7 +1190,11 @@ bool p3IdService::background_processNewMessages()
/*
* not original -> hard, redo calc (alt: could substract previous score)
*/
mit->mStatusFlags |= FULL_CALC_FLAG;
std::cerr << "p3IdService::background_processNewMessages() Update, mark for FULL_CALC";
std::cerr << std::endl;
mit->second.mGroupStatus |= ID_LOCAL_STATUS_FULL_CALC_FLAG;
}
else
{
@ -1179,22 +1205,35 @@ bool p3IdService::background_processNewMessages()
* - flag group as modified.
*/
mit->mStatusFlags |= INCREMENTAL_CALC_FLAG;
std::cerr << "p3IdService::background_processNewMessages() NewOpt, Try Inc Calc";
std::cerr << std::endl;
if (!extractIdGroupCache(const std::string &str, uint32_t &votes, uint32_t &comments))
mit->second.mGroupStatus |= ID_LOCAL_STATUS_INC_CALC_FLAG;
std::string serviceString;
IdGroupServiceStrData ssData;
if (!extractIdGroupCache(serviceString, ssData))
{
/* error */
std::cerr << "p3IdService::background_processNewMessages() ERROR Extracting";
std::cerr << std::endl;
}
/* do calcs */
std::cerr << "p3IdService::background_processNewMessages() Extracted: ";
std::cerr << std::endl;
/* store it back in */
std::cerr << "p3IdService::background_processNewMessages() Stored: ";
std::cerr << std::endl;
if (!encodeIdGroupCache(std::string &str, uint32_t votes, uint32_t comments))
if (!encodeIdGroupCache(serviceString, ssData))
{
/* error */
std::cerr << "p3IdService::background_processNewMessages() ERROR Storing";
std::cerr << std::endl;
}
}
}
@ -1205,72 +1244,109 @@ bool p3IdService::background_processNewMessages()
*/
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
std::cerr << "p3IdService::background_processNewMessages() Checking Groups for Calc Type";
std::cerr << std::endl;
for(mit = mBgGroupMap.begin(); mit != mBgGroupMap.end(); mit++)
{
if (mit->mStatusFlags & FULL_CALC_FLAG)
if (mit->second.mGroupStatus & ID_LOCAL_STATUS_FULL_CALC_FLAG)
{
mBgFullCalcGroups.push_back(mit->mGroupId);
std::cerr << "p3IdService::background_processNewMessages() FullCalc for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
mBgFullCalcGroups.push_back(mit->second.mGroupId);
}
else if (mit->mStatusFlags & INCREMENTAL_CALC_FLAG)
else if (mit->second.mGroupStatus & ID_LOCAL_STATUS_INC_CALC_FLAG)
{
std::cerr << "p3IdService::background_processNewMessages() IncCalc done for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
/* set Cache */
setGroupServiceString(mit->mGroupId, mit->ServiceString);
setGroupServiceString(mit->second.mGroupId, mit->second.mServiceString);
}
else
{
/* why is it here? error. */
std::cerr << "p3IdService::background_processNewMessages() ERROR for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
}
}
}
return backgroundFullCalcRequest();
return background_FullCalcRequest();
}
class RepCumulScore
{
public:
uint32_t count;
uint32_t nullcount;
double sum;
double sumsq;
// derived parameters:
};
bool p3IdService::encodeIdGroupCache(std::string &str, uint32_t ownScore, RepCumulScore &opinion, RepCumulScore &rep)
bool p3IdService::encodeIdGroupCache(std::string &str, const IdGroupServiceStrData &data)
{
char line[RSGXS_MAX_SERVICE_STRING];
snprintf(line, RSGXS_MAX_SERVICE_STRING, "v1 Y:%d O:%d %d %f %f R:%d %d %f %f", ownScore,
opinion.count, opinion.nullcount, opinion.sum, opinion.sumsq,
rep.count, rep.nullcount, rep.sum, rep.sumsq);
snprintf(line, RSGXS_MAX_SERVICE_STRING, "v1 {%s} {Y:%d O:%d %d %f %f R:%d %d %f %f}",
data.pgpId.c_str(), data.ownScore,
data.opinion.count, data.opinion.nullcount, data.opinion.sum, data.opinion.sumsq,
data.reputation.count, data.reputation.nullcount, data.reputation.sum, data.reputation.sumsq);
str = line;
return true;
}
bool p3IdService::extractIdGroupCache(std::string &str, uint32_t &ownScore, RepCumulScore &opinion, RepCumulScore &rep)
bool p3IdService::extractIdGroupCache(std::string &str, IdGroupServiceStrData &data)
{
char pgpline[RSGXS_MAX_SERVICE_STRING];
char scoreline[RSGXS_MAX_SERVICE_STRING];
uint32_t iOwnScore;
RepCumulScore iOpin;
RepCumulScore iRep;
IdRepCumulScore iOpin;
IdRepCumulScore iRep;
if (9 == sscanf(str.c_str(), "v1 Y:%d O:%d %d %f %f R:%d %d %f %f", &iOwnScore,
&(iOpin.count), &(iOpin.nullcount), &(iOpin.sum), &(iOpin.sumsq),
&(iRep.count), &(iRep.nullcount), &(iRep.sum), &(iRep.sumsq)));
// split into two parts.
if (2 != sscanf(str.c_str(), "v1 {%[^}]} {%[^}]", pgpline, scoreline))
{
ownScore = iOwnScore;
opinion = iOpin;
rep = iRep;
std::cerr << "p3IdService::extractIdGroupCache() Failed to extract Two Parts";
std::cerr << std::endl;
return false;
}
std::cerr << "p3IdService::extractIdGroupCache() pgpline: " << pgpline;
std::cerr << std::endl;
std::cerr << "p3IdService::extractIdGroupCache() scoreline: " << scoreline;
std::cerr << std::endl;
std::string pgptmp = pgpline;
if (pgptmp.length() > 5)
{
std::cerr << "p3IdService::extractIdGroupCache() Believe to have pgpId: " << pgptmp;
std::cerr << std::endl;
data.pgpIdKnown = true;
data.pgpId = pgptmp;
}
else
{
std::cerr << "p3IdService::extractIdGroupCache() Think pgpId Invalid";
std::cerr << std::endl;
data.pgpIdKnown = false;
}
if (9 == sscanf(scoreline, " Y:%d O:%d %d %lf %lf R:%d %d %lf %lf", &iOwnScore,
&(iOpin.count), &(iOpin.nullcount), &(iOpin.sum), &(iOpin.sumsq),
&(iRep.count), &(iRep.nullcount), &(iRep.sum), &(iRep.sumsq)))
{
data.ownScore = iOwnScore;
data.opinion = iOpin;
data.reputation = iRep;
return true;
}
std::cerr << "p3IdService::extractIdGroupCache() Failed to extract scores";
std::cerr << std::endl;
return false;
}
@ -1283,18 +1359,19 @@ bool p3IdService::background_FullCalcRequest()
* - If empty, we are finished.
* - request all latest mesgs
*/
std::list<std::string> groupIds;
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_FULLCALC_REQUEST;
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_REQUEST_FULLCALC;
mBgToken = 0;
mBgGroupMap.clear();
if (mBgFullCalcGroups.empty())
{
/* finished! */
return;
background_cleanup();
return true;
}
@ -1305,14 +1382,14 @@ bool p3IdService::background_FullCalcRequest()
/* request the summary info from the parents */
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
token = 0;
uint32_t token = 0;
RsTokReqOptions opts;
opt.mOptions = RS_TOKREQOPT_MSG_LATEST;
opts.mOptions = RS_TOKREQOPT_MSG_LATEST;
requestMsgInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
@ -1329,7 +1406,7 @@ bool p3IdService::background_processFullCalc()
std::list<RsMsgMetaData> msgList;
std::list<RsMsgMetaData>::iterator it;
RsIdOpinion msg;
RsIdMsg msg;
bool validmsgs = false;
@ -1406,24 +1483,27 @@ bool p3IdService::background_processFullCalc()
{
std::string groupId = msg.mMeta.mGroupId;
std::string str;
if (!encodePostedCache(str, votes, comments))
std::string serviceString;
IdGroupServiceStrData ssData;
if (!encodeIdGroupCache(serviceString, ssData))
{
std::cerr << "p3IdService::background_updateVoteCounts() Failed to encode Votes";
std::cerr << std::endl;
}
else
{
std::cerr << "p3IdService::background_updateVoteCounts() Encoded String: " << str;
std::cerr << "p3IdService::background_updateVoteCounts() Encoded String: " << serviceString;
std::cerr << std::endl;
/* store new result */
setMessageServiceString(it->mMsgId, str);
setGroupServiceString(it->mMsgId, serviceString);
}
}
{
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_PROCESS_VOTES;
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_IDLE;
mBgToken = 0;
}
@ -1436,16 +1516,36 @@ bool p3IdService::background_cleanup()
std::cerr << "p3IdService::background_cleanup()";
std::cerr << std::endl;
RsStackMutex stack(mPostedMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Cleanup.
mBgVoteMap.clear();
mBgCommentMap.clear();
mBgProcessing = false;
mBgPhase = ID_BG_IDLE;
mBgToken = 0;
mBgGroupMap.clear();
mBgFullCalcGroups.clear();
return true;
}
#endif // DISABLED FOR MERGE.
std::ostream &operator<<(std::ostream &out, const RsIdGroup &grp)
{
out << "RsIdGroup: Meta: " << grp.mMeta;
out << " IdType: " << grp.mIdType << " GpgIdHash: " << grp.mGpgIdHash;
out << "(((Unusable: ( GpgIdKnown: " << grp.mGpgIdKnown << " GpgId: " << grp.mGpgId;
out << " GpgName: " << grp.mGpgName << " GpgEmail: " << grp.mGpgEmail << ") )))";
out << std::endl;
return out;
}
std::ostream &operator<<(std::ostream &out, const RsIdMsg &msg)
{
out << "RsIdMsg: Meta: " << msg.mMeta;
//out << " IdType: " << grp.mIdType << " GpgIdHash: " << grp.mGpgIdHash;
out << std::endl;
return out;
}

View File

@ -55,6 +55,37 @@ virtual bool convertMsgToMetaData(void *msgData, RsMsgMetaData &meta);
};
// INTERNAL DATA TYPES...
// Describes data stored in GroupServiceString.
class IdRepCumulScore
{
public:
uint32_t count;
uint32_t nullcount;
double sum;
double sumsq;
// derived parameters:
};
class IdGroupServiceStrData
{
public:
IdGroupServiceStrData() { pgpIdKnown = false; }
bool pgpIdKnown;
std::string pgpId;
uint32_t ownScore;
IdRepCumulScore opinion;
IdRepCumulScore reputation;
};
#define ID_LOCAL_STATUS_FULL_CALC_FLAG 0x00010000
#define ID_LOCAL_STATUS_INC_CALC_FLAG 0x00020000
class p3IdService: public p3GxsDataService, public RsIdentity
{
public:
@ -113,12 +144,34 @@ virtual void generateDummyData();
std::string genRandomId();
IdDataProxy *mIdProxy;
int background_tick();
bool background_checkTokenRequest();
bool background_requestGroups();
bool background_requestNewMessages();
bool background_processNewMessages();
bool background_FullCalcRequest();
bool background_processFullCalc();
bool background_cleanup();
bool encodeIdGroupCache(std::string &str, const IdGroupServiceStrData &data);
bool extractIdGroupCache(std::string &str, IdGroupServiceStrData &data);
IdDataProxy *mIdProxy;
RsMutex mIdMtx;
/***** below here is locked *****/
bool mLastBgCheck;
bool mBgProcessing;
uint32_t mBgToken;
uint32_t mBgPhase;
std::map<std::string, RsGroupMetaData> mBgGroupMap;
std::list<std::string> mBgFullCalcGroups;
bool mUpdated;
#if 0