merged with latest trunk

This commit is contained in:
csoler 2016-01-01 15:15:19 -05:00
commit 6ecd2991e7
2342 changed files with 29868 additions and 19573 deletions

View file

@ -1,36 +0,0 @@
RS_TOP_DIR = ..
##### Define any flags that are needed for this section #######
###############################################################
###############################################################
include $(RS_TOP_DIR)/scripts/config.mk
###############################################################
RSOBJ = p3service.o p3chatservice.o p3msgservice.o \
p3gamelauncher.o p3ranking.o p3disc.o \
p3photoservice.o \
p3distrib.o \
p3status.o \
p3Qblog.o \
p3forums.o \
p3channels.o \
p3portservice.o
# dummy forums interface.
# p3forums-dummy.o \
TESTOBJ = forum_test.o
TESTS = forum_test
all: librs tests
forum_test : forum_test.o
$(CC) $(CFLAGS) -o forum_test forum_test.o $(OBJ) $(LIBS)
###############################################################
include $(RS_TOP_DIR)/scripts/rules.mk
###############################################################

View file

@ -439,8 +439,6 @@ void p3BanList::getBannedIps(std::list<BanListPeer> &lst)
lst.clear() ;
for(std::map<sockaddr_storage,BanListPeer>::const_iterator it(mBanSet.begin());it!=mBanSet.end();++it)
{
bool already_banned = false ;
if(!acceptedBanSet_locked(it->second))
continue ;
@ -498,7 +496,7 @@ bool p3BanList::addIpRange(const sockaddr_storage &addr, int masked_bytes,uint32
{
RS_STACK_MUTEX(mBanMtx) ;
if(getBitRange(addr) > masked_bytes)
if(getBitRange(addr) > uint32_t(masked_bytes))
{
std::cerr << "(EE) Input to p3BanList::addIpRange is inconsistent: ip=" << sockaddr_storage_iptostring(addr) << "/" << 32-8*masked_bytes << std::endl;
return false ;
@ -887,6 +885,7 @@ bool p3BanList::loadList(std::list<RsItem*>& load)
delete *it ;
}
load.clear() ;
return true ;
}
@ -955,7 +954,7 @@ bool p3BanList::addBanEntry(const RsPeerId &peerId, const struct sockaddr_storag
{
/* see if it needs an update */
if ((mit->second.reason != reason) ||
(mit->second.level != level) ||
(mit->second.level != uint32_t(level)) ||
(mit->second.mTs < time_stamp))
{
/* update */
@ -1044,7 +1043,7 @@ int p3BanList::condenseBanSources_locked()
continue;
}
int lvl = lit->second.level;
uint32_t lvl = lit->second.level;
if (it->first != ownId)
{
/* as from someone else, increment level */

View file

@ -122,7 +122,7 @@ uint32_t p3GxsChannels::channelsAuthenPolicy()
/** Overloaded to cache new groups **/
RsGenExchange::ServiceCreate_Return p3GxsChannels::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& keySet)
RsGenExchange::ServiceCreate_Return p3GxsChannels::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& /* keySet */)
{
updateSubscribedGroup(grpItem->meta);
return SERVICE_CREATE_SUCCESS;
@ -337,7 +337,6 @@ bool p3GxsChannels::getPostData(const uint32_t &token, std::vector<RsGxsChannelP
for(; mit != msgData.end(); ++mit)
{
RsGxsGroupId grpId = mit->first;
std::vector<RsGxsMsgItem*>& msgItems = mit->second;
std::vector<RsGxsMsgItem*>::iterator vit = msgItems.begin();

View file

@ -945,7 +945,7 @@ bool p3GxsCircles::cache_load_for_token(uint32_t token)
RsIdentityDetails details;
if (mIdentities->getIdDetails(*pit, details))
{
if (details.mPgpLinked && details.mPgpKnown)
if ((details.mFlags & RS_IDENTITY_FLAGS_PGP_LINKED) &&(details.mFlags & RS_IDENTITY_FLAGS_PGP_KNOWN))
{
#ifdef DEBUG_CIRCLES
std::cerr << "p3GxsCircles::cache_load_for_token() Is Known -> AllowedPeer: " << *pit;
@ -1128,7 +1128,7 @@ bool p3GxsCircles::cache_reloadids(const RsGxsCircleId &circleId)
RsIdentityDetails details;
if (mIdentities->getIdDetails(*pit, details))
{
if (details.mPgpLinked && details.mPgpKnown)
if ((details.mFlags & RS_IDENTITY_FLAGS_PGP_LINKED) &&(details.mFlags & RS_IDENTITY_FLAGS_PGP_KNOWN))
{
cache.addAllowedPeer(details.mPgpId, *pit);

View file

@ -199,8 +199,7 @@ bool p3GxsForums::getGroupData(const uint32_t &token, std::vector<RsGxsForumGrou
if (item)
{
RsGxsForumGroup grp = item->mGroup;
item->mGroup.mMeta = item->meta;
grp.mMeta = item->mGroup.mMeta;
grp.mMeta = item->meta;
delete item;
groups.push_back(grp);
}
@ -230,7 +229,6 @@ bool p3GxsForums::getMsgData(const uint32_t &token, std::vector<RsGxsForumMsg> &
for(; mit != msgData.end(); ++mit)
{
RsGxsGroupId grpId = mit->first;
std::vector<RsGxsMsgItem*>& msgItems = mit->second;
std::vector<RsGxsMsgItem*>::iterator vit = msgItems.begin();

View file

@ -23,6 +23,7 @@
*
*/
#include <math.h>
#include "pqi/p3linkmgr.h"
#include "retroshare/rspeers.h"
@ -38,13 +39,6 @@
* #define DEBUG_REPUTATION 1
****/
/* DEFINE INTERFACE POINTER! */
//RsGxsReputation *rsGxsReputation = NULL;
const int kMaximumPeerAge = 180; // half a year.
const int kMaximumSetSize = 100;
/************ IMPLEMENTATION NOTES *********************************
*
* p3GxsReputation shares opinions / reputations with peers.
@ -61,8 +55,12 @@ const int kMaximumSetSize = 100;
* last update ----------->
* <----------- modified opinions.
*
* This service will have to store a huge amount of data.
* need to workout how to reduce it.
* If not clever enough, this service will have to store a huge amount of data.
* To make things tractable we do this:
* - do not store reputations when no data is present, or when all friends are neutral
* - only send a neutral opinion when they are a true change over someone's opinion
* - only send a neutral opinion when it is a true change over someone's opinion
* - auto-clean reputations for default values
*
* std::map<RsGxsId, Reputation> mReputations.
* std::multimap<time_t, RsGxsId> mUpdated.
@ -70,70 +68,87 @@ const int kMaximumSetSize = 100;
* std::map<RsPeerId, ReputationConfig> mConfig;
*
* Updates from p3GxsReputation -> p3IdService.
*
*
* Updates from p3IdService -> p3GxsReputation.
*
* Each peer locally stores reputations for all GXS ids. If not stored, a default value
* is used, corresponding to a neutral opinion. Peers also share their reputation level
* with their neighbor nodes.
*
* The calculation method is the following:
*
* Local values:
* Good: 2
* Neutral: 1
* Bad: 0
*
* Overall reputation score:
*
* if(own_opinion == 0) // means we dont' care
* r = average_of_friends_opinions
* else
* r = own_opinion
*
* Decisions based on reputation score:
*
* 0 x1 1 x2 2
* | <-----------------------------------------------------------------------> |
* ---------+
* Lobbies | Msgs dropped
* Forums | Msgs dropped
* Messages | Msgs dropped
* ---------+----------------------------------------------------------------------------
*
* We select x1=0.5
*
* => to kill an identity, either you, or at least 50% of your friends need to flag it
* as bad.
* Rules:
* * a single peer cannot drastically change the behavior of a given GXS id
* * it should be easy for many peers to globally kill a GXS id
*
* Typical examples:
*
* Friends | Friend average | Own | alpha | Score
* -----------+---------------------+----------+------------+--------------
* 10 | 0.5 | 1 | 0.25 | 0.375
* 10 | 1.0 | 1 | 0.25 | 1.0
* 10 | 1.0 | 0 | 0.25 | 1.0
*
* To check:
* [ ] Opinions are saved/loaded accross restart
* [ ] Opinions are transmitted to friends
* [ ] Opinions are transmitted to friends when updated
*
* To do:
* [ ] Add debug info
* [ ] Test the whole thing
* [X] Implement a system to allow not storing info when we don't have it
*/
const int32_t REPUTATION_OFFSET = 100000;
const int32_t LOWER_LIMIT = -100;
const int32_t UPPER_LIMIT = 100;
int32_t ConvertFromSerialised(uint32_t value, bool limit)
{
int32_t converted = ((int32_t) value) - REPUTATION_OFFSET ;
if (limit)
{
if (converted < LOWER_LIMIT)
{
converted = LOWER_LIMIT;
}
if (converted > UPPER_LIMIT)
{
converted = UPPER_LIMIT;
}
}
return converted;
}
uint32_t ConvertToSerialised(int32_t value, bool limit)
{
if (limit)
{
if (value < LOWER_LIMIT)
{
value = LOWER_LIMIT;
}
if (value > UPPER_LIMIT)
{
value = UPPER_LIMIT;
}
}
value += REPUTATION_OFFSET;
if (value < 0)
{
value = 0;
}
return (uint32_t) value;
}
static const uint32_t LOWER_LIMIT = 0; // used to filter valid Opinion values from serialized data
static const uint32_t UPPER_LIMIT = 2; // used to filter valid Opinion values from serialized data
static const int kMaximumPeerAge = 180; // half a year.
static const int kMaximumSetSize = 100; // max set of updates to send at once.
static const int ACTIVE_FRIENDS_UPDATE_PERIOD = 600 ; // 10 minutes
static const int ACTIVE_FRIENDS_ONLINE_DELAY = 86400*7 ; // 1 week.
static const int kReputationRequestPeriod = 600; // 10 mins
static const int kReputationStoreWait = 180; // 3 minutes.
static const float REPUTATION_ASSESSMENT_THRESHOLD_X1 = 0.5f ; // reputation under which the peer gets killed
p3GxsReputation::p3GxsReputation(p3LinkMgr *lm)
:p3Service(), p3Config(),
mReputationMtx("p3GxsReputation"), mLinkMgr(lm)
{
addSerialType(new RsGxsReputationSerialiser());
addSerialType(new RsGxsReputationSerialiser());
mRequestTime = 0;
mStoreTime = 0;
mReputationsUpdated = false;
mRequestTime = 0;
mStoreTime = 0;
mReputationsUpdated = false;
mLastActiveFriendsUpdate = 0 ;
mAverageActiveFriends = 0 ;
}
const std::string GXS_REPUTATION_APP_NAME = "gxsreputation";
const uint16_t GXS_REPUTATION_APP_MAJOR_VERSION = 1;
const uint16_t GXS_REPUTATION_APP_MINOR_VERSION = 0;
@ -150,13 +165,41 @@ RsServiceInfo p3GxsReputation::getServiceInfo()
GXS_REPUTATION_MIN_MINOR_VERSION);
}
int p3GxsReputation::tick()
{
processIncoming();
sendPackets();
time_t now = time(NULL);
if(mLastActiveFriendsUpdate + ACTIVE_FRIENDS_UPDATE_PERIOD < now)
{
updateActiveFriends() ;
cleanup() ;
mLastActiveFriendsUpdate = now ;
}
static time_t last_identity_flags_update = 0 ;
// no more than once per 5 second chunk.
if(now > 100+last_identity_flags_update)
{
last_identity_flags_update = now ;
updateIdentityFlags() ;
}
#ifdef DEBUG_REPUTATION
static time_t last_debug_print = time(NULL) ;
if(now > 10+last_debug_print)
{
last_debug_print = now ;
debug_print() ;
}
#endif
return 0;
}
@ -165,7 +208,105 @@ int p3GxsReputation::status()
return 1;
}
void p3GxsReputation::updateIdentityFlags()
{
std::list<RsGxsId> to_update ;
// we need to gather the list to be used in a non locked frame
{
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
#ifdef DEBUG_REPUTATION
std::cerr << "Updating reputation identity flags" << std::endl;
#endif
for( std::map<RsGxsId, Reputation>::iterator rit = mReputations.begin();rit!=mReputations.end();++rit)
if(rit->second.mIdentityFlags & REPUTATION_IDENTITY_FLAG_NEEDS_UPDATE)
to_update.push_back(rit->first) ;
}
for(std::list<RsGxsId>::const_iterator rit(to_update.begin());rit!=to_update.end();++rit)
{
RsIdentityDetails details;
if(!rsIdentity->getIdDetails(*rit,details))
{
#ifdef DEBUG_REPUTATION
std::cerr << " cannot obtain info for " << *rit << ". Will do it later." << std::endl;
#endif
continue ;
}
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
std::map<RsGxsId,Reputation>::iterator it = mReputations.find(*rit) ;
if(it == mReputations.end())
{
std::cerr << " Weird situation: item " << *rit << " has been deleted from the list??" << std::endl;
continue ;
}
it->second.mIdentityFlags = 0 ;
if(details.mFlags & RS_IDENTITY_FLAGS_PGP_LINKED) it->second.mIdentityFlags |= REPUTATION_IDENTITY_FLAG_PGP_LINKED ;
if(details.mFlags & RS_IDENTITY_FLAGS_PGP_KNOWN ) it->second.mIdentityFlags |= REPUTATION_IDENTITY_FLAG_PGP_KNOWN ;
#ifdef DEBUG_REPUTATION
std::cerr << " updated flags for " << *rit << " to " << std::hex << it->second.mIdentityFlags << std::dec << std::endl;
#endif
it->second.updateReputation() ;
IndicateConfigChanged();
}
}
void p3GxsReputation::cleanup()
{
// remove opinions from friends that havn't been seen online for more than the specified delay
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::cleanup() " << std::endl;
#endif
std::cerr << __PRETTY_FUNCTION__ << ": not implemented. TODO!" << std::endl;
}
void p3GxsReputation::updateActiveFriends()
{
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
// keep track of who is recently connected. That will give a value to average friend
// for this, we count all friends that have been online in the last week.
time_t now = time(NULL) ;
std::list<RsPeerId> idList ;
mLinkMgr->getFriendList(idList) ;
mAverageActiveFriends = 0 ;
#ifdef DEBUG_REPUTATION
std::cerr << " counting recently online peers." << std::endl;
#endif
for(std::list<RsPeerId>::const_iterator it(idList.begin());it!=idList.end();++it)
{
RsPeerDetails details ;
#ifdef DEBUG_REPUTATION
std::cerr << " " << *it << ": last seen " << now - details.lastConnect << " secs ago" << std::endl;
#endif
if(rsPeers->getPeerDetails(*it, details) && now < details.lastConnect + ACTIVE_FRIENDS_ONLINE_DELAY)
++mAverageActiveFriends ;
}
#ifdef DEBUG_REPUTATION
std::cerr << " new count: " << mAverageActiveFriends << std::endl;
#endif
}
static RsReputations::Opinion safe_convert_uint32t_to_opinion(uint32_t op)
{
return RsReputations::Opinion(std::min((uint32_t)op,UPPER_LIMIT)) ;
}
/***** Implementation ******/
bool p3GxsReputation::processIncoming()
@ -184,40 +325,31 @@ bool p3GxsReputation::processIncoming()
switch(item->PacketSubType())
{
default:
case RS_PKT_SUBTYPE_GXSREPUTATION_CONFIG_ITEM:
case RS_PKT_SUBTYPE_GXSREPUTATION_SET_ITEM:
case RS_PKT_SUBTYPE_GXS_REPUTATION_CONFIG_ITEM:
case RS_PKT_SUBTYPE_GXS_REPUTATION_SET_ITEM:
std::cerr << "p3GxsReputation::processingIncoming() Unknown Item";
std::cerr << std::endl;
itemOk = false;
break;
case RS_PKT_SUBTYPE_GXSREPUTATION_REQUEST_ITEM:
case RS_PKT_SUBTYPE_GXS_REPUTATION_REQUEST_ITEM:
{
RsGxsReputationRequestItem *requestItem =
dynamic_cast<RsGxsReputationRequestItem *>(item);
RsGxsReputationRequestItem *requestItem = dynamic_cast<RsGxsReputationRequestItem *>(item);
if (requestItem)
{
SendReputations(requestItem);
}
else
{
itemOk = false;
}
}
break;
case RS_PKT_SUBTYPE_GXSREPUTATION_UPDATE_ITEM:
case RS_PKT_SUBTYPE_GXS_REPUTATION_UPDATE_ITEM:
{
RsGxsReputationUpdateItem *updateItem =
dynamic_cast<RsGxsReputationUpdateItem *>(item);
RsGxsReputationUpdateItem *updateItem = dynamic_cast<RsGxsReputationUpdateItem *>(item);
if (updateItem)
{
RecvReputations(updateItem);
}
else
{
itemOk = false;
}
}
break;
}
@ -237,8 +369,9 @@ bool p3GxsReputation::processIncoming()
bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
{
std::cerr << "p3GxsReputation::SendReputations()";
std::cerr << std::endl;
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::SendReputations()" << std::endl;
#endif
RsPeerId peerId = request->PeerId();
time_t last_update = request->mLastUpdate;
@ -252,12 +385,13 @@ bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
int count = 0;
int totalcount = 0;
RsGxsReputationUpdateItem *pkt = new RsGxsReputationUpdateItem();
pkt->PeerId(peerId);
for(;tit != mUpdated.end(); ++tit)
{
/* find */
std::map<RsGxsId, Reputation>::iterator rit;
rit = mReputations.find(tit->second);
std::map<RsGxsId, Reputation>::iterator rit = mReputations.find(tit->second);
if (rit == mReputations.end())
{
std::cerr << "p3GxsReputation::SendReputations() ERROR Missing Reputation";
@ -275,8 +409,9 @@ bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
}
RsGxsId gxsId = rit->first;
pkt->mOpinions[gxsId] = ConvertToSerialised(rit->second.mOwnOpinion, true);
pkt->mOpinions[gxsId] = rit->second.mOwnOpinion;
pkt->mLatestUpdate = rit->second.mOwnOpinionTs;
if (pkt->mLatestUpdate == (uint32_t) now)
{
// if we could possibly get another Update at this point (same second).
@ -289,10 +424,13 @@ bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
if (count > kMaximumSetSize)
{
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::SendReputations() Sending Full Packet";
std::cerr << std::endl;
#endif
sendItem(pkt);
pkt = new RsGxsReputationUpdateItem();
pkt->PeerId(peerId);
count = 0;
@ -301,8 +439,10 @@ bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
if (!pkt->mOpinions.empty())
{
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::SendReputations() Sending Final Packet";
std::cerr << std::endl;
#endif
sendItem(pkt);
}
@ -311,52 +451,125 @@ bool p3GxsReputation::SendReputations(RsGxsReputationRequestItem *request)
delete pkt;
}
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::SendReputations() Total Count: " << totalcount;
std::cerr << std::endl;
#endif
return true;
}
void p3GxsReputation::locked_updateOpinion(const RsPeerId& from,const RsGxsId& about,RsReputations::Opinion op)
{
/* find matching Reputation */
std::map<RsGxsId, Reputation>::iterator rit = mReputations.find(about);
RsReputations::Opinion new_opinion = safe_convert_uint32t_to_opinion(op);
RsReputations::Opinion old_opinion = RsReputations::OPINION_NEUTRAL ; // default if not set
bool updated = false ;
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::update opinion of " << about << " from " << from << " to " << op << std::endl;
#endif
// now 4 cases;
// Opinion already stored
// New opinion is same: nothing to do
// New opinion is different: if neutral, remove entry
// Nothing stored
// New opinion is neutral: nothing to do
// New opinion is != 1: create entry and store
if (rit == mReputations.end())
{
#ifdef DEBUG_REPUTATION
std::cerr << " no preview record"<< std::endl;
#endif
if(new_opinion != RsReputations::OPINION_NEUTRAL)
{
mReputations[about] = Reputation(about);
rit = mReputations.find(about);
}
else
{
#ifdef DEBUG_REPUTATION
std::cerr << " no changes!"<< std::endl;
#endif
return ; // nothing to do
}
}
Reputation& reputation = rit->second;
std::map<RsPeerId,RsReputations::Opinion>::iterator it2 = reputation.mOpinions.find(from) ;
if(it2 == reputation.mOpinions.end())
{
if(new_opinion != RsReputations::OPINION_NEUTRAL)
{
reputation.mOpinions[from] = new_opinion; // filters potentially tweaked reputation score sent by friend
updated = true ;
}
}
else
{
old_opinion = it2->second ;
if(new_opinion == RsReputations::OPINION_NEUTRAL)
{
reputation.mOpinions.erase(it2) ; // don't store when the opinion is neutral
updated = true ;
}
else if(new_opinion != old_opinion)
{
it2->second = new_opinion ;
updated = true ;
}
}
if(reputation.mOpinions.empty() && reputation.mOwnOpinion == RsReputations::OPINION_NEUTRAL)
{
mReputations.erase(rit) ;
#ifdef DEBUG_REPUTATION
std::cerr << " own is neutral and no opinions from friends => remove entry" << std::endl;
#endif
updated = true ;
}
else if(updated)
{
#ifdef DEBUG_REPUTATION
std::cerr << " reputation changed. re-calculating." << std::endl;
#endif
reputation.updateReputation() ;
}
if(updated)
IndicateConfigChanged() ;
}
bool p3GxsReputation::RecvReputations(RsGxsReputationUpdateItem *item)
{
std::cerr << "p3GxsReputation::RecvReputations()";
std::cerr << std::endl;
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::RecvReputations() from " << item->PeerId() << std::endl;
#endif
RsPeerId peerid = item->PeerId();
std::map<RsGxsId, uint32_t>::iterator it;
for(it = item->mOpinions.begin(); it != item->mOpinions.end(); ++it)
for( std::map<RsGxsId, uint32_t>::iterator it = item->mOpinions.begin(); it != item->mOpinions.end(); ++it)
{
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
/* find matching Reputation */
std::map<RsGxsId, Reputation>::iterator rit;
RsGxsId gxsId(it->first);
rit = mReputations.find(gxsId);
if (rit == mReputations.end())
{
mReputations[gxsId] = Reputation(gxsId);
rit = mReputations.find(gxsId);
}
Reputation &reputation = rit->second;
reputation.mOpinions[peerid] = ConvertFromSerialised(it->second, true);
int previous = reputation.mReputation;
if (previous != reputation.CalculateReputation())
{
// updated from the network.
mUpdatedReputations.insert(gxsId);
}
locked_updateOpinion(peerid,it->first,safe_convert_uint32t_to_opinion(it->second));
}
updateLatestUpdate(peerid, item->mLatestUpdate);
updateLatestUpdate(peerid,item->mLatestUpdate);
return true;
}
bool p3GxsReputation::updateLatestUpdate(RsPeerId peerid, time_t ts)
bool p3GxsReputation::updateLatestUpdate(RsPeerId peerid,time_t latest_update)
{
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
@ -367,11 +580,12 @@ bool p3GxsReputation::updateLatestUpdate(RsPeerId peerid, time_t ts)
mConfig[peerid] = ReputationConfig(peerid);
it = mConfig.find(peerid) ;
}
it->second.mLatestUpdate = ts;
it->second.mLatestUpdate = latest_update ;
mReputationsUpdated = true;
// Switched to periodic save due to scale of data.
//IndicateConfigChanged();
IndicateConfigChanged();
return true;
}
@ -380,14 +594,64 @@ bool p3GxsReputation::updateLatestUpdate(RsPeerId peerid, time_t ts)
* Opinion
****/
bool p3GxsReputation::updateOpinion(const RsGxsId& gxsid, int opinion)
bool p3GxsReputation::getReputationInfo(const RsGxsId& gxsid, RsReputations::ReputationInfo& info)
{
if(gxsid.isNull())
return false ;
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
#ifdef DEBUG_REPUTATION
std::cerr << "getReputationInfo() for " << gxsid << std::endl;
#endif
Reputation& rep(mReputations[gxsid]) ;
info.mOwnOpinion = RsReputations::Opinion(rep.mOwnOpinion) ;
info.mOverallReputationScore = rep.mReputation ;
info.mFriendAverage = rep.mFriendAverage ;
if(info.mOverallReputationScore > REPUTATION_ASSESSMENT_THRESHOLD_X1)
info.mAssessment = RsReputations::ASSESSMENT_OK ;
else
info.mAssessment = RsReputations::ASSESSMENT_BAD ;
#ifdef DEBUG_REPUTATION
std::cerr << " information present. OwnOp = " << info.mOwnOpinion << ", overall score=" << info.mAssessment << std::endl;
#endif
return true ;
}
bool p3GxsReputation::isIdentityBanned(const RsGxsId &id)
{
RsReputations::ReputationInfo info ;
getReputationInfo(id,info) ;
#ifdef DEBUG_REPUTATION
std::cerr << "isIdentityBanned(): returning " << (info.mAssessment == RsReputations::ASSESSMENT_BAD) << " for GXS id " << id << std::endl;
#endif
return info.mAssessment == RsReputations::ASSESSMENT_BAD ;
}
bool p3GxsReputation::setOwnOpinion(const RsGxsId& gxsid, const RsReputations::Opinion& opinion)
{
#ifdef DEBUG_REPUTATION
std::cerr << "setOwnOpinion(): for GXS id " << gxsid << " to " << opinion << std::endl;
#endif
if(gxsid.isNull())
{
std::cerr << " ID " << gxsid << " is rejected. Look for a bug in calling method." << std::endl;
return false ;
}
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
std::map<RsGxsId, Reputation>::iterator rit;
/* find matching Reputation */
rit = mReputations.find(gxsid);
if (rit == mReputations.end())
{
mReputations[gxsid] = Reputation(gxsid);
@ -420,14 +684,15 @@ bool p3GxsReputation::updateOpinion(const RsGxsId& gxsid, int opinion)
time_t now = time(NULL);
reputation.mOwnOpinion = opinion;
reputation.mOwnOpinionTs = now;
reputation.CalculateReputation();
reputation.updateReputation();
mUpdated.insert(std::make_pair(now, gxsid));
mUpdatedReputations.insert(gxsid);
mReputationsUpdated = true;
// Switched to periodic save due to scale of data.
//IndicateConfigChanged();
IndicateConfigChanged();
return true;
}
@ -448,6 +713,9 @@ bool p3GxsReputation::saveList(bool& cleanup, std::list<RsItem*> &savelist)
cleanup = true;
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::saveList()" << std::endl;
#endif
/* save */
std::map<RsPeerId, ReputationConfig>::iterator it;
for(it = mConfig.begin(); it != mConfig.end(); ++it)
@ -459,7 +727,7 @@ bool p3GxsReputation::saveList(bool& cleanup, std::list<RsItem*> &savelist)
}
RsGxsReputationConfigItem *item = new RsGxsReputationConfigItem();
item->mPeerId = it->first.toStdString();
item->mPeerId = it->first;
item->mLatestUpdate = it->second.mLatestUpdate;
item->mLastQuery = it->second.mLastQuery;
savelist.push_back(item);
@ -470,16 +738,16 @@ bool p3GxsReputation::saveList(bool& cleanup, std::list<RsItem*> &savelist)
for(rit = mReputations.begin(); rit != mReputations.end(); ++rit, count++)
{
RsGxsReputationSetItem *item = new RsGxsReputationSetItem();
item->mGxsId = rit->first.toStdString();
item->mOwnOpinion = ConvertToSerialised(rit->second.mOwnOpinion, false);
item->mOwnOpinionTs = rit->second.mOwnOpinionTs;
item->mReputation = ConvertToSerialised(rit->second.mReputation, false);
item->mGxsId = rit->first;
item->mOwnOpinion = rit->second.mOwnOpinion;
item->mOwnOpinionTS = rit->second.mOwnOpinionTs;
item->mIdentityFlags = rit->second.mIdentityFlags;
std::map<RsPeerId, int32_t>::iterator oit;
std::map<RsPeerId, RsReputations::Opinion>::iterator oit;
for(oit = rit->second.mOpinions.begin(); oit != rit->second.mOpinions.end(); ++oit)
{
// should be already limited.
item->mOpinions[oit->first.toStdString()] = ConvertToSerialised(oit->second, false);
item->mOpinions[oit->first] = (uint32_t)oit->second;
}
savelist.push_back(item);
@ -495,12 +763,16 @@ void p3GxsReputation::saveDone()
bool p3GxsReputation::loadList(std::list<RsItem *>& loadList)
{
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::saveList()" << std::endl;
#endif
std::list<RsItem *>::iterator it;
std::set<RsPeerId> peerSet;
for(it = loadList.begin(); it != loadList.end(); ++it)
{
RsGxsReputationConfigItem *item = dynamic_cast<RsGxsReputationConfigItem *>(*it);
// Configurations are loaded first. (to establish peerSet).
if (item)
{
@ -510,17 +782,18 @@ bool p3GxsReputation::loadList(std::list<RsItem *>& loadList)
config.mPeerId = peerId;
config.mLatestUpdate = item->mLatestUpdate;
config.mLastQuery = 0;
peerSet.insert(peerId);
}
RsGxsReputationSetItem *set = dynamic_cast<RsGxsReputationSetItem *>(*it);
if (set)
{
loadReputationSet(set, peerSet);
}
delete (*it);
}
loadList.clear() ;
return true;
}
@ -530,6 +803,9 @@ bool p3GxsReputation::loadReputationSet(RsGxsReputationSetItem *item, const std:
std::map<RsGxsId, Reputation>::iterator rit;
if(item->mGxsId.isNull()) // just a protection against potential errors having put 00000 into ids.
return false ;
/* find matching Reputation */
RsGxsId gxsId(item->mGxsId);
rit = mReputations.find(gxsId);
@ -542,26 +818,24 @@ bool p3GxsReputation::loadReputationSet(RsGxsReputationSetItem *item, const std:
Reputation &reputation = mReputations[gxsId];
// install opinions.
std::map<std::string, uint32_t>::const_iterator oit;
std::map<RsPeerId, uint32_t>::const_iterator oit;
for(oit = item->mOpinions.begin(); oit != item->mOpinions.end(); ++oit)
{
// expensive ... but necessary.
RsPeerId peerId(oit->first);
if (peerSet.end() != peerSet.find(peerId))
{
reputation.mOpinions[peerId] = ConvertFromSerialised(oit->second, true);
}
reputation.mOpinions[peerId] = safe_convert_uint32t_to_opinion(oit->second);
}
reputation.mOwnOpinion = ConvertFromSerialised(item->mOwnOpinion, false);
reputation.mOwnOpinionTs = item->mOwnOpinionTs;
reputation.mOwnOpinion = item->mOwnOpinion;
reputation.mOwnOpinionTs = item->mOwnOpinionTS;
// if dropping entries has changed the score -> must update.
int previous = ConvertFromSerialised(item->mReputation, false);
if (previous != reputation.CalculateReputation())
{
mUpdatedReputations.insert(gxsId);
}
//float old_reputation = reputation.mReputation ;
//mUpdatedReputations.insert(gxsId) ;
reputation.updateReputation() ;
mUpdated.insert(std::make_pair(reputation.mOwnOpinionTs, gxsId));
return true;
@ -572,9 +846,6 @@ bool p3GxsReputation::loadReputationSet(RsGxsReputationSetItem *item, const std:
* Send Requests.
****/
const int kReputationRequestPeriod = 3600;
const int kReputationStoreWait = 180; // 3 minutes.
int p3GxsReputation::sendPackets()
{
time_t now = time(NULL);
@ -585,7 +856,7 @@ int p3GxsReputation::sendPackets()
storeTime = mStoreTime;
}
if (now - requestTime > kReputationRequestPeriod)
if (now > requestTime + kReputationRequestPeriod)
{
sendReputationRequests();
@ -629,48 +900,41 @@ void p3GxsReputation::sendReputationRequests()
mLinkMgr->getOnlineList(idList);
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::sendReputationRequests()";
std::cerr << std::endl;
#endif
/* prepare packets */
std::list<RsPeerId>::iterator it;
for(it = idList.begin(); it != idList.end(); ++it)
{
#ifdef DEBUG_REPUTATION
std::cerr << "p3GxsReputation::sendReputationRequest() To: " << *it;
std::cerr << std::endl;
#endif
sendReputationRequest(*it);
}
}
int p3GxsReputation::sendReputationRequest(RsPeerId peerid)
{
std::cerr << "p3GxsReputation::sendReputationRequest(" << peerid << ")";
std::cerr << std::endl;
#ifdef DEBUG_REPUTATION
std::cerr << " p3GxsReputation::sendReputationRequest(" << peerid << ") " ;
#endif
time_t now = time(NULL) ;
/* */
RsGxsReputationRequestItem *requestItem =
new RsGxsReputationRequestItem();
RsGxsReputationRequestItem *requestItem = new RsGxsReputationRequestItem();
requestItem->PeerId(peerid);
{
RsStackMutex stack(mReputationMtx); /****** LOCKED MUTEX *******/
/* find the last timestamp we have */
std::map<RsPeerId, ReputationConfig>::iterator it;
it = mConfig.find(peerid);
std::map<RsPeerId, ReputationConfig>::iterator it = mConfig.find(peerid);
if (it != mConfig.end())
{
#ifdef DEBUG_REPUTATION
std::cerr << " lastUpdate = " << now - it->second.mLatestUpdate << " secs ago. Requesting only more recent." << std::endl;
#endif
requestItem->mLastUpdate = it->second.mLatestUpdate;
}
else
{
#ifdef DEBUG_REPUTATION
std::cerr << " lastUpdate = never. Requesting all!" << std::endl;
#endif
// get whole list.
requestItem->mLastUpdate = 0;
}
@ -680,4 +944,90 @@ int p3GxsReputation::sendReputationRequest(RsPeerId peerid)
return 1;
}
void Reputation::updateReputation()
{
// the calculation of reputation makes the whole thing
int friend_total = 0;
// accounts for all friends. Neutral opinions count for 1-1=0
// because the average is performed over only accessible peers (not the total number) we need to shift to 1
for(std::map<RsPeerId,RsReputations::Opinion>::const_iterator it(mOpinions.begin());it!=mOpinions.end();++it)
friend_total += it->second - 1;
if(mOpinions.empty()) // includes the case of no friends!
mFriendAverage = 1.0f ;
else
{
static const float REPUTATION_FRIEND_FACTOR_ANON = 2.0f ;
static const float REPUTATION_FRIEND_FACTOR_PGP_LINKED = 5.0f ;
static const float REPUTATION_FRIEND_FACTOR_PGP_KNOWN = 10.0f ;
// For positive votes, start from 1 and slowly tend to 2
// for negative votes, start from 1 and slowly tend to 0
// depending on signature state, the ID is harder (signed ids) or easier (anon ids) to ban or to promote.
//
// when REPUTATION_FRIEND_VARIANCE = 3, that gives the following values:
//
// total votes | mFriendAverage anon | mFriendAverage PgpLinked | mFriendAverage PgpKnown |
// | F=2.0 | F=5.0 | F=10.0 |
// -------------+----------------------+---------------------------+--------------------------+
// -10 | 0.00 Banned | 0.13 Banned | 0.36 Banned |
// -5 | 0.08 Banned | 0.36 Banned | 0.60 |
// -4 | 0.13 Banned | 0.44 Banned | 0.67 |
// -3 | 0.22 Banned | 0.54 | 0.74 |
// -2 | 0.36 Banned | 0.67 | 0.81 |
// -1 | 0.60 | 0.81 | 0.90 |
// 0 | 1.0 | 1.0 | 1.00 |
// 1 | 1.39 | 1.18 | 1.09 |
// 2 | 1.63 | 1.32 | 1.18 |
// 3 | 1.77 | 1.45 | 1.25 |
// 4 | 1.86 | 1.55 | 1.32 |
// 5 | 1.91 | 1.63 | 1.39 |
//
// Banning info is provided by the reputation system, and does not depend on PGP-sign state.
//
// However, each service might have its own rules for the different cases. For instance
// PGP-favoring forums might want a score > 1.4 for anon ids, and >= 1.0 for PGP-signed.
float reputation_bias ;
if(mIdentityFlags & REPUTATION_IDENTITY_FLAG_PGP_KNOWN)
reputation_bias = REPUTATION_FRIEND_FACTOR_PGP_KNOWN ;
else if(mIdentityFlags & REPUTATION_IDENTITY_FLAG_PGP_LINKED)
reputation_bias = REPUTATION_FRIEND_FACTOR_PGP_LINKED ;
else
reputation_bias = REPUTATION_FRIEND_FACTOR_ANON ;
if(friend_total > 0)
mFriendAverage = 2.0f-exp(-friend_total / reputation_bias) ;
else
mFriendAverage = exp( friend_total / reputation_bias) ;
}
// now compute a bias for PGP-signed ids.
if(mOwnOpinion == RsReputations::OPINION_NEUTRAL)
mReputation = mFriendAverage ;
else
mReputation = (float)mOwnOpinion ;
}
void p3GxsReputation::debug_print()
{
std::cerr << "Reputations database: " << std::endl;
std::cerr << " Average number of peers: " << mAverageActiveFriends << std::endl;
time_t now = time(NULL) ;
for(std::map<RsGxsId,Reputation>::const_iterator it(mReputations.begin());it!=mReputations.end();++it)
{
std::cerr << " ID=" << it->first << ", own: " << it->second.mOwnOpinion << ", Friend average: " << it->second.mFriendAverage << ", global_score: " << it->second.mReputation
<< ", last own update: " << now - it->second.mOwnOpinionTs << " secs ago." << std::endl;
for(std::map<RsPeerId,RsReputations::Opinion>::const_iterator it2(it->second.mOpinions.begin());it2!=it->second.mOpinions.end();++it2)
std::cerr << " " << it2->first << ": " << it2->second << std::endl;
}
}

View file

@ -32,9 +32,14 @@
#include <map>
#include <set>
#define REPUTATION_IDENTITY_FLAG_NEEDS_UPDATE 0x0100
#define REPUTATION_IDENTITY_FLAG_PGP_LINKED 0x0001
#define REPUTATION_IDENTITY_FLAG_PGP_KNOWN 0x0002
#include "serialiser/rsgxsreputationitems.h"
#include "retroshare/rsidentity.h"
#include "retroshare/rsreputations.h"
#include "services/p3service.h"
@ -58,19 +63,21 @@ class Reputation
{
public:
Reputation()
:mOwnOpinion(0), mOwnOpinionTs(0), mReputation(0) { return; }
:mOwnOpinion(RsReputations::OPINION_NEUTRAL), mOwnOpinionTs(0),mFriendAverage(1.0f), mReputation(RsReputations::OPINION_NEUTRAL),mIdentityFlags(REPUTATION_IDENTITY_FLAG_NEEDS_UPDATE){ }
Reputation(const RsGxsId& about)
:mOwnOpinion(RsReputations::OPINION_NEUTRAL), mOwnOpinionTs(0),mFriendAverage(1.0f), mReputation(RsReputations::OPINION_NEUTRAL),mIdentityFlags(REPUTATION_IDENTITY_FLAG_NEEDS_UPDATE){ }
Reputation(const RsGxsId& about)
:mGxsId(about), mOwnOpinion(0), mOwnOpinionTs(0), mReputation(0) { return; }
void updateReputation();
int32_t CalculateReputation();
RsGxsId mGxsId;
std::map<RsPeerId, int32_t> mOpinions;
std::map<RsPeerId, RsReputations::Opinion> mOpinions;
int32_t mOwnOpinion;
time_t mOwnOpinionTs;
int32_t mReputation;
float mFriendAverage ;
float mReputation;
uint32_t mIdentityFlags;
};
@ -80,37 +87,25 @@ int32_t CalculateReputation();
*
*/
class p3GxsReputation: public p3Service, public p3Config /* , public pqiMonitor */
class p3GxsReputation: public p3Service, public p3Config, public RsReputations /* , public pqiMonitor */
{
public:
p3GxsReputation(p3LinkMgr *lm);
virtual RsServiceInfo getServiceInfo();
/***** Interface for p3idservice *****/
virtual bool updateOpinion(const RsGxsId& gxsid, int opinion);
/***** Interface for RsReputations *****/
virtual bool setOwnOpinion(const RsGxsId& key_id, const Opinion& op) ;
virtual bool getReputationInfo(const RsGxsId& id,ReputationInfo& info) ;
virtual bool isIdentityBanned(const RsGxsId& id) ;
/***** overloaded from p3Service *****/
/*!
* This retrieves all chat msg items and also (important!)
* processes chat-status items that are in service item queue. chat msg item requests are also processed and not returned
* (important! also) notifications sent to notify base on receipt avatar, immediate status and custom status
* : notifyCustomState, notifyChatStatus, notifyPeerHasNewAvatar
* @see NotifyBase
*/
virtual int tick();
virtual int status();
/*!
* Interface stuff.
*/
/*************** pqiMonitor callback ***********************/
//virtual void statusChange(const std::list<pqipeer> &plist);
/************* from p3Config *******************/
virtual RsSerialiser *setupSerialiser() ;
virtual bool saveList(bool& cleanup, std::list<RsItem*>&) ;
@ -123,22 +118,28 @@ class p3GxsReputation: public p3Service, public p3Config /* , public pqiMonitor
bool SendReputations(RsGxsReputationRequestItem *request);
bool RecvReputations(RsGxsReputationUpdateItem *item);
bool updateLatestUpdate(RsPeerId peerid, time_t ts);
bool updateLatestUpdate(RsPeerId peerid, time_t latest_update);
void updateActiveFriends() ;
// internal update of data. Takes care of cleaning empty boxes.
void locked_updateOpinion(const RsPeerId &from, const RsGxsId &about, RsReputations::Opinion op);
bool loadReputationSet(RsGxsReputationSetItem *item, const std::set<RsPeerId> &peerSet);
bool loadReputationSet(RsGxsReputationSetItem *item,
const std::set<RsPeerId> &peerSet);
int sendPackets();
int sendPackets();
void cleanup();
void sendReputationRequests();
int sendReputationRequest(RsPeerId peerid);
int sendReputationRequest(RsPeerId peerid);
void debug_print() ;
void updateIdentityFlags();
private:
RsMutex mReputationMtx;
time_t mLastActiveFriendsUpdate;
time_t mRequestTime;
time_t mStoreTime;
bool mReputationsUpdated;
uint32_t mAverageActiveFriends ;
p3LinkMgr *mLinkMgr;
@ -148,7 +149,7 @@ class p3GxsReputation: public p3Service, public p3Config /* , public pqiMonitor
std::multimap<time_t, RsGxsId> mUpdated;
// set of Reputations to send to p3IdService.
std::set<RsGxsId> mUpdatedReputations;
std::set<RsGxsId> mUpdatedReputations;
};
#endif //SERVICE_RSGXSREPUTATION_HEADER

View file

@ -56,6 +56,8 @@
#define ID_REQUEST_REPUTATION 0x0003
#define ID_REQUEST_OPINION 0x0004
#define GXSID_MAX_CACHE_SIZE 5000
static const uint32_t MAX_KEEP_UNUSED_KEYS = 30*86400 ; // remove unused keys after 30 days
static const uint32_t MAX_DELAY_BEFORE_CLEANING = 601 ; // clean old keys every 10 mins
@ -144,8 +146,8 @@ RsIdentity *rsIdentity = NULL;
p3IdService::p3IdService(RsGeneralDataService *gds, RsNetworkExchangeService *nes, PgpAuxUtils *pgpUtils)
: RsGxsIdExchange(gds, nes, new RsGxsIdSerialiser(), RS_SERVICE_GXS_TYPE_GXSID, idAuthenPolicy()),
RsIdentity(this), GxsTokenQueue(this), RsTickEvent(),
mPublicKeyCache(DEFAULT_MEM_CACHE_SIZE, "GxsIdPublicKeyCache"),
mPrivateKeyCache(DEFAULT_MEM_CACHE_SIZE, "GxsIdPrivateKeyCache"),
mPublicKeyCache(GXSID_MAX_CACHE_SIZE, "GxsIdPublicKeyCache"),
mPrivateKeyCache(GXSID_MAX_CACHE_SIZE, "GxsIdPrivateKeyCache"),
mIdMtx("p3IdService"), mNes(nes),
mPgpUtils(pgpUtils)
{
@ -210,6 +212,32 @@ uint32_t p3IdService::idAuthenPolicy()
return policy;
}
bool p3IdService::isARegularContact(const RsGxsId& id)
{
RsStackMutex stack(mIdMtx);
return mContacts.find(id) != mContacts.end() ;
}
bool p3IdService::setAsRegularContact(const RsGxsId& id,bool b)
{
std::set<RsGxsId>::iterator it = mContacts.find(id) ;
if(b && (it == mContacts.end()))
{
mContacts.insert(id) ;
slowIndicateConfigChanged() ;
}
if( (!b) &&(it != mContacts.end()))
{
mContacts.erase(it) ;
slowIndicateConfigChanged() ;
}
return true ;
}
void p3IdService::slowIndicateConfigChanged()
{
time_t now = time(NULL) ;
@ -247,9 +275,19 @@ bool p3IdService::loadList(std::list<RsItem*>& items)
RsGxsIdLocalInfoItem *lii;
for(std::list<RsItem*>::const_iterator it = items.begin();it!=items.end();++it)
{
if( (lii = dynamic_cast<RsGxsIdLocalInfoItem*>(*it)) != NULL)
{
for(std::map<RsGxsId,time_t>::const_iterator it2 = lii->mTimeStamps.begin();it2!=lii->mTimeStamps.end();++it2)
mKeysTS.insert(*it2) ;
mContacts = lii->mContacts ;
}
delete *it ;
}
items.clear() ;
return true ;
}
@ -263,6 +301,7 @@ bool p3IdService::saveList(bool& cleanup,std::list<RsItem*>& items)
cleanup = true ;
RsGxsIdLocalInfoItem *item = new RsGxsIdLocalInfoItem ;
item->mTimeStamps = mKeysTS ;
item->mContacts = mContacts ;
items.push_back(item) ;
return true ;
@ -408,10 +447,15 @@ bool p3IdService:: getIdDetails(const RsGxsId &id, RsIdentityDetails &details)
{
details = data.details;
details.mLastUsageTS = locked_getLastUsageTS(id) ;
if(mContacts.find(id) != mContacts.end())
details.mFlags |= RS_IDENTITY_FLAGS_IS_A_CONTACT ;
// one utf8 symbol can be at most 4 bytes long - would be better to measure real unicode length !!!
if(details.mNickname.length() > RSID_MAXIMUM_NICKNAME_SIZE*4)
details.mNickname = "[too long a name]" ;
rsReputations->getReputationInfo(id,details.mReputation) ;
return true;
}
@ -421,6 +465,12 @@ bool p3IdService:: getIdDetails(const RsGxsId &id, RsIdentityDetails &details)
{
details = data.details;
details.mLastUsageTS = locked_getLastUsageTS(id) ;
if(mContacts.find(id) != mContacts.end())
details.mFlags |= RS_IDENTITY_FLAGS_IS_A_CONTACT ;
rsReputations->getReputationInfo(id,details.mReputation) ;
return true;
}
}
@ -831,7 +881,7 @@ bool p3IdService::getReputation(const RsGxsId &id, GixsReputation &rep)
if (mPublicKeyCache.fetch(id, data))
{
rep.id = id;
rep.score = data.details.mReputation.mOverallScore;
rep.score = 0;//data.details.mReputation.mOverallScore;
#ifdef DEBUG_IDS
std::cerr << "p3IdService::getReputation() id: ";
std::cerr << id.toStdString() << " score: " <<
@ -1022,33 +1072,33 @@ bool p3IdService::getGroupData(const uint32_t &token, std::vector<RsGxsIdGroup>
if (item)
{
#ifdef DEBUG_IDS
std::cerr << "p3IdService::getGroupData() Item is:";
std::cerr << std::endl;
item->print(std::cerr);
std::cerr << std::endl;
std::cerr << "p3IdService::getGroupData() Item is:";
std::cerr << std::endl;
item->print(std::cerr);
std::cerr << std::endl;
#endif // DEBUG_IDS
RsGxsIdGroup group ;
item->toGxsIdGroup(group,false) ;
RsGxsIdGroup group ;
item->toGxsIdGroup(group,false) ;
{
RS_STACK_MUTEX(mIdMtx) ;
group.mLastUsageTS = locked_getLastUsageTS(RsGxsId(group.mMeta.mGroupId)) ;
}
{
RS_STACK_MUTEX(mIdMtx) ;
group.mLastUsageTS = locked_getLastUsageTS(RsGxsId(group.mMeta.mGroupId)) ;
}
// Decode information from serviceString.
SSGxsIdGroup ssdata;
if (ssdata.load(group.mMeta.mServiceString))
{
group.mPgpKnown = ssdata.pgp.idKnown;
group.mPgpId = ssdata.pgp.pgpId;
group.mReputation = ssdata.score.rep;
// Decode information from serviceString.
SSGxsIdGroup ssdata;
if (ssdata.load(group.mMeta.mServiceString))
{
group.mPgpKnown = ssdata.pgp.idKnown;
group.mPgpId = ssdata.pgp.pgpId;
group.mReputation = ssdata.score.rep;
#ifdef DEBUG_IDS
std::cerr << "p3IdService::getGroupData() Success decoding ServiceString";
std::cerr << std::endl;
std::cerr << "\t mGpgKnown: " << group.mPgpKnown;
std::cerr << std::endl;
std::cerr << "\t mGpgId: " << group.mPgpId;
std::cerr << std::endl;
std::cerr << "p3IdService::getGroupData() Success decoding ServiceString";
std::cerr << std::endl;
std::cerr << "\t mGpgKnown: " << group.mPgpKnown;
std::cerr << std::endl;
std::cerr << "\t mGpgId: " << group.mPgpId;
std::cerr << std::endl;
#endif // DEBUG_IDS
}
else
@ -1056,13 +1106,15 @@ bool p3IdService::getGroupData(const uint32_t &token, std::vector<RsGxsIdGroup>
group.mPgpKnown = false;
group.mPgpId.clear();
std::cerr << "p3IdService::getGroupData() Failed to decode ServiceString \""
<< group.mMeta.mServiceString << "\"" ;
std::cerr << "p3IdService::getGroupData() Failed to decode ServiceString \""
<< group.mMeta.mServiceString << "\"" ;
std::cerr << std::endl;
}
group.mIsAContact = (mContacts.find(RsGxsId(group.mMeta.mGroupId)) != mContacts.end());
groups.push_back(group);
delete(item);
delete(item);
}
else
{
@ -1593,8 +1645,10 @@ RsGxsIdCache::RsGxsIdCache(const RsGxsIdGroupItem *item, const RsTlvSecurityKey
std::cerr << std::endl;
#endif // DEBUG_IDS
details.mIsOwnId = (item->meta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN);
details.mPgpLinked = (item->meta.mGroupFlags & RSGXSID_GROUPFLAG_REALID);
details.mFlags = 0 ;
if(item->meta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN) details.mFlags |= RS_IDENTITY_FLAGS_IS_OWN_ID;
if(item->meta.mGroupFlags & RSGXSID_GROUPFLAG_REALID) details.mFlags |= RS_IDENTITY_FLAGS_PGP_LINKED;
/* rest must be retrived from ServiceString */
updateServiceString(item->meta.mServiceString);
@ -1607,17 +1661,14 @@ void RsGxsIdCache::updateServiceString(std::string serviceString)
SSGxsIdGroup ssdata;
if (ssdata.load(serviceString))
{
if (details.mPgpLinked)
if (details.mFlags & RS_IDENTITY_FLAGS_PGP_LINKED)
{
details.mPgpKnown = ssdata.pgp.idKnown;
if (details.mPgpKnown)
{
if(ssdata.pgp.idKnown) details.mFlags |= RS_IDENTITY_FLAGS_PGP_KNOWN ;
if (details.mFlags & RS_IDENTITY_FLAGS_PGP_KNOWN)
details.mPgpId = ssdata.pgp.pgpId;
}
else
{
details.mPgpId.clear();
}
}
@ -1669,14 +1720,14 @@ void RsGxsIdCache::updateServiceString(std::string serviceString)
}
// copy over Reputation scores.
details.mReputation = ssdata.score.rep;
//details.mReputation = ssdata.score.rep;
}
else
{
details.mPgpKnown = false;
details.mFlags &= ~RS_IDENTITY_FLAGS_PGP_KNOWN ;
details.mPgpId.clear();
details.mReputation.updateIdScore(false, false);
details.mReputation.update();
//details.mReputation.updateIdScore(false, false);
//details.mReputation.update();
}
}

View file

@ -187,10 +187,6 @@ virtual std::string save() const;
#define ID_LOCAL_STATUS_FULL_CALC_FLAG 0x00010000
#define ID_LOCAL_STATUS_INC_CALC_FLAG 0x00020000
#define MAX_CACHE_SIZE 100 // Small for testing..
//#define MAX_CACHE_SIZE 10000 // More useful size
class RsGxsIdGroupItem;
class RsGxsIdCache
@ -269,6 +265,9 @@ virtual bool parseRecognTag(const RsGxsId &id, const std::string &nickname,
virtual bool getRecognTagRequest(const RsGxsId &id, const std::string &comment,
uint16_t tag_class, uint16_t tag_type, std::string &tag);
virtual bool setAsRegularContact(const RsGxsId& id,bool is_a_contact) ;
virtual bool isARegularContact(const RsGxsId& id) ;
/**************** RsGixs Implementation ***************/
virtual bool getOwnIds(std::list<RsGxsId> &ownIds);
@ -499,8 +498,11 @@ std::string genRandomId(int len = 20);
std::map<uint32_t, std::set<RsGxsGroupId> > mIdsPendingCache;
std::map<uint32_t, std::list<RsGxsGroupId> > mGroupNotPresent;
std::map<RsGxsId, std::list<RsPeerId> > mIdsNotPresent;
std::map<RsGxsId,time_t> mKeysTS ;
std::map<RsGxsId, std::list<RsPeerId> > mIdsNotPresent;
std::map<RsGxsId,time_t> mKeysTS ;
// keep a list of regular contacts. This is useful to sort IDs, and allow some services to priviledged ids only.
std::set<RsGxsId> mContacts;
RsNetworkExchangeService* mNes;
/**************************

View file

@ -87,6 +87,7 @@ p3MsgService::p3MsgService(p3ServiceControl *sc, p3IdService *id_serv)
mShouldEnableDistantMessaging = true ;
mDistantMessagingEnabled = false ;
mDistantMessagePermissions = RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NONE ;
/* Initialize standard tag types */
if(sc)
@ -187,19 +188,26 @@ void p3MsgService::processMsg(RsMsgItem *mi, bool incoming)
msi->srcId = mi->PeerId();
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi));
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
/**** STACK UNLOCKED ***/
}
// If the peer is allowed to push files, then auto-download the recommended files.
if(rsPeers->servicePermissionFlags(mi->PeerId()) & RS_NODE_PERM_ALLOW_PUSH)
for(std::list<RsTlvFileItem>::const_iterator it(mi->attachment.items.begin());it!=mi->attachment.items.end();++it)
rsFiles->FileRequest((*it).name,(*it).hash,(*it).filesize,std::string(),RS_FILE_REQ_ANONYMOUS_ROUTING,std::list<RsPeerId>()) ;
if (incoming)
{
// If the peer is allowed to push files, then auto-download the recommended files.
if(rsPeers->servicePermissionFlags(mi->PeerId()) & RS_NODE_PERM_ALLOW_PUSH)
{
std::list<RsPeerId> srcIds;
srcIds.push_back(mi->PeerId());
for(std::list<RsTlvFileItem>::const_iterator it(mi->attachment.items.begin());it!=mi->attachment.items.end();++it)
rsFiles->FileRequest((*it).name,(*it).hash,(*it).filesize,std::string(),RS_FILE_REQ_ANONYMOUS_ROUTING,srcIds) ;
}
}
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
/**** STACK UNLOCKED ***/
}
bool p3MsgService::checkAndRebuildPartialMessage(RsMsgItem *ci)
{
// Check is the item is ending an incomplete item.
@ -461,6 +469,10 @@ bool p3MsgService::saveList(bool& cleanup, std::list<RsItem*>& itemList)
kv.key = "DISTANT_MESSAGES_ENABLED" ;
kv.value = mShouldEnableDistantMessaging?"YES":"NO" ;
vitem->tlvkvs.pairs.push_back(kv) ;
kv.key = "DISTANT_MESSAGE_PERMISSION_FLAGS" ;
kv.value = RsUtil::NumberToString(mDistantMessagePermissions) ;
vitem->tlvkvs.pairs.push_back(kv) ;
itemList.push_back(vitem) ;
@ -532,7 +544,7 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
RsMsgSrcId* msi;
RsMsgParentId* msp;
RsMsgGRouterMap* grm;
// RsPublicMsgInviteConfigItem* msv;
// RsPublicMsgInviteConfigItem* msv;
std::list<RsMsgItem*> items;
std::list<RsItem*>::iterator it;
@ -540,130 +552,155 @@ bool p3MsgService::loadList(std::list<RsItem*>& load)
std::map<uint32_t, RsPeerId> srcIdMsgMap;
std::map<uint32_t, RsPeerId>::iterator srcIt;
// load items and calculate next unique msgId
for(it = load.begin(); it != load.end(); ++it)
{
// load items and calculate next unique msgId
for(it = load.begin(); it != load.end(); ++it)
{
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
{
/* STORE MsgID */
if (mitem->msgId >= mMsgUniqueId) {
mMsgUniqueId = mitem->msgId + 1;
}
items.push_back(mitem);
}
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
{
// merge.
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
_ongoing_messages.insert(*it) ;
}
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
{
// delete standard tags as they are now save in config
if(mTags.end() == (tagIt = mTags.find(mtt->tagId)))
{
mTags.insert(std::pair<uint32_t, RsMsgTagType* >(mtt->tagId, mtt));
}
else
{
delete mTags[mtt->tagId];
mTags.erase(tagIt);
mTags.insert(std::pair<uint32_t, RsMsgTagType* >(mtt->tagId, mtt));
}
if (NULL != (mitem = dynamic_cast<RsMsgItem *>(*it)))
{
/* STORE MsgID */
if (mitem->msgId >= mMsgUniqueId) {
mMsgUniqueId = mitem->msgId + 1;
}
items.push_back(mitem);
}
else if (NULL != (grm = dynamic_cast<RsMsgGRouterMap *>(*it)))
{
// merge.
for(std::map<GRouterMsgPropagationId,uint32_t>::const_iterator it(grm->ongoing_msgs.begin());it!=grm->ongoing_msgs.end();++it)
_ongoing_messages.insert(*it) ;
}
else if(NULL != (mtt = dynamic_cast<RsMsgTagType *>(*it)))
{
// delete standard tags as they are now save in config
if(mTags.end() == (tagIt = mTags.find(mtt->tagId)))
{
mTags.insert(std::pair<uint32_t, RsMsgTagType* >(mtt->tagId, mtt));
}
else
{
delete mTags[mtt->tagId];
mTags.erase(tagIt);
mTags.insert(std::pair<uint32_t, RsMsgTagType* >(mtt->tagId, mtt));
}
}
else if(NULL != (mti = dynamic_cast<RsMsgTags *>(*it)))
{
mMsgTags.insert(std::pair<uint32_t, RsMsgTags* >(mti->msgId, mti));
}
else if(NULL != (msi = dynamic_cast<RsMsgSrcId *>(*it)))
{
srcIdMsgMap.insert(std::pair<uint32_t, RsPeerId>(msi->msgId, msi->srcId));
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi)); // does not need to be kept
}
else if(NULL != (msp = dynamic_cast<RsMsgParentId *>(*it)))
{
mParentId.insert(std::pair<uint32_t, RsMsgParentId*>(msp->msgId, msp));
}
}
else if(NULL != (mti = dynamic_cast<RsMsgTags *>(*it)))
{
mMsgTags.insert(std::pair<uint32_t, RsMsgTags* >(mti->msgId, mti));
}
else if(NULL != (msi = dynamic_cast<RsMsgSrcId *>(*it)))
{
srcIdMsgMap.insert(std::pair<uint32_t, RsPeerId>(msi->msgId, msi->srcId));
mSrcIds.insert(std::pair<uint32_t, RsMsgSrcId*>(msi->msgId, msi)); // does not need to be kept
}
else if(NULL != (msp = dynamic_cast<RsMsgParentId *>(*it)))
{
mParentId.insert(std::pair<uint32_t, RsMsgParentId*>(msp->msgId, msp));
}
RsConfigKeyValueSet *vitem = NULL ;
RsConfigKeyValueSet *vitem = NULL ;
if(NULL != (vitem = dynamic_cast<RsConfigKeyValueSet*>(*it)))
for(std::list<RsTlvKeyValue>::const_iterator kit = vitem->tlvkvs.pairs.begin(); kit != vitem->tlvkvs.pairs.end(); ++kit)
if(kit->key == "DISTANT_MESSAGES_ENABLED")
{
if(NULL != (vitem = dynamic_cast<RsConfigKeyValueSet*>(*it)))
{
for(std::list<RsTlvKeyValue>::const_iterator kit = vitem->tlvkvs.pairs.begin(); kit != vitem->tlvkvs.pairs.end(); ++kit)
{
if(kit->key == "DISTANT_MESSAGES_ENABLED")
{
#ifdef MSG_DEBUG
std::cerr << "Loaded config default nick name for distant chat: " << kit->value << std::endl ;
std::cerr << "Loaded config default nick name for distant chat: " << kit->value << std::endl ;
#endif
mShouldEnableDistantMessaging = (kit->value == "YES") ;
}
mShouldEnableDistantMessaging = (kit->value == "YES") ;
}
if(kit->key == "DISTANT_MESSAGE_PERMISSION_FLAGS")
{
#ifdef MSG_DEBUG
std::cerr << "Loaded distant message permission flags: " << kit->value << std::endl ;
#endif
if (!kit->value.empty())
{
std::istringstream is(kit->value) ;
}
uint32_t tmp ;
is >> tmp ;
// sort items into lists
std::list<RsMsgItem*>::iterator msgIt;
for (msgIt = items.begin(); msgIt != items.end(); ++msgIt)
{
mitem = *msgIt;
if(tmp < 3)
mDistantMessagePermissions = tmp ;
else
std::cerr << "(EE) Invalid value read for DistantMessagePermission flags in config: " << tmp << std::endl;
}
}
}
/* STORE MsgID */
if (mitem->msgId == 0) {
delete *it ;
continue ;
}
}
load.clear() ;
// sort items into lists
std::list<RsMsgItem*>::iterator msgIt;
for (msgIt = items.begin(); msgIt != items.end(); ++msgIt)
{
mitem = *msgIt;
/* STORE MsgID */
if (mitem->msgId == 0) {
mitem->msgId = getNewUniqueMsgId();
}
}
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
srcIt = srcIdMsgMap.find(mitem->msgId);
if(srcIt != srcIdMsgMap.end()) {
mitem->PeerId(srcIt->second);
srcIdMsgMap.erase(srcIt);
}
srcIt = srcIdMsgMap.find(mitem->msgId);
if(srcIt != srcIdMsgMap.end()) {
mitem->PeerId(srcIt->second);
srcIdMsgMap.erase(srcIt);
}
/* switch depending on the PENDING
/* switch depending on the PENDING
* flags
*/
if (mitem -> msgFlags & RS_MSG_FLAGS_PENDING)
{
if (mitem -> msgFlags & RS_MSG_FLAGS_PENDING)
{
//std::cerr << "MSG_PENDING";
//std::cerr << std::endl;
//mitem->print(std::cerr);
//std::cerr << "MSG_PENDING";
//std::cerr << std::endl;
//mitem->print(std::cerr);
msgOutgoing[mitem->msgId] = mitem;
}
else
{
imsg[mitem->msgId] = mitem;
}
}
msgOutgoing[mitem->msgId] = mitem;
}
else
{
imsg[mitem->msgId] = mitem;
}
}
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
/* remove missing msgId in mSrcIds */
for (srcIt = srcIdMsgMap.begin(); srcIt != srcIdMsgMap.end(); ++srcIt) {
std::map<uint32_t, RsMsgSrcId*>::iterator it = mSrcIds.find(srcIt->first);
if (it != mSrcIds.end()) {
delete(it->second);
mSrcIds.erase(it);
}
}
/* remove missing msgId in mSrcIds */
for (srcIt = srcIdMsgMap.begin(); srcIt != srcIdMsgMap.end(); ++srcIt) {
std::map<uint32_t, RsMsgSrcId*>::iterator it = mSrcIds.find(srcIt->first);
if (it != mSrcIds.end()) {
delete(it->second);
mSrcIds.erase(it);
}
}
/* remove missing msgId in mParentId */
std::map<uint32_t, RsMsgParentId *>::iterator mit = mParentId.begin();
while (mit != mParentId.end()) {
if (imsg.find(mit->first) == imsg.end()) {
if (msgOutgoing.find(mit->first) == msgOutgoing.end()) {
/* not found */
mParentId.erase(mit++);
continue;
}
}
++mit;
}
return true;
/* remove missing msgId in mParentId */
std::map<uint32_t, RsMsgParentId *>::iterator mit = mParentId.begin();
while (mit != mParentId.end()) {
if (imsg.find(mit->first) == imsg.end()) {
if (msgOutgoing.find(mit->first) == msgOutgoing.end()) {
/* not found */
mParentId.erase(mit++);
continue;
}
}
++mit;
}
return true;
}
void p3MsgService::loadWelcomeMsg()
@ -1784,6 +1821,32 @@ void p3MsgService::notifyDataStatus(const GRouterMsgPropagationId& id,uint32_t d
RsServer::notify()->notifyListChange(NOTIFY_LIST_MESSAGELIST,NOTIFY_TYPE_ADD);
IndicateConfigChanged() ;
}
bool p3MsgService::acceptDataFromPeer(const RsGxsId& to_gxs_id)
{
if(mDistantMessagePermissions & RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_NON_CONTACTS)
return (rsIdentity!=NULL) && rsIdentity->isARegularContact(to_gxs_id) ;
if(mDistantMessagePermissions & RS_DISTANT_MESSAGING_CONTACT_PERMISSION_FLAG_FILTER_EVERYBODY)
return false ;
return true ;
}
void p3MsgService::setDistantMessagingPermissionFlags(uint32_t flags)
{
if(flags != mDistantMessagePermissions)
{
mDistantMessagePermissions = flags ;
IndicateConfigChanged() ;
}
}
uint32_t p3MsgService::getDistantMessagingPermissionFlags()
{
return mDistantMessagePermissions ;
}
void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGxsId& signing_key,GRouterServiceId& client_id,uint8_t *data,uint32_t data_size)
{
std::cerr << "p3MsgService::receiveGRouterData(): received message item of size " << data_size << ", for key " << destination_key << std::endl;

View file

@ -90,6 +90,7 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
void loadWelcomeMsg(); /* startup message */
//std::list<RsMsgItem *> &getMsgList();
//std::list<RsMsgItem *> &getMsgOutList();
@ -125,6 +126,9 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
};
void enableDistantMessaging(bool b) ;
bool distantMessagingEnabled() ;
void setDistantMessagingPermissionFlags(uint32_t flags) ;
uint32_t getDistantMessagingPermissionFlags() ;
private:
void sendDistantMsgItem(RsMsgItem *msgitem) ;
@ -136,6 +140,7 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
// Overloaded from GRouterClientService
virtual bool acceptDataFromPeer(const RsGxsId& gxs_id) ;
virtual void receiveGRouterData(const RsGxsId& destination_key,const RsGxsId& signing_key, GRouterServiceId &client_id, uint8_t *data, uint32_t data_size) ;
virtual void notifyDataStatus(const GRouterMsgPropagationId& msg_id,uint32_t data_status) ;
@ -202,6 +207,7 @@ class p3MsgService: public p3Service, public p3Config, public pqiServiceMonitor,
std::string config_dir;
bool mDistantMessagingEnabled ;
uint32_t mDistantMessagePermissions ;
bool mShouldEnableDistantMessaging ;
};

View file

@ -309,42 +309,34 @@ bool p3StatusService::saveList(bool& cleanup, std::list<RsItem*>& ilist){
return true;
}
bool p3StatusService::loadList(std::list<RsItem*>& load){
// load your status from last rs session
StatusInfo own_info;
std::list<RsItem*>::const_iterator it = load.begin();
if(it == load.end()){
std::cerr << "p3StatusService::loadList(): Failed to load " << std::endl;
return false;
}
for(; it != load.end(); ++it){
RsStatusItem* own_status = dynamic_cast<RsStatusItem* >(*it);
bool p3StatusService::loadList(std::list<RsItem*>& load)
{
// load your status from last rs session
StatusInfo own_info;
if(own_status != NULL){
for(std::list<RsItem*>::const_iterator it = load.begin() ; it != load.end(); ++it)
{
RsStatusItem* own_status = dynamic_cast<RsStatusItem* >(*it);
own_info.id = mServiceCtrl->getOwnId();
own_info.status = own_status->status;
own_info.time_stamp = own_status->sendTime;
delete own_status;
if(own_status != NULL)
{
own_info.id = mServiceCtrl->getOwnId();
own_info.status = own_status->status;
own_info.time_stamp = own_status->sendTime;
{
RsStackMutex stack(mStatusMtx);
std::pair<RsPeerId, StatusInfo> pr(mServiceCtrl->getOwnId(), own_info);
mStatusInfoMap.insert(pr);
}
{
RsStackMutex stack(mStatusMtx);
std::pair<RsPeerId, StatusInfo> pr(mServiceCtrl->getOwnId(), own_info);
mStatusInfoMap.insert(pr);
}
return true;
}else{
std::cerr << "p3StatusService::loadList " << "Failed to load list "
<< std::endl;
}
}
}
return false;
delete *it ;
}
load.clear() ;
return false;
}