Merge branch 'master'

This commit is contained in:
zeners 2016-03-12 16:46:33 +01:00
commit b1da4ed67e
55 changed files with 650 additions and 267 deletions

View file

@ -1141,9 +1141,17 @@ void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,const
for(uint32_t i=0;i<tmp_peers.size();++i)
if(incoming_routes.find(tmp_peers[i]) != incoming_routes.end())
{
#ifdef GROUTER_DEBUG
std::cerr << " removing " << tmp_peers[i] << " which is an incoming route" << std::endl;
#endif
}
else if(probas[i] < RS_GROUTER_PROBABILITY_THRESHOLD_BEST_PEERS_SELECT*max_probability)
{
#ifdef GROUTER_DEBUG
std::cerr << " removing " << tmp_peers[i] << " because probability is below best peers threshold" << std::endl;
#endif
}
else
{
tmp_peers[count] = tmp_peers[i] ;
@ -1200,7 +1208,9 @@ void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,const
uint32_t real_dupl = std::min( duplication_factor - max_count+1,std::max(1u,uint32_t(rint(ideal_dupl)))) ;
duplication_factor_delta = real_dupl - ideal_dupl ;
#ifdef GROUTER_DEBUG
std::cerr << " Peer " << mypairs[i].second << " prob=" << mypairs[i].first << ", ideal_dupl=" << ideal_dupl << ", real=" << real_dupl << ". Delta = " << duplication_factor_delta << std::endl;
#endif
friend_peers_and_duplication_factors[ mypairs[i].second ] = real_dupl ; // should be updated correctly
}
@ -1533,13 +1543,20 @@ void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_it
else
std::cerr << " checking receipt hash : OK" << std::endl;
#endif
// check signature.
// check signature. The policy if the following:
// if we're the destination:
// signature should check and signing key should be available // always ensures the receipt is valid
// else
// if key is available, signature should check // early protects against frodulent receipts that we can check
uint32_t error_status ;
if(! verifySignedDataItem(receipt_item))
{
std::cerr << " checking receipt signature : FAILED. Receipt is dropped." << std::endl;
return ;
}
if(! verifySignedDataItem(receipt_item,error_status))
if( (it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN) || (error_status != RsGixs::RS_GIXS_ERROR_KEY_NOT_AVAILABLE))
{
std::cerr << " checking receipt signature : FAILED. Receipt is dropped. Error status=" << error_status << std::endl;
return ;
}
#ifdef GROUTER_DEBUG
std::cerr << " checking receipt signature : OK. " << std::endl;
std::cerr << " removing messsage from cache." << std::endl;
@ -1688,7 +1705,9 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item)
#ifdef GROUTER_DEBUG
std::cerr << " step B: item is for us and is new, so make sure it's authentic and create a receipt" << std::endl;
#endif
if(!verifySignedDataItem(data_item)) // we should get proper flags out of this
uint32_t error_status ;
if(!verifySignedDataItem(data_item,error_status)) // we should get proper flags out of this
{
std::cerr << " verifying item signature: FAILED! Droping that item" ;
std::cerr << " You probably received a message from a person you don't have key." << std::endl;
@ -1968,7 +1987,7 @@ bool p3GRouter::signDataItem(RsGRouterAbstractMsgItem *item,const RsGxsId& signi
return false ;
}
}
bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item)
bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item,uint32_t& error_status)
{
try
{
@ -1987,9 +2006,6 @@ bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item)
if(!item->serialise_signed_data(data,data_size))
throw std::runtime_error("Cannot serialise signed data.") ;
uint32_t error_status ;
if(!mGixs->validateData(data,data_size,item->signature,true,error_status))
{
switch(error_status)
@ -2105,8 +2121,9 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
}
// Verify the signature. If that fails, there's a bug somewhere!!
if(!verifySignedDataItem(data_item))
uint32_t error_status;
if(!verifySignedDataItem(data_item,error_status))
{
std::cerr << "Cannot verify data item that was just signed. Some error occured!" << std::endl;
delete data_item;

View file

@ -263,7 +263,7 @@ private:
// signs an item with the given key.
bool signDataItem(RsGRouterAbstractMsgItem *item,const RsGxsId& id) ;
bool verifySignedDataItem(RsGRouterAbstractMsgItem *item) ;
bool verifySignedDataItem(RsGRouterAbstractMsgItem *item, uint32_t &error_status) ;
bool encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& destination_key) ;
bool decryptDataItem(RsGRouterGenericDataItem *item) ;

View file

@ -950,6 +950,7 @@ int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
cv.put(KEY_GRP_ORIGINATOR, grpMetaPtr->mOriginator.toStdString());
cv.put(KEY_GRP_AUTHEN_FLAGS, (int32_t)grpMetaPtr->mAuthenFlags);
cv.put(KEY_NXS_HASH, grpMetaPtr->mHash.toStdString());
cv.put(KEY_RECV_TS, (int32_t)grpMetaPtr->mRecvTS);
cv.put(KEY_NXS_IDENTITY, grpMetaPtr->mAuthorId.toStdString());
offset = 0;

View file

@ -2013,8 +2013,12 @@ void RsGenExchange::publishMsgs()
grpId = msg->grpId;
msg->metaData->recvTS = time(NULL);
mRoutingClues[msg->metaData->mAuthorId].insert(rsPeers->getOwnId()) ;
mTrackingClues.push_back(std::make_pair(msg->msgId,rsPeers->getOwnId())) ;
// FIXTESTS global variable rsPeers not available in unittests!
if(rsPeers)
{
mRoutingClues[msg->metaData->mAuthorId].insert(rsPeers->getOwnId()) ;
mTrackingClues.push_back(std::make_pair(msg->msgId,rsPeers->getOwnId())) ;
}
computeHash(msg->msg, msg->metaData->mHash);
mDataAccess->addMsgData(msg);
@ -2327,7 +2331,6 @@ void RsGenExchange::publishGrps()
{
grp->metaData = new RsGxsGrpMetaData();
grpItem->meta.mPublishTs = time(NULL);
//grpItem->meta.mParentGrpId = std::string("empty");
*(grp->metaData) = grpItem->meta;
// TODO: change when publish key optimisation added (public groups don't have publish key
@ -2799,13 +2802,17 @@ void RsGenExchange::processRecvdGroups()
std::cerr << " Group routage info: Identity=" << meta->mAuthorId << " from " << grp->PeerId() << std::endl;
#endif
if(!meta->mAuthorId.isNull())
mRoutingClues[meta->mAuthorId].insert(grp->PeerId()) ;
if(!meta->mAuthorId.isNull())
mRoutingClues[meta->mAuthorId].insert(grp->PeerId()) ;
// This has been moved here (as opposed to inside part for new groups below) because it is used to update the server TS when updates
// of grp metadata arrive.
meta->mRecvTS = time(NULL);
// now check if group already existss
if(std::find(existingGrpIds.begin(), existingGrpIds.end(), grp->grpId) == existingGrpIds.end())
{
meta->mRecvTS = time(NULL);
if(meta->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
meta->mOriginator = grp->PeerId();
@ -2919,6 +2926,11 @@ void RsGenExchange::performUpdateValidation()
if(gu.newGrp->metaData->mCircleType == GXS_CIRCLE_TYPE_YOUREYESONLY)
gu.newGrp->metaData->mOriginator = gu.newGrp->PeerId();
// Keep subscriptionflag to what it was. This avoids clearing off the flag when updates to group meta information
// is received.
gu.newGrp->metaData->mSubscribeFlags = gu.oldGrpMeta->mSubscribeFlags ;
grps.insert(std::make_pair(gu.newGrp, gu.newGrp->metaData));
}
else
@ -2930,6 +2942,18 @@ void RsGenExchange::performUpdateValidation()
}
mDataStore->updateGroup(grps);
// notify the client
RsGxsGroupChange* c = new RsGxsGroupChange(RsGxsNotify::TYPE_RECEIVE, true);
for(uint32_t i=0;i<mGroupUpdates.size();++i)
c->mGrpIdList.push_back(mGroupUpdates[i].oldGrpMeta->mGroupId) ;
mNotifications.push_back(c);
// cleanup
mGroupUpdates.clear();
}

View file

@ -482,6 +482,13 @@ public:
static float computeCurrentSendingProbability()
{
// FIXTESTS global variable rsConfig not available in unittests!
if(rsConfig == 0)
{
std::cerr << "computeCurrentSendingProbability(): rsConfig not initialised, returning 1.0"<<std::endl;
return 1.0;
}
int maxIn=50,maxOut=50;
float currIn=0,currOut=0 ;
@ -1830,19 +1837,15 @@ void RsGxsNetService::updateServerSyncTS()
// retrieve all grps and update TS
mDataStore->retrieveGxsGrpMetaData(gxsMap);
#ifdef TO_REMOVE
// (cyril) This code is removed because it is inconsistent: the list of grps does not need to be updated when
// new posts arrive. The two (grp list and msg list) are handled independently.
// (cyril) This code was previously removed because it sounded inconsistent: the list of grps normally does not need to be updated when
// new posts arrive. The two (grp list and msg list) are handled independently. Still, when group meta data updates are received,
// the server TS needs to be updated, because it is the only way to propagate the changes. So we update it to the publish time stamp,
// if needed.
// as a grp list server also note this is the latest item you have
if(mGrpServerUpdateItem == NULL)
mGrpServerUpdateItem = new RsGxsServerGrpUpdateItem(mServType);
// First reset it. That's important because it will re-compute correct TS in case
// we have unsubscribed a group.
mGrpServerUpdateItem->grpUpdateTS = 0 ;
#endif
bool change = false;
// then remove from mServerMsgUpdateMap, all items that are not in the group list!
@ -1914,19 +1917,18 @@ void RsGxsNetService::updateServerSyncTS()
#endif
}
#ifdef TO_REMOVE
// This might be very inefficient with time. This is needed because an old message might have been received, so the last modification time
// needs to account for this so that a friend who hasn't
// This is needed for group metadata updates to actually propagate: only a new grpUpdateTS will trigger the exchange of groups mPublishTs which
// will then be compared and pssibly trigger a MetaData transmission. mRecvTS is upated when creating, receiving for the first time, or receiving
// an update, all in rsgenexchange.cc, after group/update validation. It is therefore a local TS, that can be compared to grpUpdateTS (same machine).
if(mGrpServerUpdateItem->grpUpdateTS < grpMeta->mRecvTS)
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last RecvTS = " << time(NULL) - grpMeta->mRecvTS << " secs ago for group "<< grpId << std::endl;
GXSNETDEBUG__G(grpId) << " updated msgUpdateTS to last RecvTS = " << time(NULL) - grpMeta->mRecvTS << " secs ago for group "<< grpId << ". This is probably because an update has been received." << std::endl;
#endif
mGrpServerUpdateItem->grpUpdateTS = grpMeta->mRecvTS;
change = true;
}
#endif
}
// actual change in config settings, then save configuration
@ -2705,8 +2707,9 @@ void RsGxsNetService::locked_genReqMsgTransaction(NxsTransaction* tr)
#endif
continue;
}
if(rsReputations->isIdentityBanned(syncItem->authorId))
// FIXTESTS global variable rsReputations not available in unittests!
if(rsReputations == 0){ std::cerr << "rsReputations==0, accepting all messages!" << std::endl; }
if(rsReputations && rsReputations->isIdentityBanned(syncItem->authorId))
{
#ifdef NXS_NET_DEBUG_1
GXSNETDEBUG_PG(item->PeerId(),grpId) << ", Identity " << syncItem->authorId << " is banned. Not requesting message!" << std::endl;
@ -2944,8 +2947,9 @@ void RsGxsNetService::locked_genReqGrpTransaction(NxsTransaction* tr)
haveItem = true;
latestVersion = grpSyncItem->publishTs > metaIter->second->mPublishTs;
}
if(!grpSyncItem->authorId.isNull() && rsReputations->isIdentityBanned(grpSyncItem->authorId))
// FIXTESTS global variable rsReputations not available in unittests!
if(rsReputations == 0){ std::cerr << "rsReputations==0, accepting all groups!" << std::endl; }
if(!grpSyncItem->authorId.isNull() && rsReputations && rsReputations->isIdentityBanned(grpSyncItem->authorId))
{
#ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " Identity " << grpSyncItem->authorId << " is banned. Not syncing group." << std::endl;

View file

@ -2732,10 +2732,51 @@ bool p3PeerMgrIMPL::removeBannedIps()
// return ret;
// }
/**
* @brief p3PeerMgrIMPL::removeUnusedLocations Removes all location offline for RS_PEER_OFFLINE_DELETE seconds or more. Keeps the most recent location per PGP id.
* @return true on success
*
* This function removes all location that are offline for too long defined by RS_PEER_OFFLINE_DELETE.
* It also makes sure that at least one location (the most recent) is kept.
*
* The idea of the function is the following:
* - keep track if there is at least one location per PGP id that is not offline for too long
* -> hasRecentLocation
* - keep track of most recent location per PGP id that is offline for too long (and its time stamp)
* -> mostRecentLocation
* -> mostRecentTime
*
* When a location is found that is offline for too long the following points are checked from the top to the bottom:
* 1) remove it when the PGP id has a location that is not offline for too long
* 2) remove it when the PGP id has a more recent location
* 3) keep it when it is the most recent location
* This location will possibly be removed when a more recent (but still offline for too long) is found
*/
bool p3PeerMgrIMPL::removeUnusedLocations()
{
std::list<RsPeerId> toRemove;
std::map<RsPgpId, bool> hasRecentLocation;
std::map<RsPgpId, time_t> mostRecentTime;
std::map<RsPgpId, RsPeerId> mostRecentLocation;
// init maps
{
std::list<RsPgpId> pgpList;
if(!rsPeers->getGPGAcceptedList(pgpList))
return false;
std::list<RsPgpId>::iterator it;
for(it = pgpList.begin(); it != pgpList.end(); ++it)
{
hasRecentLocation[*it] = false;
mostRecentTime[*it] = (time_t)0;
}
}
const time_t now = time(NULL);
RsPgpId pgpID;
{
RsStackMutex stack(mPeerMtx); /****** STACK LOCK MUTEX *******/
@ -2744,19 +2785,71 @@ bool p3PeerMgrIMPL::removeUnusedLocations()
std::cerr << "p3PeerMgr::removeUnusedLocations()" << std::endl;
#endif
time_t now = time(NULL);
std::map<RsPeerId, peerState>::iterator it;
for(it = mFriendList.begin(); it != mFriendList.end(); ++it)
{
pgpID = it->second.gpg_id;
// store some references to speed up accessing
RsPeerId &idRef = mostRecentLocation[pgpID];
bool &recentRef = hasRecentLocation[pgpID];
if (now > it->second.lastcontact + RS_PEER_OFFLINE_DELETE)
{
toRemove.push_back(it->first);
// location is too old
if(recentRef)
{
// there is already one location that won't get removed
// -> we can safely remove this one
toRemove.push_back(it->first);
#ifdef PEER_DEBUG
std::cerr << "p3PeerMgr::removeUnusedLocations() removing Old SSL Id: " << it->first << std::endl;
std::cerr << "p3PeerMgr::removeUnusedLocations() removing Old SSL Id: " << it->first << std::endl;
#endif
}
else
{
// we need to take care that the most recent location it not removed
time_t &timeRef = mostRecentTime[pgpID];
if(timeRef > it->second.lastcontact)
{
// this (it) location is longer offline compared to mostRecentLocation
// -> we can remove this one
toRemove.push_back(it->first);
#ifdef PEER_DEBUG
std::cerr << "p3PeerMgr::removeUnusedLocations() removing Old SSL Id: " << it->first << std::endl;
#endif
}
else
{
// this (it) location is more recent compared to mostRecentLocation
// -> we can remove mostRecentLocation
if(!idRef.isNull())
{
toRemove.push_back(idRef);
#ifdef PEER_DEBUG
std::cerr << "p3PeerMgr::removeUnusedLocations() removing Old SSL Id: " << it->first << std::endl;
#endif
}
// update maps
idRef = it->first;
timeRef = it->second.lastcontact;
}
}
}
else
{
// found a location that won't get removed
recentRef = true;
// we can remove mostRecentLocation if it is set
if(!idRef.isNull())
{
toRemove.push_back(idRef);
#ifdef PEER_DEBUG
std::cerr << "p3PeerMgr::removeUnusedLocations() removing Old SSL Id: " << it->first << std::endl;
#endif
}
}
// if (isDummyFriend(it->first))
@ -2771,11 +2864,11 @@ bool p3PeerMgrIMPL::removeUnusedLocations()
}
}
std::list<RsPeerId>::iterator it;
std::list<RsPeerId>::iterator it;
for(it = toRemove.begin(); it != toRemove.end(); ++it)
{
removeFriend(*it,false);
removeFriend(*it, false);
}
return true;

View file

@ -823,8 +823,6 @@ continue_packet:
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
#endif
inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen);
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
@ -832,6 +830,7 @@ continue_packet:
#ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!");
#endif
inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
}
else
{
@ -952,7 +951,7 @@ int pqistreamer::inAllowedBytes_locked()
static const float AVG_PERIOD = 5; // sec
static const float AVG_FRAC = 0.8; // for low pass filter.
void pqistreamer::outSentBytes_locked(int outb)
void pqistreamer::outSentBytes_locked(uint32_t outb)
{
#ifdef DEBUG_PQISTREAMER
{
@ -1022,7 +1021,7 @@ void pqistreamer::outSentBytes_locked(int outb)
return;
}
void pqistreamer::inReadBytes_locked(int inb)
void pqistreamer::inReadBytes_locked(uint32_t inb)
{
#ifdef DEBUG_PQISTREAMER
{

View file

@ -102,10 +102,10 @@ class pqistreamer: public PQInterface
float outTimeSlice_locked();
int outAllowedBytes_locked();
void outSentBytes_locked(int );
void outSentBytes_locked(uint32_t );
int inAllowedBytes_locked();
void inReadBytes_locked(int );
void inReadBytes_locked(uint32_t );

View file

@ -514,20 +514,20 @@ bool p3GxsChannels::setChannelDownloadDirectory(const RsGxsGroupId &groupId, con
return true;
}
bool p3GxsChannels::getChannelDownloadDirectory(const RsGxsGroupId & id,std::string& directory)
bool p3GxsChannels::getChannelDownloadDirectory(const RsGxsGroupId & groupId,std::string& directory)
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::autoDownloadEnabled(" << id << ")" << std::endl;
std::cerr << "p3GxsChannels::getChannelDownloadDirectory(" << id << ")" << std::endl;
#endif
std::map<RsGxsGroupId, RsGroupMetaData>::iterator it;
it = mSubscribedGroups.find(id);
it = mSubscribedGroups.find(groupId);
if (it == mSubscribedGroups.end())
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::autoDownloadEnabled() No Entry" << std::endl;
std::cerr << "p3GxsChannels::getChannelDownloadDirectory() No Entry" << std::endl;
#endif
return false;
@ -904,7 +904,7 @@ void p3GxsChannels::handleResponse(uint32_t token, uint32_t req_type)
/********************************************************************************************/
bool p3GxsChannels::autoDownloadEnabled(const RsGxsGroupId &id,bool& enabled)
bool p3GxsChannels::autoDownloadEnabled(const RsGxsGroupId &groupId,bool& enabled)
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::autoDownloadEnabled(" << id << ")";
@ -913,7 +913,7 @@ bool p3GxsChannels::autoDownloadEnabled(const RsGxsGroupId &id,bool& enabled)
std::map<RsGxsGroupId, RsGroupMetaData>::iterator it;
it = mSubscribedGroups.find(id);
it = mSubscribedGroups.find(groupId);
if (it == mSubscribedGroups.end())
{
#ifdef GXSCHANNELS_DEBUG
@ -927,22 +927,24 @@ bool p3GxsChannels::autoDownloadEnabled(const RsGxsGroupId &id,bool& enabled)
/* extract from ServiceString */
SSGxsChannelGroup ss;
ss.load(it->second.mServiceString);
enabled = ss.mAutoDownload;
enabled = ss.mAutoDownload;
return true ;
return true;
}
bool SSGxsChannelGroup::load(const std::string &input)
{
if(input.empty())
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "SSGxsChannelGroup::load() asked to load a null string." << std::endl;
#endif
return true ;
}
int download_val;
mAutoDownload = false;
mDownloadDirectory.clear();
if(input.empty())
{
std::cerr << "(EE) SSGxsChannelGroup::load() asked to load a null string. Weird." << std::endl;
return false ;
}
RsTemporaryMemory tmpmem(input.length());

View file

@ -45,12 +45,12 @@
class SSGxsChannelGroup
{
public:
SSGxsChannelGroup(): mAutoDownload(false), mDownloadDirectory("") {}
bool load(const std::string &input);
std::string save() const;
bool mAutoDownload;
std::string mDownloadDirectory ;
std::string mDownloadDirectory;
};
@ -176,7 +176,7 @@ static uint32_t channelsAuthenPolicy();
void updateSubscribedGroup(const RsGroupMetaData &group);
void clearUnsubscribedGroup(const RsGxsGroupId &id);
bool setAutoDownload(const RsGxsGroupId &groupId, bool enabled);
bool autoDownloadEnabled(const RsGxsGroupId &id, bool &enabled);
bool autoDownloadEnabled(const RsGxsGroupId &groupId, bool &enabled);

View file

@ -70,7 +70,7 @@ void PeerNode::provideFileHash(const RsFileHash& hash)
void PeerNode::manageFileHash(const RsFileHash& hash)
{
_managed_hashes.insert(hash) ;
_turtle->monitorTunnels(hash,_turtle_client) ;
_turtle->monitorTunnels(hash,_turtle_client, false) ;
}
void PeerNode::sendToGRKey(const GRouterKeyId& key_id)
{

View file

@ -859,7 +859,7 @@ void p3turtle::handleSearchRequest(RsTurtleSearchRequestItem *item)
#ifdef P3TURTLE_DEBUG
std::cerr << " Dropping, because the serial size exceeds the accepted limit." << std::endl ;
#endif
std::cerr << " Caught a turtle search item with arbitrary large size from " << item->PeerId() << " of size " << item->serial_size() << ". This is not allowed => dropping." << std::endl;
std::cerr << " Caught a turtle search item with arbitrary large size from " << item->PeerId() << " of size " << item->serial_size() << " and depth " << item->depth << ". This is not allowed => dropping." << std::endl;
return ;
}

View file

@ -65,7 +65,9 @@ public:
~RsSharedPtr()
{
lock();
DecrementAndDeleteIfLast();
unlock();
}
private: