added pruning of opinions to limit data in memory. Fixed up types and sending/receiving friends opinions

This commit is contained in:
csoler 2015-10-07 23:44:24 -04:00
parent 78e6f67c69
commit 80ed6d1815
2 changed files with 152 additions and 60 deletions

View File

@ -42,8 +42,11 @@
/* DEFINE INTERFACE POINTER! */ /* DEFINE INTERFACE POINTER! */
//RsGxsReputation *rsGxsReputation = NULL; //RsGxsReputation *rsGxsReputation = NULL;
const int kMaximumPeerAge = 180; // half a year. static const int kMaximumPeerAge = 180; // half a year.
const int kMaximumSetSize = 100; static const int kMaximumSetSize = 100;
static const int ACTIVE_FRIENDS_UPDATE_PERIOD = 600 ; // 10 minutes
static const int ACTIVE_FRIENDS_ONLINE_DELAY = 86400*7 ; // 1 week.
/************ IMPLEMENTATION NOTES ********************************* /************ IMPLEMENTATION NOTES *********************************
* *
@ -61,8 +64,12 @@ const int kMaximumSetSize = 100;
* last update -----------> * last update ----------->
* <----------- modified opinions. * <----------- modified opinions.
* *
* This service will have to store a huge amount of data. * If not clever enough, this service will have to store a huge amount of data.
* need to workout how to reduce it. * To make things tractable we do this:
* - do not store reputations when no data is present, or when all friends are neutral
* - only send a neutral opinion when they are a true change over someone's opinion
* - only send a neutral opinion when it is a true change over someone's opinion
* - auto-clean reputations for default values
* *
* std::map<RsGxsId, Reputation> mReputations. * std::map<RsGxsId, Reputation> mReputations.
* std::multimap<time_t, RsGxsId> mUpdated. * std::multimap<time_t, RsGxsId> mUpdated.
@ -122,6 +129,7 @@ const int kMaximumSetSize = 100;
* [ ] Opinions are transmitted to friends when updated * [ ] Opinions are transmitted to friends when updated
* *
* To do: * To do:
* [ ] Add debug info
* [ ] Test the whole thing * [ ] Test the whole thing
* [ ] Implement a system to allow not storing info when we don't have it * [ ] Implement a system to allow not storing info when we don't have it
*/ */
@ -139,6 +147,7 @@ p3GxsReputation::p3GxsReputation(p3LinkMgr *lm)
mRequestTime = 0; mRequestTime = 0;
mStoreTime = 0; mStoreTime = 0;
mReputationsUpdated = false; mReputationsUpdated = false;
mLastActiveFriendsUpdate = 0 ;
} }
const std::string GXS_REPUTATION_APP_NAME = "gxsreputation"; const std::string GXS_REPUTATION_APP_NAME = "gxsreputation";
@ -163,6 +172,14 @@ int p3GxsReputation::tick()
{ {
processIncoming(); processIncoming();
sendPackets(); sendPackets();
time_t now = time(NULL);
if(mLastActiveFriendsUpdate + ACTIVE_FRIENDS_UPDATE_PERIOD < now)
{
updateActiveFriends() ;
mLastActiveFriendsUpdate = now ;
}
return 0; return 0;
} }
@ -172,7 +189,34 @@ int p3GxsReputation::status()
return 1; return 1;
} }
void p3GxsReputation::updateActiveFriends()
{
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
// keep track of who is recently connected. That will give a value to average friend
// for this, we count all friends that have been online in the last week.
time_t now = time(NULL) ;
std::list<RsPeerId> idList ;
mLinkMgr->getFriendList(idList) ;
mAverageActiveFriends = 0 ;
for(std::list<RsPeerId>::const_iterator it(idList.begin());it!=idList.end();++it)
{
peerConnectState state ;
if(mLinkMgr->getFriendNetStatus(*it, state) && now < state.lastavailable + ACTIVE_FRIENDS_ONLINE_DELAY)
++mAverageActiveFriends ;
}
}
static RsReputations::Opinion safe_convert_uint32t_to_opinion(uint32_t op)
{
return RsReputations::Opinion(std::min((uint32_t)op,UPPER_LIMIT)) ;
}
/***** Implementation ******/ /***** Implementation ******/
bool p3GxsReputation::processIncoming() bool p3GxsReputation::processIncoming()
@ -200,31 +244,22 @@ bool p3GxsReputation::processIncoming()
case RS_PKT_SUBTYPE_GXS_REPUTATION_REQUEST_ITEM: case RS_PKT_SUBTYPE_GXS_REPUTATION_REQUEST_ITEM:
{ {
RsGxsReputationRequestItem *requestItem = RsGxsReputationRequestItem *requestItem = dynamic_cast<RsGxsReputationRequestItem *>(item);
dynamic_cast<RsGxsReputationRequestItem *>(item);
if (requestItem) if (requestItem)
{
SendReputations(requestItem); SendReputations(requestItem);
}
else else
{
itemOk = false; itemOk = false;
}
} }
break; break;
case RS_PKT_SUBTYPE_GXS_REPUTATION_UPDATE_ITEM: case RS_PKT_SUBTYPE_GXS_REPUTATION_UPDATE_ITEM:
{ {
RsGxsReputationUpdateItem *updateItem = RsGxsReputationUpdateItem *updateItem = dynamic_cast<RsGxsReputationUpdateItem *>(item);
dynamic_cast<RsGxsReputationUpdateItem *>(item);
if (updateItem) if (updateItem)
{
RecvReputations(updateItem); RecvReputations(updateItem);
}
else else
{
itemOk = false; itemOk = false;
}
} }
break; break;
} }
@ -324,42 +359,93 @@ bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
return true; return true;
} }
void p3GxsReputation::locked_updateOpinion(const RsPeerId& from,const RsGxsId& about,RsReputations::Opinion op)
{
/* find matching Reputation */
std::map<RsGxsId, Reputation>::iterator rit = mReputations.find(about);
RsReputations::Opinion new_opinion = safe_convert_uint32t_to_opinion(op);
RsReputations::Opinion old_opinion = RsReputations::OPINION_NEUTRAL ; // default if not set
bool updated = false ;
// now 4 cases;
// Opinion already stored
// New opinion is same: nothing to do
// New opinion is different: if neutral, remove entry
// Nothing stored
// New opinion is neutral: nothing to do
// New opinion is != 1: create entry and store
if (rit == mReputations.end())
{
if(new_opinion != RsReputations::OPINION_NEUTRAL)
{
mReputations[about] = Reputation(about);
rit = mReputations.find(about);
}
else
return ; // nothing to do
}
Reputation& reputation = rit->second;
std::map<RsPeerId,RsReputations::Opinion>::iterator it2 = reputation.mOpinions.find(from) ;
if(it2 == reputation.mOpinions.end())
{
if(new_opinion != RsReputations::OPINION_NEUTRAL)
{
reputation.mOpinions[from] = new_opinion; // filters potentially tweaked reputation score sent by friend
updated = true ;
}
}
else
{
old_opinion = it2->second ;
if(new_opinion == RsReputations::OPINION_NEUTRAL)
{
reputation.mOpinions.erase(it2) ; // don't store when the opinion is neutral
updated = true ;
}
else if(new_opinion != old_opinion)
{
it2->second = new_opinion ;
updated = true ;
}
}
if(reputation.mOpinions.empty() && reputation.mOwnOpinion == RsReputations::OPINION_NEUTRAL)
mReputations.erase(rit) ;
else if(updated)
{
reputation.CalculateReputation(mAverageActiveFriends) ;
IndicateConfigChanged() ;
}
}
bool p3GxsReputation::RecvReputations(RsGxsReputationUpdateItem *item) bool p3GxsReputation::RecvReputations(RsGxsReputationUpdateItem *item)
{ {
std::cerr << "p3GxsReputation::RecvReputations()"; std::cerr << "p3GxsReputation::RecvReputations()";
std::cerr << std::endl; std::cerr << std::endl;
RsPeerId peerid = item->PeerId(); RsPeerId peerid = item->PeerId();
std::map<RsGxsId, uint32_t>::iterator it; for( std::map<RsGxsId, uint32_t>::iterator it = item->mOpinions.begin(); it != item->mOpinions.end(); ++it)
for(it = item->mOpinions.begin(); it != item->mOpinions.end(); ++it) {
{ RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
/* find matching Reputation */ locked_updateOpinion(peerid,it->first,safe_convert_uint32t_to_opinion(it->second));
std::map<RsGxsId, Reputation>::iterator rit; }
RsGxsId gxsId(it->first);
rit = mReputations.find(gxsId);
if (rit == mReputations.end())
{
mReputations[gxsId] = Reputation(gxsId);
rit = mReputations.find(gxsId);
}
Reputation& reputation = rit->second; updateLatestUpdate(peerid, item->mLatestUpdate);
reputation.mOpinions[peerid] = std::min(it->second,UPPER_LIMIT); // filters potentially tweaked reputation score sent by friend
int previous = reputation.mReputation; // now update all reputations of IDs for which some opinions have changed.
if (previous != reputation.CalculateReputation())
{ return true;
// updated from the network.
mUpdatedReputations.insert(gxsId);
}
}
updateLatestUpdate(peerid, item->mLatestUpdate);
return true;
} }
@ -462,7 +548,7 @@ bool p3GxsReputation::setOwnOpinion(const RsGxsId& gxsid, const RsReputations::O
time_t now = time(NULL); time_t now = time(NULL);
reputation.mOwnOpinion = opinion; reputation.mOwnOpinion = opinion;
reputation.mOwnOpinionTs = now; reputation.mOwnOpinionTs = now;
reputation.CalculateReputation(); reputation.CalculateReputation(mAverageActiveFriends);
mUpdated.insert(std::make_pair(now, gxsid)); mUpdated.insert(std::make_pair(now, gxsid));
mUpdatedReputations.insert(gxsid); mUpdatedReputations.insert(gxsid);
@ -517,11 +603,11 @@ bool p3GxsReputation::saveList(bool& cleanup, std::list<RsItem*> &savelist)
item->mOwnOpinionTS = rit->second.mOwnOpinionTs; item->mOwnOpinionTS = rit->second.mOwnOpinionTs;
item->mReputation = rit->second.mReputation; item->mReputation = rit->second.mReputation;
std::map<RsPeerId, uint32_t>::iterator oit; std::map<RsPeerId, RsReputations::Opinion>::iterator oit;
for(oit = rit->second.mOpinions.begin(); oit != rit->second.mOpinions.end(); ++oit) for(oit = rit->second.mOpinions.begin(); oit != rit->second.mOpinions.end(); ++oit)
{ {
// should be already limited. // should be already limited.
item->mOpinions[oit->first] = oit->second; item->mOpinions[oit->first] = (uint32_t)oit->second;
} }
savelist.push_back(item); savelist.push_back(item);
@ -590,7 +676,7 @@ bool p3GxsReputation::loadReputationSet(RsGxsReputationSetItem *item, const std:
// expensive ... but necessary. // expensive ... but necessary.
RsPeerId peerId(oit->first); RsPeerId peerId(oit->first);
if (peerSet.end() != peerSet.find(peerId)) if (peerSet.end() != peerSet.find(peerId))
reputation.mOpinions[peerId] = oit->second; reputation.mOpinions[peerId] = safe_convert_uint32t_to_opinion(oit->second);
} }
reputation.mOwnOpinion = item->mOwnOpinion; reputation.mOwnOpinion = item->mOwnOpinion;
@ -598,7 +684,7 @@ bool p3GxsReputation::loadReputationSet(RsGxsReputationSetItem *item, const std:
// if dropping entries has changed the score -> must update. // if dropping entries has changed the score -> must update.
int previous = item->mReputation; int previous = item->mReputation;
if (previous != reputation.CalculateReputation()) if (previous != reputation.CalculateReputation(mAverageActiveFriends))
{ {
mUpdatedReputations.insert(gxsId); mUpdatedReputations.insert(gxsId);
} }
@ -686,8 +772,6 @@ void p3GxsReputation::sendReputationRequests()
} }
} }
int p3GxsReputation::sendReputationRequest(RsPeerId peerid) int p3GxsReputation::sendReputationRequest(RsPeerId peerid)
{ {
std::cerr << "p3GxsReputation::sendReputationRequest(" << peerid << ")"; std::cerr << "p3GxsReputation::sendReputationRequest(" << peerid << ")";
@ -720,21 +804,23 @@ int p3GxsReputation::sendReputationRequest(RsPeerId peerid)
return 1; return 1;
} }
float Reputation::CalculateReputation() float Reputation::CalculateReputation(uint32_t average_active_friends)
{ {
// the calculation of reputation makes the whole thing // the calculation of reputation makes the whole thing
if(mOwnOpinion == RsReputations::OPINION_NEUTRAL) if(mOwnOpinion == RsReputations::OPINION_NEUTRAL)
{ {
uint32_t friend_total = 0; int friend_total = 0;
for(std::map<RsPeerId,uint32_t>::const_iterator it(mOpinions.begin());it!=mOpinions.end();++it) // accounts for all friends. Neutral opinions count for 1-1=0
friend_total += it->second ;
if(friend_total == 0) // includes the case of no friends! for(std::map<RsPeerId,RsReputations::Opinion>::const_iterator it(mOpinions.begin());it!=mOpinions.end();++it)
return 0.0f ; friend_total += it->second - 1;
if(mOpinions.empty()) // includes the case of no friends!
return 1.0f;
else else
return friend_total / float(mOpinions.size()) ; return 1.0f + friend_total / float(std::max(average_active_friends,(uint32_t)mOpinions.size())) ;
} }
else else
return float(mOwnOpinion) ; return float(mOwnOpinion) ;

View File

@ -64,9 +64,9 @@ public:
Reputation(const RsGxsId& about) Reputation(const RsGxsId& about)
:mOwnOpinion(RsReputations::OPINION_NEUTRAL), mOwnOpinionTs(0), mReputation(RsReputations::OPINION_NEUTRAL) { } :mOwnOpinion(RsReputations::OPINION_NEUTRAL), mOwnOpinionTs(0), mReputation(RsReputations::OPINION_NEUTRAL) { }
float CalculateReputation(); float CalculateReputation(uint32_t average_active_friends);
std::map<RsPeerId, uint32_t> mOpinions; std::map<RsPeerId, RsReputations::Opinion> mOpinions;
int32_t mOwnOpinion; int32_t mOwnOpinion;
time_t mOwnOpinionTs; time_t mOwnOpinionTs;
@ -112,6 +112,10 @@ class p3GxsReputation: public p3Service, public p3Config, public RsReputations /
bool SendReputations(RsGxsReputationRequestItem *request); bool SendReputations(RsGxsReputationRequestItem *request);
bool RecvReputations(RsGxsReputationUpdateItem *item); bool RecvReputations(RsGxsReputationUpdateItem *item);
bool updateLatestUpdate(RsPeerId peerid, time_t ts); bool updateLatestUpdate(RsPeerId peerid, time_t ts);
void updateActiveFriends() ;
// internal update of data. Takes care of cleaning empty boxes.
void locked_updateOpinion(const RsPeerId &from, const RsGxsId &about, RsReputations::Opinion op);
bool loadReputationSet(RsGxsReputationSetItem *item, bool loadReputationSet(RsGxsReputationSetItem *item,
const std::set<RsPeerId> &peerSet); const std::set<RsPeerId> &peerSet);
@ -123,9 +127,11 @@ class p3GxsReputation: public p3Service, public p3Config, public RsReputations /
private: private:
RsMutex mReputationMtx; RsMutex mReputationMtx;
time_t mLastActiveFriendsUpdate;
time_t mRequestTime; time_t mRequestTime;
time_t mStoreTime; time_t mStoreTime;
bool mReputationsUpdated; bool mReputationsUpdated;
uint32_t mAverageActiveFriends ;
p3LinkMgr *mLinkMgr; p3LinkMgr *mLinkMgr;
@ -135,7 +141,7 @@ class p3GxsReputation: public p3Service, public p3Config, public RsReputations /
std::multimap<time_t, RsGxsId> mUpdated; std::multimap<time_t, RsGxsId> mUpdated;
// set of Reputations to send to p3IdService. // set of Reputations to send to p3IdService.
std::set<RsGxsId> mUpdatedReputations; std::set<RsGxsId> mUpdatedReputations;
}; };
#endif //SERVICE_RSGXSREPUTATION_HEADER #endif //SERVICE_RSGXSREPUTATION_HEADER