merge of branch v0.6-idclean 7180

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7187 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
chrisparker126 2014-03-17 20:56:06 +00:00
parent 7815efb16f
commit 0f29d28b1b
397 changed files with 6503 additions and 5702 deletions

View file

@ -473,7 +473,7 @@ bool p3IdService::haveReputation(const RsGxsId &id)
return haveKey(id);
}
bool p3IdService::loadReputation(const RsGxsId &id, const std::list<std::string>& peers)
bool p3IdService::loadReputation(const RsGxsId &id, const std::list<PeerId>& peers)
{
if (haveKey(id))
return true;
@ -684,7 +684,7 @@ bool p3IdService::getGroupData(const uint32_t &token, std::vector<RsGxsIdGroup>
else
{
group.mPgpKnown = false;
group.mPgpId = "";
group.mPgpId.clear();
std::cerr << "p3IdService::getGroupData() Failed to decode ServiceString";
std::cerr << std::endl;
@ -720,7 +720,7 @@ bool p3IdService::createGroup(uint32_t& token, RsGxsIdGroup &group)
bool p3IdService::updateGroup(uint32_t& token, RsGxsIdGroup &group)
{
RsGxsId id = group.mMeta.mGroupId;
RsGxsId id = RsGxsId(group.mMeta.mGroupId.toStdString());
RsGxsIdGroupItem* item = new RsGxsIdGroupItem();
item->group = group;
item->meta = group.mMeta;
@ -730,8 +730,10 @@ bool p3IdService::updateGroup(uint32_t& token, RsGxsIdGroup &group)
RsGenExchange::updateGroup(token, item);
//RsGxsGroupUpdateMeta updateMeta(id);
//RsGenExchange::updateGroup(token, updateMeta, item);
RsGxsGroupUpdateMeta updateMeta(RsGxsGroupId(id.toStdString()));
RsGenExchange::updateGroup(token, updateMeta, item);
// if its in the cache - clear it.
{
@ -782,7 +784,8 @@ bool SSGxsIdPgp::load(const std::string &input)
if (1 == sscanf(input.c_str(), "K:1 I:%[^)]", pgpline))
{
idKnown = true;
pgpId = pgpline;
std::string str_line = pgpline;
pgpId = RsPgpId(str_line);
}
else if (1 == sscanf(input.c_str(), "K:0 T:%d", &timestamp))
{
@ -802,7 +805,7 @@ std::string SSGxsIdPgp::save() const
if (idKnown)
{
output += "K:1 I:";
output += pgpId;
output += pgpId.toStdString();
}
else
{
@ -819,7 +822,7 @@ std::string SSGxsIdPgp::save() const
bool SSGxsIdRecognTags::load(const std::string &input)
{
char pgpline[RSGXSID_MAX_SERVICE_STRING];
//char pgpline[RSGXSID_MAX_SERVICE_STRING];
int pubTs = 0;
int lastTs = 0;
uint32_t flags = 0;
@ -1122,7 +1125,7 @@ RsGxsIdCache::RsGxsIdCache(const RsGxsIdGroupItem *item, const RsTlvSecurityKey
// Fill in Details.
details.mNickname = item->meta.mGroupName;
details.mId = item->meta.mGroupId;
details.mId = item->meta.mGroupId.toStdString();
#ifdef DEBUG_IDS
std::cerr << "RsGxsIdCache::RsGxsIdCache() for: " << details.mId;
@ -1153,7 +1156,7 @@ void RsGxsIdCache::updateServiceString(std::string serviceString)
}
else
{
details.mPgpId = "";
details.mPgpId.clear();
}
}
@ -1211,7 +1214,7 @@ void RsGxsIdCache::updateServiceString(std::string serviceString)
else
{
details.mPgpKnown = false;
details.mPgpId = "";
details.mPgpId.clear();
details.mReputation.updateIdScore(false, false);
details.mReputation.update();
}
@ -1312,12 +1315,12 @@ bool p3IdService::cache_process_recogntaginfo(const RsGxsIdGroupItem *item, std:
recogn_extract_taginfo(item, tagItems);
time_t now = time(NULL);
//time_t now = time(NULL);
for(it = tagItems.begin(); it != tagItems.end(); it++)
{
RsRecognTag info((*it)->tag_class, (*it)->tag_type, false);
bool isPending = false;
if (recogn_checktag(item->meta.mGroupId, item->meta.mGroupName, *it, false, isPending))
if (recogn_checktag(RsGxsId(item->meta.mGroupId.toStdString()), item->meta.mGroupName, *it, false, isPending))
{
info.valid = true;
}
@ -1368,8 +1371,8 @@ bool p3IdService::cache_store(const RsGxsIdGroupItem *item)
bool pub_key_ok = false;
bool full_key_ok = false;
RsGxsId id = item->meta.mGroupId;
if (!getGroupKeys(id, keySet))
RsGxsId id (item->meta.mGroupId.toStdString());
if (!getGroupKeys(RsGxsGroupId(id.toStdString()), keySet))
{
std::cerr << "p3IdService::cache_store() ERROR getting GroupKeys for: ";
std::cerr << item->meta.mGroupId;
@ -1442,7 +1445,7 @@ bool p3IdService::cache_store(const RsGxsIdGroupItem *item)
#define MIN_CYCLE_GAP 2
bool p3IdService::cache_request_load(const RsGxsId &id, const std::list<std::string>& peers)
bool p3IdService::cache_request_load(const RsGxsId &id, const std::list<PeerId>& peers)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::cache_request_load(" << id << ")";
@ -1483,14 +1486,14 @@ bool p3IdService::cache_start_load()
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
std::map<RsGxsId, std::list<std::string> >::iterator it;
std::map<RsGxsId, std::list<RsPeerId> >::iterator it;
for(it = mCacheLoad_ToCache.begin(); it != mCacheLoad_ToCache.end(); it++)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::cache_start_load() GroupId: " << it->first;
std::cerr << std::endl;
#endif // DEBUG_IDS
groupIds.push_back(it->first); // might need conversion?
groupIds.push_back(RsGxsGroupId(it->first.toStdString())); // might need conversion?
}
mPendingCache.insert(mCacheLoad_ToCache.begin(), mCacheLoad_ToCache.end());
@ -1550,7 +1553,7 @@ bool p3IdService::cache_load_for_token(uint32_t token)
{
// remove identities that are present
RsStackMutex stack(mIdMtx);
mPendingCache.erase(item->meta.mGroupId);
mPendingCache.erase(RsGxsId(item->meta.mGroupId.toStdString()));
}
/* cache the data */
@ -1584,8 +1587,8 @@ void p3IdService::requestIdsFromNet()
{
RsStackMutex stack(mIdMtx);
std::map<RsGxsId, std::list<std::string> >::const_iterator cit;
std::map<std::string, std::list<RsGxsId> > requests;
std::map<RsGxsId, std::list<RsPeerId> >::const_iterator cit;
std::map<RsPeerId, std::list<RsGxsId> > requests;
// transform to appropriate structure (<peer, std::list<RsGxsId> > map) to make request to nes
for(cit = mIdsNotPresent.begin(); cit != mIdsNotPresent.end(); cit++)
@ -1599,18 +1602,26 @@ void p3IdService::requestIdsFromNet()
}
const std::list<std::string>& peers = cit->second;
std::list<std::string>::const_iterator cit2;
const std::list<RsPeerId>& peers = cit->second;
std::list<RsPeerId>::const_iterator cit2;
for(cit2 = peers.begin(); cit2 != peers.end(); cit2++)
requests[*cit2].push_back(cit->first);
}
std::map<std::string, std::list<RsGxsId> >::const_iterator cit2;
std::map<RsPeerId, std::list<RsGxsId> >::const_iterator cit2;
for(cit2 = requests.begin(); cit2 != requests.end(); cit2++)
{
if(mNes)
mNes->requestGrp(cit2->second, cit2->first);
{
std::list<RsGxsId>::const_iterator gxs_id_it = cit2->second.begin();
std::list<RsGxsGroupId> grpIds;
for(; gxs_id_it != cit2->second.end(); gxs_id_it++)
grpIds.push_back(RsGxsGroupId(gxs_id_it->toStdString()));
mNes->requestGrp(grpIds, cit2->first);
}
}
mIdsNotPresent.clear();
@ -1711,7 +1722,7 @@ bool p3IdService::cache_load_ownids(uint32_t token)
if (item->meta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN)
{
mOwnIds.push_back(item->meta.mGroupId);
mOwnIds.push_back(RsGxsId(item->meta.mGroupId.toStdString()));
}
delete item ;
}
@ -1780,7 +1791,12 @@ bool p3IdService::cachetest_handlerequest(uint32_t token)
#endif // DEBUG_IDS
std::list<RsGxsId> grpIds;
bool ok = RsGenExchange::getGroupList(token, grpIds);
std::list<RsGxsGroupId> grpIdsC;
bool ok = RsGenExchange::getGroupList(token, grpIdsC);
std::list<RsGxsGroupId>::const_iterator cit = grpIdsC.begin();
for(; cit != grpIdsC.end(); cit++)
grpIds.push_back(RsGxsId(cit->toStdString()));
if(ok)
{
@ -2051,7 +2067,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
/* */
PGPFingerprintType ownFinger;
PGPIdType ownId(AuthGPG::getAuthGPG()->getGPGOwnId());
RsPgpId ownId(AuthGPG::getAuthGPG()->getGPGOwnId());
std::cerr << "p3IdService::service_CreateGroup() OwnPgpID: " << ownId.toStdString();
std::cerr << std::endl;
@ -2059,7 +2075,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
#ifdef GXSID_GEN_DUMMY_DATA
if (item->group.mMeta.mAuthorId != "")
{
ownId = PGPIdType(item->group.mMeta.mAuthorId);
ownId = RsPgpId(item->group.mMeta.mAuthorId);
}
#endif
@ -2075,7 +2091,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
std::cerr << "p3IdService::service_CreateGroup() OwnFingerprint: " << ownFinger.toStdString();
std::cerr << std::endl;
calcPGPHash(item->group.mMeta.mGroupId, ownFinger, hash);
calcPGPHash(RsGxsId(item->group.mMeta.mGroupId.toStdString()), ownFinger, hash);
item->group.mPgpIdHash = hash.toStdString();
#ifdef DEBUG_IDS
@ -2332,7 +2348,7 @@ bool p3IdService::pgphash_process()
SSGxsIdGroup ssdata;
ssdata.load(pg.mMeta.mServiceString); // attempt load - okay if fails.
PGPIdType pgpId;
RsPgpId pgpId;
if (checkId(pg, pgpId))
{
@ -2347,7 +2363,7 @@ bool p3IdService::pgphash_process()
/* update */
ssdata.pgp.idKnown = true;
ssdata.pgp.pgpId = pgpId.toStdString();
ssdata.pgp.pgpId = pgpId;
}
else
@ -2370,7 +2386,7 @@ bool p3IdService::pgphash_process()
std::string serviceString = ssdata.save();
setGroupServiceString(dummyToken, pg.mMeta.mGroupId, serviceString);
cache_update_if_cached(pg.mMeta.mGroupId, serviceString);
cache_update_if_cached(RsGxsId(pg.mMeta.mGroupId.toStdString()), serviceString);
// Schedule Next Processing.
RsTickEvent::schedule_in(GXSID_EVENT_PGPHASH_PROC, PGPHASH_PROC_PERIOD);
@ -2379,7 +2395,7 @@ bool p3IdService::pgphash_process()
bool p3IdService::checkId(const RsGxsIdGroup &grp, PGPIdType &pgpId)
bool p3IdService::checkId(const RsGxsIdGroup &grp, RsPgpId &pgpId)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::checkId() Starting Match Check for RsGxsId: ";
@ -2411,11 +2427,11 @@ bool p3IdService::checkId(const RsGxsIdGroup &grp, PGPIdType &pgpId)
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
std::map<PGPIdType, PGPFingerprintType>::iterator mit;
std::map<RsPgpId, PGPFingerprintType>::iterator mit;
for(mit = mPgpFingerprintMap.begin(); mit != mPgpFingerprintMap.end(); mit++)
{
GxsIdPgpHash hash;
calcPGPHash(grp.mMeta.mGroupId, mit->second, hash);
calcPGPHash(RsGxsId(grp.mMeta.mGroupId.toStdString()), mit->second, hash);
if (ans == hash)
{
std::cerr << "p3IdService::checkId() HASH MATCH!";
@ -2429,7 +2445,7 @@ bool p3IdService::checkId(const RsGxsIdGroup &grp, PGPIdType &pgpId)
/* check signature too */
if (AuthGPG::getAuthGPG()->VerifySignBin((void *) hash.toByteArray(), hash.SIZE_IN_BYTES,
(unsigned char *) grp.mPgpIdSign.c_str(), grp.mPgpIdSign.length(),
mit->second.toStdString()))
mit->second))
{
std::cerr << "p3IdService::checkId() Signature Okay too!";
std::cerr << std::endl;
@ -2489,17 +2505,17 @@ void p3IdService::getPgpIdList()
std::cerr << std::endl;
#endif // DEBUG_IDS
std::list<std::string> list;
std::list<RsPgpId> list;
AuthGPG::getAuthGPG()->getGPGFilteredList(list);
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mPgpFingerprintMap.clear();
std::list<std::string>::iterator it;
std::list<RsPgpId>::iterator it;
for(it = list.begin(); it != list.end(); it++)
{
PGPIdType pgpId(*it);
RsPgpId pgpId(*it);
PGPFingerprintType fp;
AuthGPG::getAuthGPG()->getKeyFingerprint(pgpId, fp);
@ -2525,7 +2541,7 @@ void calcPGPHash(const RsGxsId &id, const PGPFingerprintType &pgp, GxsIdPgpHash
SHA_CTX *sha_ctx = new SHA_CTX;
SHA1_Init(sha_ctx);
SHA1_Update(sha_ctx, id.c_str(), id.length()); // TO FIX ONE DAY.
SHA1_Update(sha_ctx, id.toStdString().c_str(), id.toStdString().length()); // TO FIX ONE DAY.
SHA1_Update(sha_ctx, pgp.toByteArray(), pgp.SIZE_IN_BYTES);
SHA1_Final(signature, sha_ctx);
hash = GxsIdPgpHash(signature);
@ -2737,7 +2753,7 @@ bool p3IdService::recogn_process()
for(it = tagItems.begin(); it != tagItems.end(); it++)
{
bool isTagPending = false;
bool isTagOk = recogn_checktag(item->meta.mGroupId, item->meta.mGroupName, *it, true, isPending);
bool isTagOk = recogn_checktag(RsGxsId(item->meta.mGroupId.toStdString()), item->meta.mGroupName, *it, true, isPending);
if (isTagOk)
{
tagValidFlags |= i;
@ -2773,7 +2789,7 @@ bool p3IdService::recogn_process()
std::string serviceString = ssdata.save();
setGroupServiceString(dummyToken, item->meta.mGroupId, serviceString);
cache_update_if_cached(item->meta.mGroupId, serviceString);
cache_update_if_cached(RsGxsId(item->meta.mGroupId.toStdString()), serviceString);
delete item;
@ -2962,10 +2978,10 @@ void p3IdService::generateDummy_OwnIds()
/* grab all the gpg ids... and make some ids */
std::string ownId = rsPeers->getGPGOwnId();
RsPgpId ownId = rsPeers->getGPGOwnId();
// generate some ownIds.
int genCount = 0;
//int genCount = 0;
int i;
int nIds = 2 + (RSRandom::random_u32() % 2);
@ -2976,15 +2992,15 @@ void p3IdService::generateDummy_OwnIds()
id.mMeta.mGroupFlags = RSGXSID_GROUPFLAG_REALID;
// HACK FOR DUMMY GENERATION.
id.mMeta.mAuthorId = ownId;
if (rsPeers->getPeerDetails(ownId, details))
{
std::ostringstream out;
out << details.name << "_" << i + 1;
id.mMeta.mGroupName = out.str();
}
// // HACK FOR DUMMY GENERATION.
// id.mMeta.mAuthorId = ownId.toStdString();
// if (rsPeers->getPeerDetails(ownId, details))
// {
// std::ostringstream out;
// out << details.name << "_" << i + 1;
//
// id.mMeta.mGroupName = out.str();
// }
uint32_t dummyToken = 0;
createGroup(dummyToken, id);
@ -2997,8 +3013,8 @@ void p3IdService::generateDummy_FriendPGP()
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Now Generate for friends.
std::list<std::string> gpgids;
std::list<std::string>::iterator it;
std::list<RsPgpId> gpgids;
std::list<RsPgpId>::const_iterator it;
rsPeers->getGPGAllList(gpgids);
RsGxsIdGroup id;
@ -3010,10 +3026,10 @@ void p3IdService::generateDummy_FriendPGP()
for(int j = 0; j < idx; j++, it++) ;
// HACK FOR DUMMY GENERATION.
id.mMeta.mAuthorId = *it;
id.mMeta.mAuthorId = it->toStdString();
RsPeerDetails details;
if (rsPeers->getPeerDetails(*it, details))
if (/*rsPeers->getPeerDetails(*it, details)*/false)
{
std::ostringstream out;
out << details.name << "_" << RSRandom::random_u32() % 1000;
@ -3210,6 +3226,616 @@ Processing Algorithm:
*/
bool p3IdService::reputation_start()
{
if (!CacheArbitration(BG_REPUTATION))
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::reputation_start() Other Events running... Rescheduling";
std::cerr << std::endl;
#endif // DEBUG_IDS
/* reschedule in a bit */
RsTickEvent::schedule_in(GXSID_EVENT_REPUTATION, REPUTATION_RETRY_PERIOD);
return false;
}
CacheArbitrationDone(BG_REPUTATION);
// SCHEDULE NEXT ONE.
RsTickEvent::schedule_in(GXSID_EVENT_REPUTATION, REPUTATION_PERIOD);
return true;
}
#define ID_BACKGROUND_PERIOD 60
int p3IdService::background_tick()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_tick()";
std::cerr << std::endl;
#endif // DEBUG_IDS
// Run Background Stuff.
background_checkTokenRequest();
/* every minute - run a background check */
time_t now = time(NULL);
bool doCheck = false;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (now - mLastBgCheck > ID_BACKGROUND_PERIOD)
{
doCheck = true;
mLastBgCheck = now;
}
}
if (doCheck)
{
//addExtraDummyData();
background_requestGroups();
}
// Add in new votes + comments.
return 0;
}
/***** Background Processing ****
*
* Process Each Message - as it arrives.
*
* Update
*
*/
#define ID_BG_IDLE 0
#define ID_BG_REQUEST_GROUPS 1
#define ID_BG_REQUEST_UNPROCESSED 2
#define ID_BG_REQUEST_FULLCALC 3
bool p3IdService::background_checkTokenRequest()
{
uint32_t token = 0;
uint32_t phase = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (!mBgProcessing)
{
return false;
}
token = mBgToken;
phase = mBgPhase;
}
uint32_t status;
//uint32_t reqtype;
//uint32_t anstype;
time_t ts;
status = RsGenExchange::getTokenService()->requestStatus(token);
//checkRequestStatus(token, status, reqtype, anstype, ts);
if (status == RsTokenService::GXS_REQUEST_V2_STATUS_COMPLETE)
{
switch(phase)
{
case ID_BG_REQUEST_GROUPS:
background_requestNewMessages();
break;
case ID_BG_REQUEST_UNPROCESSED:
background_processNewMessages();
break;
case ID_BG_REQUEST_FULLCALC:
background_processFullCalc();
break;
default:
break;
}
}
return true;
}
bool p3IdService::background_requestGroups()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_requestGroups()";
std::cerr << std::endl;
#endif // DEBUG_IDS
// grab all the subscribed groups.
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
if (mBgProcessing)
{
std::cerr << "p3IdService::background_requestGroups() ERROR Already processing, Skip this cycle";
std::cerr << std::endl;
return false;
}
mBgProcessing = true;
mBgPhase = ID_BG_REQUEST_GROUPS;
mBgToken = 0;
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_SUMMARY;
RsTokReqOptions opts;
std::list<RsGxsGroupId> groupIds;
/**
TODO
opts.mStatusFilter = RSGXS_GROUP_STATUS_NEWMSG;
opts.mStatusMask = RSGXS_GROUP_STATUS_NEWMSG;
**/
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
}
bool p3IdService::background_requestNewMessages()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_requestNewMessages()";
std::cerr << std::endl;
#endif // DEBUG_IDS
std::list<RsGroupMetaData> modGroupList;
std::list<RsGroupMetaData>::iterator it;
std::list<RsGxsGroupId> groupIds;
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
token = mBgToken;
}
if (!getGroupSummary(token, modGroupList))
{
std::cerr << "p3IdService::background_requestNewMessages() ERROR No Group List";
std::cerr << std::endl;
background_cleanup();
return false;
}
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_REQUEST_UNPROCESSED;
mBgToken = 0;
/* now we process the modGroupList -> a map so we can use it easily later, and create id list too */
for(it = modGroupList.begin(); it != modGroupList.end(); it++)
{
/*** TODO
uint32_t dummyToken = 0;
setGroupStatusFlags(dummyToken, it->mGroupId, 0, RSGXS_GROUP_STATUS_NEWMSG);
***/
mBgGroupMap[it->mGroupId] = *it;
groupIds.push_back(it->mGroupId);
}
}
uint32_t ansType = RS_TOKREQ_ANSTYPE_SUMMARY;
RsTokReqOptions opts;
token = 0;
/* TODO
opts.mStatusFilter = RSGXS_MSG_STATUS_UNPROCESSED;
opts.mStatusMask = RSGXS_MSG_STATUS_UNPROCESSED;
*/
RsGenExchange::getTokenService()->requestMsgInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
}
bool p3IdService::background_processNewMessages()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages()";
std::cerr << std::endl;
#endif // DEBUG_IDS
GxsMsgMetaMap newMsgMap;
GxsMsgMetaMap::iterator it;
uint32_t token = 0;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
token = mBgToken;
}
if (!getMsgSummary(token, newMsgMap))
{
std::cerr << "p3IdService::background_processNewMessages() ERROR No New Msgs";
std::cerr << std::endl;
background_cleanup();
return false;
}
/* iterate through the msgs.. update the mBgGroupMap with new data,
* and flag these items as modified - so we rewrite them to the db later.
*
* If a message is not an original -> store groupId for requiring full analysis later.
*/
std::map<RsGxsGroupId, RsGroupMetaData>::iterator mit;
for (it = newMsgMap.begin(); it != newMsgMap.end(); it++)
{
std::vector<RsMsgMetaData>::iterator vit;
for(vit = it->second.begin(); vit != it->second.end(); vit++)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() new MsgId: " << vit->mMsgId;
std::cerr << std::endl;
#endif // DEBUG_IDS
/* flag each new vote as processed */
/**
TODO
uint32_t dummyToken = 0;
setMsgStatusFlags(dummyToken, it->mMsgId, 0, RSGXS_MSG_STATUS_UNPROCESSED);
**/
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mit = mBgGroupMap.find(it->first);
if (mit == mBgGroupMap.end())
{
std::cerr << "p3IdService::background_processNewMessages() ERROR missing GroupId: ";
std::cerr << vit->mGroupId;
std::cerr << std::endl;
/* error */
continue;
}
if (mit->second.mGroupStatus & ID_LOCAL_STATUS_FULL_CALC_FLAG)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() Group Already marked FULL_CALC";
std::cerr << std::endl;
#endif // DEBUG_IDS
/* already marked */
continue;
}
if (vit->mMsgId != vit->mOrigMsgId)
{
/*
* not original -> hard, redo calc (alt: could substract previous score)
*/
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() Update, mark for FULL_CALC";
std::cerr << std::endl;
#endif // DEBUG_IDS
mit->second.mGroupStatus |= ID_LOCAL_STATUS_FULL_CALC_FLAG;
}
else
{
/*
* Try incremental calculation.
* - extract parameters from group.
* - increment, & save back.
* - flag group as modified.
*/
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() NewOpt, Try Inc Calc";
std::cerr << std::endl;
#endif // DEBUG_IDS
mit->second.mGroupStatus |= ID_LOCAL_STATUS_INC_CALC_FLAG;
SSGxsIdGroup ssdata;
if (!ssdata.load(mit->second.mServiceString))
{
/* error */
std::cerr << "p3IdService::background_processNewMessages() ERROR Extracting";
std::cerr << std::endl;
}
#ifdef DEBUG_IDS
/* do calcs */
std::cerr << "p3IdService::background_processNewMessages() Extracted: ";
std::cerr << std::endl;
/* store it back in */
std::cerr << "p3IdService::background_processNewMessages() Stored: ";
std::cerr << std::endl;
#endif // DEBUG_IDS
std::string serviceString = ssdata.save();
if (0)
{
/* error */
std::cerr << "p3IdService::background_processNewMessages() ERROR Storing";
std::cerr << std::endl;
}
}
}
}
/* now iterate through groups again
* -> update status as we go
* -> record one requiring a full analyssis
*/
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() Checking Groups for Calc Type";
std::cerr << std::endl;
#endif // DEBUG_IDS
for(mit = mBgGroupMap.begin(); mit != mBgGroupMap.end(); mit++)
{
if (mit->second.mGroupStatus & ID_LOCAL_STATUS_FULL_CALC_FLAG)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() FullCalc for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
#endif // DEBUG_IDS
mBgFullCalcGroups.push_back(mit->second.mGroupId);
}
else if (mit->second.mGroupStatus & ID_LOCAL_STATUS_INC_CALC_FLAG)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processNewMessages() IncCalc done for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
#endif // DEBUG_IDS
/* set Cache */
uint32_t dummyToken = 0;
setGroupServiceString(dummyToken, mit->second.mGroupId, mit->second.mServiceString);
}
else
{
/* why is it here? error. */
std::cerr << "p3IdService::background_processNewMessages() ERROR for: ";
std::cerr << mit->second.mGroupId;
std::cerr << std::endl;
}
}
}
return background_FullCalcRequest();
}
bool p3IdService::background_FullCalcRequest()
{
/*
* grab an GroupId from List.
* - If empty, we are finished.
* - request all latest mesgs
*/
std::list<RsGxsGroupId> groupIds;
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_REQUEST_FULLCALC;
mBgToken = 0;
mBgGroupMap.clear();
if (mBgFullCalcGroups.empty())
{
/* finished! */
background_cleanup();
return true;
}
groupIds.push_back(mBgFullCalcGroups.front());
mBgFullCalcGroups.pop_front();
}
/* request the summary info from the parents */
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
uint32_t token = 0;
RsTokReqOptions opts;
opts.mOptions = RS_TOKREQOPT_MSG_LATEST;
RsGenExchange::getTokenService()->requestMsgInfo(token, ansType, opts, groupIds);
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgToken = token;
}
return true;
}
bool p3IdService::background_processFullCalc()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processFullCalc()";
std::cerr << std::endl;
#endif // DEBUG_IDS
std::list<RsMsgMetaData> msgList;
std::list<RsMsgMetaData>::iterator it;
bool validmsgs = false;
/* calc variables */
uint32_t opinion_count = 0;
uint32_t opinion_nullcount = 0;
double opinion_sum = 0;
double opinion_sumsq = 0;
uint32_t rep_count = 0;
uint32_t rep_nullcount = 0;
double rep_sum = 0;
double rep_sumsq = 0;
std::vector<RsGxsIdOpinion> opinions;
std::vector<RsGxsIdOpinion>::iterator vit;
if (!getMsgData(mBgToken, opinions))
{
std::cerr << "p3IdService::background_processFullCalc() ERROR Failed to get Opinions";
std::cerr << std::endl;
return false;
}
RsGxsGroupId groupId;
for(vit = opinions.begin(); vit != opinions.end(); vit++)
{
RsGxsIdOpinion &opinion = *vit;
/* These should all be same group - should check for sanity! */
if (groupId.isNull())
{
groupId = opinion.mMeta.mGroupId;
}
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_processFullCalc() Msg:";
std::cerr << opinion;
std::cerr << std::endl;
#endif // DEBUG_IDS
validmsgs = true;
/* for each opinion.... extract score, and reputation */
if (opinion.mOpinion != 0)
{
opinion_count++;
opinion_sum += opinion.mOpinion;
opinion_sum += (opinion.mOpinion * opinion.mOpinion);
}
else
{
opinion_nullcount++;
}
/* for each opinion.... extract score, and reputation */
if (opinion.mReputation != 0)
{
rep_nullcount++;
rep_sum += opinion.mReputation;
rep_sum += (opinion.mReputation * opinion.mReputation);
}
else
{
rep_nullcount++;
}
}
double opinion_avg = 0;
double opinion_var = 0;
double opinion_frac = 0;
double rep_avg = 0;
double rep_var = 0;
double rep_frac = 0;
if (opinion_count)
{
opinion_avg = opinion_sum / opinion_count;
opinion_var = (opinion_sumsq - opinion_count * opinion_avg * opinion_avg) / opinion_count;
opinion_frac = opinion_count / ((float) (opinion_count + opinion_nullcount));
}
if (rep_count)
{
rep_avg = rep_sum / rep_count;
rep_var = (rep_sumsq - rep_count * rep_avg * rep_avg) / rep_count;
rep_frac = rep_count / ((float) (rep_count + rep_nullcount));
}
if (validmsgs)
{
SSGxsIdGroup ssdata;
std::string serviceString = ssdata.save();
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_updateVoteCounts() Encoded String: " << serviceString;
std::cerr << std::endl;
#endif // DEBUG_IDS
/* store new result */
uint32_t dummyToken = 0;
setGroupServiceString(dummyToken, groupId, serviceString);
}
{
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
mBgPhase = ID_BG_IDLE;
mBgToken = 0;
}
return background_FullCalcRequest();
}
bool p3IdService::background_cleanup()
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::background_cleanup()";
std::cerr << std::endl;
#endif // DEBUG_IDS
RsStackMutex stack(mIdMtx); /********** STACK LOCKED MTX ******/
// Cleanup.
mBgProcessing = false;
mBgPhase = ID_BG_IDLE;
mBgToken = 0;
mBgGroupMap.clear();
mBgFullCalcGroups.clear();
return true;
}
std::ostream &operator<<(std::ostream &out, const RsGxsIdGroup &grp)
{
out << "RsGxsIdGroup: Meta: " << grp.mMeta;