compile fixes

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7188 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2014-03-18 22:29:24 +00:00
parent 0f29d28b1b
commit 8cd578f09e
2 changed files with 5 additions and 613 deletions

View File

@ -1464,7 +1464,7 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
RsNxsSyncMsgItem* item = msgItemL.front(); RsNxsSyncMsgItem* item = msgItemL.front();
const RsGxsGroupId& grpId = item->grpId; const RsGxsGroupId& grpId = item->grpId;
std::map<std::string, RsGxsGrpMetaData*> grpMetaMap; std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetaMap;
grpMetaMap[grpId] = NULL; grpMetaMap[grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap); mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId]; RsGxsGrpMetaData* grpMeta = grpMetaMap[grpId];

View File

@ -533,7 +533,7 @@ bool p3IdService::submitOpinion(uint32_t& token, const RsGxsId &id, bool absOpin
uint32_t intToken; uint32_t intToken;
std::list<RsGxsGroupId> groups; std::list<RsGxsGroupId> groups;
groups.push_back(id); groups.push_back(RsGxsGroupId(id));
RsGenExchange::getTokenService()->requestGroupInfo(intToken, ansType, opts, groups); RsGenExchange::getTokenService()->requestGroupInfo(intToken, ansType, opts, groups);
GxsTokenQueue::queueRequest(intToken, GXSIDREQ_OPINION); GxsTokenQueue::queueRequest(intToken, GXSIDREQ_OPINION);
@ -594,7 +594,7 @@ bool p3IdService::opinion_handlerequest(uint32_t token)
} }
RsGroupMetaData &meta = *(groups.begin()); RsGroupMetaData &meta = *(groups.begin());
if (meta.mGroupId != req.mId) if (meta.mGroupId != RsGxsGroupId(req.mId))
{ {
std::cerr << "p3IdService::opinion_handlerequest() ERROR Id mismatch"; std::cerr << "p3IdService::opinion_handlerequest() ERROR Id mismatch";
std::cerr << std::endl; std::cerr << std::endl;
@ -631,7 +631,7 @@ bool p3IdService::opinion_handlerequest(uint32_t token)
/* set new Group ServiceString */ /* set new Group ServiceString */
uint32_t dummyToken = 0; uint32_t dummyToken = 0;
setGroupServiceString(dummyToken, meta.mGroupId, serviceString); setGroupServiceString(dummyToken, meta.mGroupId, serviceString);
cache_update_if_cached(meta.mGroupId, serviceString); cache_update_if_cached(RsGxsId(meta.mGroupId), serviceString);
updatePublicRequestStatus(req.mToken, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE); updatePublicRequestStatus(req.mToken, RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE);
return true; return true;
@ -732,7 +732,7 @@ bool p3IdService::updateGroup(uint32_t& token, RsGxsIdGroup &group)
RsGxsGroupUpdateMeta updateMeta(RsGxsGroupId(id.toStdString())); RsGxsGroupUpdateMeta updateMeta(RsGxsGroupId(id.toStdString()));
RsGenExchange::updateGroup(token, updateMeta, item); RsGenExchange::updateGroup(token, item);
// if its in the cache - clear it. // if its in the cache - clear it.
@ -3228,614 +3228,6 @@ Processing Algorithm:
bool p3IdService::reputation_start()
{
if (!CacheArbitration(BG_REPUTATION))
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::reputation_start() Other Events running... Rescheduling";
std::cerr << std::endl;
#endif // DEBUG_IDS
/* reschedule in a bit */
RsTickEvent::schedule_in(GXSID_EVENT_REPUTATION, REPUTATION_RETRY_PERIOD);
return false;
}
CacheArbitrationDone(BG_REPUTATION);
// SCHEDULE NEXT ONE.
RsTickEvent::schedule_in(GXSID_EVENT_REPUTATION, REPUTATION_PERIOD);
return true;
}
#define ID_BACKGROUND_PERIOD 60
int p3IdService::background_tick()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_tick()";
std::cerr << std::endl;
#endif // DEBUG_IDS
// Run Background Stuff.
background_checkTokenRequest();
/* every minute - run a background check */
time_t now = time(NULL);
bool doCheck = false;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (now - mLastBgCheck > ID_BACKGROUND_PERIOD)
{
doCheck = true;
mLastBgCheck = now;
}
}
if (doCheck)
{
//addExtraDummyData();
background_requestGroups();
}
// Add in new votes + comments.
return 0;
}
/***** Background Processing ****
*
* Process Each Message - as it arrives.
*
* Update
*
*/
#define ID_BG_IDLE 0
#define ID_BG_REQUEST_GROUPS 1
#define ID_BG_REQUEST_UNPROCESSED 2
#define ID_BG_REQUEST_FULLCALC 3
bool p3IdService::background_checkTokenRequest()
{
uint32_t token = 0;
uint32_t phase = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (!mBgProcessing)
{
return false;
}
token = mBgToken;
phase = mBgPhase;
}
uint32_t status;
//uint32_t reqtype;
//uint32_t anstype;
time_t ts;
status = RsGenExchange::getTokenService()->requestStatus(token);
//checkRequestStatus(token, status, reqtype, anstype, ts);
if (status == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE)
{
switch(phase)
{
case ID_BG_REQUEST_GROUPS:
background_requestNewMessages();
break;
case ID_BG_REQUEST_UNPROCESSED:
background_processNewMessages();
break;
case ID_BG_REQUEST_FULLCALC:
background_processFullCalc();
break;
default:
break;
}
}
return true;
}
bool p3IdService::background_requestGroups()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_requestGroups()";
std::cerr << std::endl;
#endif // DEBUG_IDS
// grab all the subscribed groups.
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (mBgProcessing)
{
std::cerr << "p3IdService::background_requestGroups() ERROR Already processing, Skip this cycle";
std::cerr << std::endl;
return false;
}
mBgProcessing = true;
mBgPhase = ID_BG_REQUEST_GROUPS;
mBgToken = 0;
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_SUMMARY;
RsTokReqOptions opts;
std::list<RsGxsGroupId> groupIds;
/**
TODO
opts.mStatusFilter = RSGXS_GROUP_STATUS_NEWMSG;
opts.mStatusMask = RSGXS_GROUP_STATUS_NEWMSG;
**/
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
}
bool p3IdService::background_requestNewMessages()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_requestNewMessages()";
std::cerr << std::endl;
#endif // DEBUG_IDS
std::list<RsGroupMetaData> modGroupList;
std::list<RsGroupMetaData>::iterator it;
std::list<RsGxsGroupId> groupIds;
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
token = mBgToken;
}
if (!getGroupSummary(token, modGroupList))
{
std::cerr << "p3IdService::background_requestNewMessages() ERROR No Group List";
std::cerr << std::endl;
background_cleanup();
return false;
}
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_REQUEST_UNPROCESSED;
mBgToken = 0;
/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
for(it = modGroupList.begin(); it != modGroupList.end(); it++)
{
/*** TODO
uint32_t dummyToken = 0;
setGroupStatusFlags(dummyToken, it->mGroupId, 0, RSGXS_GROUP_STATUS_NEWMSG);
***/
mBgGroupMap[it->mGroupId] = *it;
groupIds.push_back(it->mGroupId);
}
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_SUMMARY;
RsTokReqOptions opts;
token = 0;
/* TODO
opts.mStatusFilter = RSGXS_MSG_STATUS_UNPROCESSED;
opts.mStatusMask = RSGXS_MSG_STATUS_UNPROCESSED;
*/
RsGenExchange::getTokenService()->requestMsgInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
}
bool p3IdService::background_processNewMessages()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages()";
std::cerr << std::endl;
#endif // DEBUG_IDS
GxsMsgMetaMap newMsgMap;
GxsMsgMetaMap::iterator it;
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
token = mBgToken;
}
if (!getMsgSummary(token, newMsgMap))
{
std::cerr << "p3IdService::background_processNewMessages() ERROR No New Msgs";
std::cerr << std::endl;
background_cleanup();
return false;
}
/* iterate through the msgs.. update the mBgGroupMap with new data,
* and flag these items as modified - so we rewrite them to the db later.
*
* If a message is not an original -> store groupId for requiring full analysis later.
*/
std::map<RsGxsGroupId, RsGroupMetaData>::iterator mit;
for (it = newMsgMap.begin(); it != newMsgMap.end(); it++)
{
std::vector<RsMsgMetaData>::iterator vit;
for(vit = it->second.begin(); vit != it->second.end(); vit++)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() new MsgId: " << vit->mMsgId;
std::cerr << std::endl;
#endif // DEBUG_IDS
/* flag each new vote as processed */
/**
TODO
uint32_t dummyToken = 0;
setMsgStatusFlags(dummyToken, it->mMsgId, 0, RSGXS_MSG_STATUS_UNPROCESSED);
**/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mit = mBgGroupMap.find(it->first);
if (mit == mBgGroupMap.end())
{
std::cerr << "p3IdService::background_processNewMessages() ERROR missing GroupId: ";
std::cerr << vit->mGroupId;
std::cerr << std::endl;
/* error */
continue;
}
if (mit->second.mGroupStatus & ID_LOCAL_STATUS_FULL_CALC_FLAG)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() Group Already marked FULL_CALC";
std::cerr << std::endl;
#endif // DEBUG_IDS
/* already marked */
continue;
}
if (vit->mMsgId != vit->mOrigMsgId)
{
/*
* not original -> hard, redo calc (alt: could substract previous score)
*/
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() Update, mark for FULL_CALC";
std::cerr << std::endl;
#endif // DEBUG_IDS
mit->second.mGroupStatus |= ID_LOCAL_STATUS_FULL_CALC_FLAG;
}
else
{
/*
* Try incremental calculation.
* - extract parameters from group.
* - increment, & save back.
* - flag group as modified.
*/
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() NewOpt, Try Inc Calc";
std::cerr << std::endl;
#endif // DEBUG_IDS
mit->second.mGroupStatus |= ID_LOCAL_STATUS_INC_CALC_FLAG;
SSGxsIdGroup ssdata;
if (!ssdata.load(mit->second.mServiceString))
{
/* error */
std::cerr << "p3IdService::background_processNewMessages() ERROR Extracting";
std::cerr << std::endl;
}
#ifdef DEBUG_IDS
/* do calcs */
std::cerr << "p3IdService::background_processNewMessages() Extracted: ";
std::cerr << std::endl;
/* store it back in */
std::cerr << "p3IdService::background_processNewMessages() Stored: ";
std::cerr << std::endl;
#endif // DEBUG_IDS
std::string serviceString = ssdata.save();
if (0)
{
/* error */
std::cerr << "p3IdService::background_processNewMessages() ERROR Storing";
std::cerr << std::endl;
}
}
}
}
/* now iterate through groups again
* -> update status as we go
* -> record one requiring a full analyssis
*/
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() Checking Groups for Calc Type";
std::cerr << std::endl;
#endif // DEBUG_IDS
for(mit = mBgGroupMap.begin(); mit != mBgGroupMap.end(); mit++)
{
if (mit->second.mGroupStatus & ID_LOCAL_STATUS_FULL_CALC_FLAG)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() FullCalc for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
#endif // DEBUG_IDS
mBgFullCalcGroups.push_back(mit->second.mGroupId);
}
else if (mit->second.mGroupStatus & ID_LOCAL_STATUS_INC_CALC_FLAG)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() IncCalc done for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
#endif // DEBUG_IDS
/* set Cache */
uint32_t dummyToken = 0;
setGroupServiceString(dummyToken, mit->second.mGroupId, mit->second.mServiceString);
}
else
{
/* why is it here? error. */
std::cerr << "p3IdService::background_processNewMessages() ERROR for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
}
}
}
return background_FullCalcRequest();
}
bool p3IdService::background_FullCalcRequest()
{
/*
* grab an GroupId from List.
* - If empty, we are finished.
* - request all latest mesgs
*/
std::list<RsGxsGroupId> groupIds;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_REQUEST_FULLCALC;
mBgToken = 0;
mBgGroupMap.clear();
if (mBgFullCalcGroups.empty())
{
/* finished! */
background_cleanup();
return true;
}
groupIds.push_back(mBgFullCalcGroups.front());
mBgFullCalcGroups.pop_front();
}
/* request the summary info from the parents */
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
uint32_t token = 0;
RsTokReqOptions opts;
opts.mOptions = RS_TOKREQOPT_MSG_LATEST;
RsGenExchange::getTokenService()->requestMsgInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
}
bool p3IdService::background_processFullCalc()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processFullCalc()";
std::cerr << std::endl;
#endif // DEBUG_IDS
std::list<RsMsgMetaData> msgList;
std::list<RsMsgMetaData>::iterator it;
bool validmsgs = false;
/* calc variables */
uint32_t opinion_count = 0;
uint32_t opinion_nullcount = 0;
double opinion_sum = 0;
double opinion_sumsq = 0;
uint32_t rep_count = 0;
uint32_t rep_nullcount = 0;
double rep_sum = 0;
double rep_sumsq = 0;
std::vector<RsGxsIdOpinion> opinions;
std::vector<RsGxsIdOpinion>::iterator vit;
if (!getMsgData(mBgToken, opinions))
{
std::cerr << "p3IdService::background_processFullCalc() ERROR Failed to get Opinions";
std::cerr << std::endl;
return false;
}
RsGxsGroupId groupId;
for(vit = opinions.begin(); vit != opinions.end(); vit++)
{
RsGxsIdOpinion &opinion = *vit;
/* These should all be same group - should check for sanity! */
if (groupId.isNull())
{
groupId = opinion.mMeta.mGroupId;
}
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processFullCalc() Msg:";
std::cerr << opinion;
std::cerr << std::endl;
#endif // DEBUG_IDS
validmsgs = true;
/* for each opinion.... extract score, and reputation */
if (opinion.mOpinion != 0)
{
opinion_count++;
opinion_sum += opinion.mOpinion;
opinion_sum += (opinion.mOpinion * opinion.mOpinion);
}
else
{
opinion_nullcount++;
}
/* for each opinion.... extract score, and reputation */
if (opinion.mReputation != 0)
{
rep_nullcount++;
rep_sum += opinion.mReputation;
rep_sum += (opinion.mReputation * opinion.mReputation);
}
else
{
rep_nullcount++;
}
}
double opinion_avg = 0;
double opinion_var = 0;
double opinion_frac = 0;
double rep_avg = 0;
double rep_var = 0;
double rep_frac = 0;
if (opinion_count)
{
opinion_avg = opinion_sum / opinion_count;
opinion_var = (opinion_sumsq - opinion_count * opinion_avg * opinion_avg) / opinion_count;
opinion_frac = opinion_count / ((float) (opinion_count + opinion_nullcount));
}
if (rep_count)
{
rep_avg = rep_sum / rep_count;
rep_var = (rep_sumsq - rep_count * rep_avg * rep_avg) / rep_count;
rep_frac = rep_count / ((float) (rep_count + rep_nullcount));
}
if (validmsgs)
{
SSGxsIdGroup ssdata;
std::string serviceString = ssdata.save();
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_updateVoteCounts() Encoded String: " << serviceString;
std::cerr << std::endl;
#endif // DEBUG_IDS
/* store new result */
uint32_t dummyToken = 0;
setGroupServiceString(dummyToken, groupId, serviceString);
}
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_IDLE;
mBgToken = 0;
}
return background_FullCalcRequest();
}
bool p3IdService::background_cleanup()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_cleanup()";
std::cerr << std::endl;
#endif // DEBUG_IDS
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Cleanup.
mBgProcessing = false;
mBgPhase = ID_BG_IDLE;
mBgToken = 0;
mBgGroupMap.clear();
mBgFullCalcGroups.clear();
return true;
}
std::ostream &operator<<(std::ostream &out, const RsGxsIdGroup &grp) std::ostream &operator<<(std::ostream &out, const RsGxsIdGroup &grp)
{ {
out << "RsGxsIdGroup: Meta: " << grp.mMeta; out << "RsGxsIdGroup: Meta: " << grp.mMeta;