added option to RsTokReqOptions for group subscription filter

added code for rank calculation best,top, and newest (not enabled, not working yet...)
added variables for circles to grp meta type and modified storage and serialisation accordingly




git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5930 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2012-12-02 19:40:17 +00:00
parent 083c4411b8
commit bcf9f443b4
14 changed files with 660 additions and 49 deletions

View file

@ -1,9 +1,17 @@
#include <algorithm>
#include <math.h>
#include "p3posted.h"
#include "gxs/rsgxsflags.h"
#include "serialiser/rsposteditems.h"
const uint32_t RsPosted::FLAG_MSGTYPE_COMMENT = 0x0001;
const uint32_t RsPosted::FLAG_MSGTYPE_POST = 0x0002;
const uint32_t RsPosted::FLAG_MSGTYPE_VOTE = 0x0004;
const uint32_t RsPosted::FLAG_MSGTYPE_MASK = 0x000f;
#define POSTED_MAX_SERVICE_STRING 50
RsPosted *rsPosted = NULL;
@ -13,9 +21,17 @@ RsPostedComment::RsPostedComment(const RsGxsPostedCommentItem & item)
mMeta = item.meta;
}
p3Posted::p3Posted(RsGeneralDataService *gds, RsNetworkExchangeService *nes)
: RsGenExchange(gds, nes, new RsGxsPostedSerialiser(), RS_SERVICE_GXSV1_TYPE_POSTED), RsPosted(this), mPostedMutex("Posted")
RsPostedVote::RsPostedVote(const RsGxsPostedVoteItem& item)
{
mDirection = item.mVote.mDirection;
mMeta = item.meta;
}
p3Posted::p3Posted(RsGeneralDataService *gds, RsNetworkExchangeService *nes)
: RsGenExchange(gds, nes, new RsGxsPostedSerialiser(), RS_SERVICE_GXSV1_TYPE_POSTED), RsPosted(this), mPostedMutex("Posted"),
mTokenService(NULL)
{
mTokenService = RsGenExchange::getTokenService();
}
void p3Posted::notifyChanges(std::vector<RsGxsNotify *> &changes)
@ -154,7 +170,7 @@ bool p3Posted::submitVote(uint32_t &token, RsPostedVote &vote)
RsGxsPostedVoteItem* voteItem = new RsGxsPostedVoteItem();
voteItem->mVote = vote;
voteItem->meta = vote.mMeta;
voteItem->meta.mMsgFlags |= FLAG_MSGTYPE_POST;
voteItem->meta.mMsgFlags |= FLAG_MSGTYPE_VOTE;
RsGenExchange::publishMsg(token, voteItem);
return true;
@ -221,6 +237,7 @@ void p3Posted::processMessageRanks()
RsStackMutex stack(mPostedMutex);
std::map<uint32_t, GxsPostedPostRanking*>::iterator mit =mPendingPostRanks.begin();
// go through all pending posts
for(; mit !=mPendingPostRanks.begin(); mit++)
{
uint32_t token;
@ -235,7 +252,7 @@ void p3Posted::processMessageRanks()
while(true)
{
uint32_t status = RsGenExchange::getTokenService()->requestStatus(token);
uint32_t status = mTokenService->requestStatus(token);
if(RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE
== status)
@ -259,38 +276,417 @@ void p3Posted::processMessageRanks()
void p3Posted::discardCalc(const uint32_t &token)
{
mTokenService->cancelRequest(token);
}
bool PostedTopScoreComp(const PostedScore& i, const PostedScore& j)
{
if((i.upVotes + (-i.downVotes)) == (j.upVotes + (-j.downVotes))){
return i.date < j.date;
}else
return (i.upVotes + (-i.downVotes)) < (j.upVotes + (-j.downVotes));
}
bool PostedNewScoreComp(const PostedScore& i, const PostedScore& j)
{
return i.date < j.date;
}
bool PostedBestScoreComp(const PostedScore& i, const PostedScore& j)
{
// n = ups + downs if n == 0: return 0 z = 1.0
// #1.0 = 85%, 1.6 = 95% phat = float(ups)
// / n return sqrt(phat+z*z/(2*n)-z*((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n)
// def confidence(ups, downs): if ups + downs == 0: return 0 else:
// return _confidence(ups, downs)
// very expensive!!
static float z = 1.0;
float phat;
float i_score;
int n = i.upVotes + (-i.downVotes);
if(n==0)
i_score = 0.;
else
{
phat = float(i.upVotes);
i_score = sqrt(phat+z*z/(2*n)-z*((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n);
}
float j_score;
n = j.upVotes + (-j.downVotes);
if(n==0)
j_score = 0.;
else
{
phat = float(j.upVotes);
j_score = sqrt(phat+z*z/(2*n)-z*((phat*(1-phat)+z*z/(4*n))/n))/(1+z*z/n);
}
if(j_score == i_score)
return i.date < j.date;
else
return i_score < j_score;
}
void p3Posted::completePostedPostCalc(GxsPostedPostRanking *gpp)
{
GxsMsgMetaMap msgMetas;
getMsgMeta(gpp->reqToken, msgMetas);
GxsMsgMetaMap::iterator mit = msgMetas.begin();
for(; mit != msgMetas.end(); mit++ )
if(getMsgMeta(gpp->reqToken, msgMetas))
{
RsGxsMsgMetaData* m = NULL;
//retrieveScores(m->mServiceString, upVotes, downVotes, nComments);
std::vector<RsMsgMetaData> msgMetaV = msgMetas[gpp->grpId];
switch(gpp->rType)
{
case NewRankType:
calcPostedPostRank(msgMetaV, gpp->rankingResult, PostedNewScoreComp);
break;
case TopRankType:
calcPostedPostRank(msgMetaV, gpp->rankingResult, PostedTopScoreComp);
break;
default:
std::cerr << "Unknown ranking tpye: " << gpp->rType << std::endl;
}
}
}
// then dependent on rank request type process for that way
void p3Posted::calcPostedPostRank(const std::vector<RsMsgMetaData > msgMeta, PostedRanking &ranking,
bool comp(const PostedScore &, const PostedScore &)) const
{
std::vector<RsMsgMetaData>::const_iterator cit = msgMeta.begin();
std::vector<PostedScore> scores;
for(; cit != msgMeta.begin(); )
{
const RsMsgMetaData& m = *cit;
uint32_t upVotes, downVotes, nComments;
retrieveScores(m.mServiceString, upVotes, downVotes, nComments);
PostedScore c;
c.upVotes = upVotes;
c.downVotes = downVotes;
c.date = m.mPublishTs;
scores.push_back(c);
}
std::sort(scores.begin(), scores.end(), comp);
std::vector<PostedScore>::iterator vit = scores.begin();
int i = 1;
for(; vit != scores.end(); vit)
{
const PostedScore& p = *vit;
ranking.insert(std::make_pair(p.msgId, i++));
}
}
void p3Posted::calcPostedCommentsRank(const std::map<RsGxsMessageId, std::vector<RsGxsMessageId> > &msgBranches,
std::map<RsGxsMessageId, RsMsgMetaData >& msgMetas, PostedRanking &ranking, bool comp(const PostedScore &, const PostedScore &)) const
{
std::map<RsGxsMessageId, std::vector<RsGxsMessageId> >::const_iterator cit = msgBranches.begin();
for(; cit != msgBranches.end(); cit++)
{
const std::vector<RsGxsMessageId>& branch = cit->second;
std::vector<PostedScore> scores;
std::vector<RsGxsMessageId>::const_iterator vit = branch.begin();
for(; vit != branch.end(); vit++)
{
std::map<RsGxsMessageId, RsMsgMetaData >::iterator mit =
msgMetas.find(*vit);
if(mit != msgMetas.end())
{
uint32_t upVotes, downVotes, nComments;
const RsMsgMetaData& m = mit->second;
retrieveScores(m.mServiceString, upVotes, downVotes, nComments);
PostedScore c;
c.upVotes = upVotes;
c.downVotes = downVotes;
c.date = m.mPublishTs;
scores.push_back(c);
}
}
std::sort(scores.begin(), scores.end(), comp);
std::vector<PostedScore>::iterator cvit = scores.begin();
int i = 1;
for(; cvit != scores.end(); cvit)
{
const PostedScore& p = *cvit;
ranking.insert(std::make_pair(p.msgId, i++));
}
}
}
bool p3Posted::retrieveScores(const std::string &serviceString, uint32_t &upVotes, uint32_t downVotes, uint32_t nComments)
void p3Posted::completePostedCommentRanking(GxsPostedCommentRanking *gpc)
{
if (2 == sscanf(serviceString.c_str(), "%d %d %d", &upVotes, &downVotes, &nComments))
GxsMsgRelatedMetaMap msgMetas;
if(getMsgRelatedMeta(gpc->reqToken, msgMetas))
{
return true;
// create map of msgs
std::vector<RsMsgMetaData>& msgV = msgMetas[gpc->msgId];
std::map<RsGxsMessageId, std::vector<RsGxsMessageId> > msgBranches;
std::map<RsGxsMessageId, RsMsgMetaData> remappedMsgMeta;
std::vector<RsMsgMetaData>::iterator vit = msgV.begin();
for(; vit != msgV.end(); vit++)
{
const RsMsgMetaData& m = *vit;
if(!m.mParentId.empty())
{
msgBranches[m.mParentId].push_back(m.mMsgId);
}
remappedMsgMeta.insert(std::make_pair(m.mMsgId, m));
}
switch(gpc->rType)
{
case BestRankType:
calcPostedCommentsRank(msgBranches, remappedMsgMeta, gpc->result, PostedBestScoreComp);
break;
case TopRankType:
calcPostedCommentsRank(msgBranches, remappedMsgMeta, gpc->result, PostedTopScoreComp);
break;
case NewRankType:
calcPostedCommentsRank(msgBranches, remappedMsgMeta, gpc->result, PostedNewScoreComp);
break;
default:
std::cerr << "Unknown Rank type" << gpc->rType << std::endl;
break;
}
}
}
bool p3Posted::retrieveScores(const std::string &serviceString, uint32_t &upVotes, uint32_t downVotes, uint32_t nComments) const
{
if (3 == sscanf(serviceString.c_str(), "%d %d %d", &upVotes, &downVotes, &nComments))
{
return true;
}
return false;
}
bool p3Posted::storeScores(std::string &serviceString, uint32_t &upVotes, uint32_t downVotes, uint32_t nComments) const
{
char line[POSTED_MAX_SERVICE_STRING];
bool ok = snprintf(line, POSTED_MAX_SERVICE_STRING, "%d %d %d", upVotes, downVotes, nComments) > -1;
serviceString = line;
return ok;
}
void p3Posted::processCommentRanks()
{
}
void p3Posted::updateVotes()
{
if(!mUpdateTokenQueued)
{
mUpdateTokenQueued = true;
switch(mUpdatePhase)
{
// case UPDATE_PHASE_GRP_REQUEST:
// {
// updateRequestGroups(mUpda);
// break;
// }
// case UPDATE_PHASE_GRP_MSG_REQUEST:
// {
// updateRequestMessages(mVoteUpdataToken);
// break;
// }
// case UPDATE_VOTE_COMMENT_REQUEST:
// {
// updateRequestVotesComments(mVoteUpdataToken);
// break;
// }
// case UPDATE_COMPLETE_UPDATE:
// {
// updateCompleteUpdate();
// break;
// }
// default:
// break;
}
// first get all msgs for groups for which you are subscribed to.
// then request comments for them
}
}
bool p3Posted::updateRequestGroups(uint32_t &token)
{
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_IDS;
opts.mSubscribeMask = GXS_SERV::GROUP_SUBSCRIBE_MASK;
opts.mSubscribeFilter = GXS_SERV::GROUP_SUBSCRIBE_ADMIN |
GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED;
mTokenService->requestGroupInfo(token, 0, opts);
mUpdatePhase = UPDATE_PHASE_GRP_MSG_REQUEST;
}
bool p3Posted::updateRequestMessages(uint32_t &token)
{
uint32_t status = mTokenService->requestStatus(token);
if(status == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE)
{
std::list<RsGxsGroupId> grpIds;
RsGenExchange::getGroupList(token, grpIds);
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_MSG_IDS;
opts.mOptions = RS_TOKREQOPT_MSG_LATEST | RS_TOKREQOPT_MSG_THREAD;
mTokenService->requestMsgInfo(token, 0, opts, grpIds);
mUpdatePhase = UPDATE_VOTE_COMMENT_REQUEST;
return true;
}
else if(status == RsTokenService::GXS_REQUEST_V2_STATUS_FAILED)
{
mTokenService->cancelRequest(token);
return false;
}
}
bool p3Posted::updateRequestVotesComments(uint32_t &token)
{
uint32_t status = mTokenService->requestStatus(token);
if(status == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE)
{
GxsMsgIdResult result;
RsGenExchange::getMsgList(token, result);
std::vector<RsGxsGrpMsgIdPair> msgIds;
GxsMsgIdResult::iterator mit = result.begin();
for(; mit != result.end(); mit++)
{
std::vector<RsGxsMessageId>& msgIdV = mit->second;
std::vector<RsGxsMessageId>::const_iterator cit = msgIdV.begin();
for(; cit != msgIdV.end(); cit++)
msgIds.push_back(std::make_pair(mit->first, *cit));
}
// only need ids for comments
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_MSG_IDS;
opts.mOptions = RS_TOKREQOPT_MSG_LATEST | RS_TOKREQOPT_MSG_PARENT;
opts.mMsgFlagMask = RsPosted::FLAG_MSGTYPE_MASK;
opts.mMsgFlagFilter = RsPosted::FLAG_MSGTYPE_COMMENT;
mTokenService->requestMsgRelatedInfo(mCommentToken, 0, opts, msgIds);
// need actual data from votes
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
opts.mOptions = RS_TOKREQOPT_MSG_LATEST | RS_TOKREQOPT_MSG_PARENT;
opts.mMsgFlagMask = RsPosted::FLAG_MSGTYPE_MASK;
opts.mMsgFlagFilter = RsPosted::FLAG_MSGTYPE_VOTE;
mTokenService->requestMsgRelatedInfo(mVoteToken, 0, opts, msgIds);
mUpdatePhase = UPDATE_COMPLETE_UPDATE;
mMsgsPendingUpdate = msgIds;
return true;
}
else if(status == RsTokenService::GXS_REQUEST_V2_STATUS_FAILED)
{
mTokenService->cancelRequest(token);
return false;
}
}
bool p3Posted::updateCompleteUpdate()
{
uint32_t commentStatus = mTokenService->requestStatus(mCommentToken);
uint32_t voteStatus = mTokenService->requestStatus(mVoteToken);
bool ready = commentStatus == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE;
ready &= voteStatus == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE;
bool failed = commentStatus == RsTokenService::GXS_REQUEST_V2_STATUS_FAILED;
failed &= voteStatus == RsTokenService::GXS_REQUEST_V2_STATUS_FAILED;
if(ready)
{
std::map<RsGxsGrpMsgIdPair, std::vector<RsGxsMessageId> > msgCommentIds;
std::map<RsGxsGrpMsgIdPair, std::vector<RsPostedVote> > votes;
getMsgRelatedDataT<RsGxsPostedVoteItem, RsPostedVote>(mVoteToken, votes);
std::vector<RsGxsGrpMsgIdPair>::iterator vit = mMsgsPendingUpdate.begin();
for(; vit != mMsgsPendingUpdate.end();vit++)
{
updateMsg(*vit, votes[*vit], msgCommentIds[*vit]);
}
mUpdatePhase = 0;
}
else if(failed)
{
mTokenService->cancelRequest(mCommentToken);
mTokenService->cancelRequest(mVoteToken);
return false;
}else
{
return true;
}
}
bool p3Posted::updateMsg(const RsGxsGrpMsgIdPair& msgId, const std::vector<RsPostedVote> &msgVotes,
const std::vector<RsGxsMessageId>& msgCommentIds)
{
uint32_t nComments = msgCommentIds.size();
uint32_t nUp = 0, nDown = 0;
std::vector<RsPostedVote>::const_iterator cit = msgVotes.begin();
for(; cit != msgVotes.end(); cit++)
{
const RsPostedVote& v = *cit;
if(v.mDirection == 0)
{
nDown++;
}else
{
nUp++;
}
}
std::string servStr;
storeScores(servStr, nUp, nDown, nComments);
uint32_t token;
setMsgServiceString(token, msgId, servStr);
}

View file

@ -15,7 +15,7 @@ public:
uint32_t reqToken;
RsPosted::RankType rType;
RsGxsGroupId grpId;
PostedRanking result;
PostedRanking rankingResult;
};
class GxsPostedCommentRanking
@ -29,6 +29,19 @@ public:
PostedRanking result;
};
class PostedScore {
public:
int32_t upVotes, downVotes;
time_t date;
RsGxsMessageId msgId;
};
#define UPDATE_PHASE_GRP_REQUEST 1
#define UPDATE_PHASE_GRP_MSG_REQUEST 2
#define UPDATE_VOTE_COMMENT_REQUEST 3
#define UPDATE_COMPLETE_UPDATE 4
class p3Posted : public RsGenExchange, public RsPosted
{
public:
@ -67,21 +80,57 @@ public:
private:
/* Functions for processing rankings */
void processRankings();
void processMessageRanks();
void processCommentRanks();
void discardCalc(const uint32_t& token);
void completePostedPostCalc(GxsPostedPostRanking* gpp);
bool retrieveScores(const std::string& serviceString, uint32_t& upVotes, uint32_t downVotes, uint32_t nComments);
void completePostedCommentRanking(GxsPostedCommentRanking* gpc);
bool retrieveScores(const std::string& serviceString, uint32_t& upVotes, uint32_t downVotes, uint32_t nComments) const;
bool storeScores(std::string& serviceString, uint32_t& upVotes, uint32_t downVotes, uint32_t nComments) const;
// for posts
void calcPostedPostRank(const std::vector<RsMsgMetaData>, PostedRanking& ranking, bool com(const PostedScore& i, const PostedScore &j)) const;
// for comments
void calcPostedCommentsRank(const std::map<RsGxsMessageId, std::vector<RsGxsMessageId> >& msgBranches, std::map<RsGxsMessageId, RsMsgMetaData>& msgMetas,
PostedRanking& ranking, bool com(const PostedScore& i, const PostedScore &j)) const;
/* Functions for maintaing vote counts in meta data */
/*!
* Update votes should only be called when a vote comes in
* Several phases to calculating votes.
* First get all messages for groups which you are subscribed
* Then for these messages get all the votes accorded to them
* Then do the calculation and update messages
* Also stores updates for messages which have new scores
*/
void updateVotes();
bool updateRequestGroups(uint32_t& token);
bool updateRequestMessages(uint32_t& token);
bool updateRequestVotesComments(uint32_t& token);
bool updateCompleteUpdate();
bool updateMsg(const RsGxsGrpMsgIdPair& msgId, const std::vector<RsPostedVote>& msgVotes,
const std::vector<RsGxsMessageId>& msgCommentIds);
private:
// for calculating ranks
std::map<uint32_t, GxsPostedPostRanking*> mPendingPostRanks;
std::map<uint32_t, GxsPostedPostRanking*> mPendingCalculationPostRanks;
std::map<uint32_t, GxsPostedCommentRanking*> mPendingCommentRanks;
std::map<uint32_t, GxsPostedCommentRanking*> mPendingCalculationCommentRanks;
// for maintaining vote counts in msg meta
uint32_t mVoteUpdataToken, mVoteToken, mCommentToken;
bool mUpdateTokenQueued;
uint32_t mUpdatePhase;
std::vector<RsGxsGrpMsgIdPair> mMsgsPendingUpdate;
RsTokenService* mTokenService;
RsMutex mPostedMutex;
};