mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
added sorting of peers (part 2/2)
This commit is contained in:
parent
af6dee088c
commit
5a55800f31
@ -148,9 +148,9 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
|
|||||||
p.tick(); // ticks bio
|
p.tick(); // ticks bio
|
||||||
|
|
||||||
RsItem *item = GetItem();
|
RsItem *item = GetItem();
|
||||||
|
#ifdef DEBUG_FSCLIENT
|
||||||
RsDbg() << "Ticking for response...";
|
RsDbg() << "Ticking for response...";
|
||||||
|
#endif
|
||||||
if(item)
|
if(item)
|
||||||
{
|
{
|
||||||
response.push_back(item);
|
response.push_back(item);
|
||||||
|
@ -376,9 +376,9 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
|
|||||||
if(ptr == NULL)
|
if(ptr == NULL)
|
||||||
return 0 ;
|
return 0 ;
|
||||||
|
|
||||||
//#ifdef DEBUG_PQISTREAMER
|
#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
|
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
|
||||||
//#endif
|
#endif
|
||||||
|
|
||||||
/*******************************************************************************************/
|
/*******************************************************************************************/
|
||||||
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
|
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
|
||||||
|
@ -84,6 +84,13 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it
|
|||||||
sr_item->friend_invites = computeListOfFriendInvites(item->n_requested_friends,pi->first,pi->second.pgp_fingerprint);
|
sr_item->friend_invites = computeListOfFriendInvites(item->n_requested_friends,pi->first,pi->second.pgp_fingerprint);
|
||||||
sr_item->PeerId(item->PeerId());
|
sr_item->PeerId(item->PeerId());
|
||||||
|
|
||||||
|
// Update the have_added_as_friend for the list of each peer. We do that before sending because sending destroys
|
||||||
|
// the item.
|
||||||
|
|
||||||
|
for(auto& it:mCurrentClientPeers)
|
||||||
|
it.second.have_added_this_peer[computePeerDistance(it.second.pgp_fingerprint,pi->second.pgp_fingerprint)] = pi->first;
|
||||||
|
|
||||||
|
// Send the item.
|
||||||
mni->SendItem(sr_item);
|
mni->SendItem(sr_item);
|
||||||
|
|
||||||
// Update the list of closest peers for all peers currently in the database.
|
// Update the list of closest peers for all peers currently in the database.
|
||||||
@ -102,8 +109,6 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Close client connection from server side, to tell the client that nothing more is coming.
|
// Close client connection from server side, to tell the client that nothing more is coming.
|
||||||
//RsDbg() << "Closing client connection." ;
|
|
||||||
//mni->closeConnection(item->PeerId());
|
|
||||||
|
|
||||||
RsDbg() << "Sending end-of-stream item to " << item->PeerId() ;
|
RsDbg() << "Sending end-of-stream item to " << item->PeerId() ;
|
||||||
|
|
||||||
@ -112,6 +117,9 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it
|
|||||||
status_item->PeerId(item->PeerId());
|
status_item->PeerId(item->PeerId());
|
||||||
|
|
||||||
mni->SendItem(status_item);
|
mni->SendItem(status_item);
|
||||||
|
|
||||||
|
RsDbg() << "Closing client connection." ;
|
||||||
|
mni->closeConnection(item->PeerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<std::string, bool> FriendServer::computeListOfFriendInvites(uint32_t nb_reqs_invites, const RsPeerId &pid, const RsPgpFingerprint &fpr)
|
std::map<std::string, bool> FriendServer::computeListOfFriendInvites(uint32_t nb_reqs_invites, const RsPeerId &pid, const RsPgpFingerprint &fpr)
|
||||||
@ -239,14 +247,25 @@ void FriendServer::handleClientRemove(const RsFriendServerClientRemoveItem *item
|
|||||||
|
|
||||||
RsDbg() << " Nonce is correct: " << std::hex << item->nonce << std::dec << ". Removing peer " << item->peer_id ;
|
RsDbg() << " Nonce is correct: " << std::hex << item->nonce << std::dec << ". Removing peer " << item->peer_id ;
|
||||||
|
|
||||||
mCurrentClientPeers.erase(it);
|
removePeer(item->peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
// Also remove that peer from all n-closest lists
|
void FriendServer::removePeer(const RsPeerId& peer_id)
|
||||||
|
{
|
||||||
|
auto it = mCurrentClientPeers.find(peer_id);
|
||||||
|
|
||||||
|
if(it != mCurrentClientPeers.end())
|
||||||
|
mCurrentClientPeers.erase(it);
|
||||||
|
|
||||||
for(auto& it:mCurrentClientPeers)
|
for(auto& it:mCurrentClientPeers)
|
||||||
|
{
|
||||||
|
// Also remove that peer from all n-closest lists
|
||||||
|
|
||||||
for(auto pit(it.second.closest_peers.begin());pit!=it.second.closest_peers.end();)
|
for(auto pit(it.second.closest_peers.begin());pit!=it.second.closest_peers.end();)
|
||||||
if(pit->second == item->peer_id)
|
if(pit->second == peer_id)
|
||||||
{
|
{
|
||||||
|
RsDbg() << " Removing from n-closest peers of peer " << pit->first ;
|
||||||
|
|
||||||
auto tmp(pit);
|
auto tmp(pit);
|
||||||
++tmp;
|
++tmp;
|
||||||
it.second.closest_peers.erase(pit);
|
it.second.closest_peers.erase(pit);
|
||||||
@ -254,6 +273,22 @@ void FriendServer::handleClientRemove(const RsFriendServerClientRemoveItem *item
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
++pit;
|
++pit;
|
||||||
|
|
||||||
|
// Also remove that peer from peers that have accepted each peer
|
||||||
|
|
||||||
|
for(auto fit(it.second.have_added_this_peer.begin());fit!=it.second.have_added_this_peer.end();)
|
||||||
|
if(fit->second == peer_id)
|
||||||
|
{
|
||||||
|
RsDbg() << " Removing from have_added_as_friend peers of peer " << fit->first ;
|
||||||
|
|
||||||
|
auto tmp(fit);
|
||||||
|
++tmp;
|
||||||
|
it.second.closest_peers.erase(fit);
|
||||||
|
fit=tmp;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
++fit;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PeerInfo::PeerDistance FriendServer::computePeerDistance(const RsPgpFingerprint& p1,const RsPgpFingerprint& p2)
|
PeerInfo::PeerDistance FriendServer::computePeerDistance(const RsPgpFingerprint& p1,const RsPgpFingerprint& p2)
|
||||||
@ -295,17 +330,17 @@ void FriendServer::autoWash()
|
|||||||
rstime_t now = time(nullptr);
|
rstime_t now = time(nullptr);
|
||||||
RsDbg() << "autoWash..." ;
|
RsDbg() << "autoWash..." ;
|
||||||
|
|
||||||
|
std::list<RsPeerId> to_remove;
|
||||||
|
|
||||||
for(std::map<RsPeerId,PeerInfo>::iterator it(mCurrentClientPeers.begin());it!=mCurrentClientPeers.end();)
|
for(std::map<RsPeerId,PeerInfo>::iterator it(mCurrentClientPeers.begin());it!=mCurrentClientPeers.end();)
|
||||||
if(it->second.last_connection_TS + MAXIMUM_PEER_INACTIVE_DELAY < now)
|
if(it->second.last_connection_TS + MAXIMUM_PEER_INACTIVE_DELAY < now)
|
||||||
{
|
{
|
||||||
RsDbg() << "Removing client peer " << it->first << " because it's inactive for more than " << MAXIMUM_PEER_INACTIVE_DELAY << " seconds." ;
|
RsDbg() << "Removing client peer " << it->first << " because it's inactive for more than " << MAXIMUM_PEER_INACTIVE_DELAY << " seconds." ;
|
||||||
auto tmp = it;
|
to_remove.push_back(it->first);
|
||||||
++tmp;
|
|
||||||
mCurrentClientPeers.erase(it);
|
|
||||||
it = tmp;
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
++it;
|
for(auto peer_id:to_remove)
|
||||||
|
removePeer(peer_id);
|
||||||
|
|
||||||
RsDbg() << "done." ;
|
RsDbg() << "done." ;
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,7 @@ struct PeerInfo
|
|||||||
uint64_t last_nonce;
|
uint64_t last_nonce;
|
||||||
|
|
||||||
std::map<PeerDistance,RsPeerId> closest_peers;
|
std::map<PeerDistance,RsPeerId> closest_peers;
|
||||||
|
std::map<PeerDistance,RsPeerId> have_added_this_peer;
|
||||||
};
|
};
|
||||||
|
|
||||||
class FriendServer : public RsTickingThread
|
class FriendServer : public RsTickingThread
|
||||||
@ -61,6 +62,9 @@ private:
|
|||||||
// Updates for each peer in the database, the list of closest peers w.r.t. some arbitrary distance.
|
// Updates for each peer in the database, the list of closest peers w.r.t. some arbitrary distance.
|
||||||
void updateClosestPeers(const RsPeerId& pid,const RsPgpFingerprint& fpr);
|
void updateClosestPeers(const RsPeerId& pid,const RsPgpFingerprint& fpr);
|
||||||
|
|
||||||
|
// removes a single peer from all lists.
|
||||||
|
void removePeer(const RsPeerId& peer_id);
|
||||||
|
|
||||||
// Adds the incoming peer data to the list of current clients and returns the
|
// Adds the incoming peer data to the list of current clients and returns the
|
||||||
std::map<RsPeerId,PeerInfo>::iterator handleIncomingClientData(const std::string& pgp_public_key_b64,const std::string& short_invite_b64);
|
std::map<RsPeerId,PeerInfo>::iterator handleIncomingClientData(const std::string& pgp_public_key_b64,const std::string& short_invite_b64);
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ void FsNetworkInterface::threadTick()
|
|||||||
to_close.push_back(it.first);
|
to_close.push_back(it.first);
|
||||||
|
|
||||||
for(const auto& pid:to_close)
|
for(const auto& pid:to_close)
|
||||||
closeConnection(pid);
|
locked_closeConnection(pid);
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||||
}
|
}
|
||||||
@ -224,6 +224,10 @@ void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
|
|||||||
{
|
{
|
||||||
RS_STACK_MUTEX(mFsNiMtx);
|
RS_STACK_MUTEX(mFsNiMtx);
|
||||||
|
|
||||||
|
locked_closeConnection(peer_id);
|
||||||
|
}
|
||||||
|
void FsNetworkInterface::locked_closeConnection(const RsPeerId& peer_id)
|
||||||
|
{
|
||||||
RsDbg() << "Closing connection to virtual peer " << peer_id ;
|
RsDbg() << "Closing connection to virtual peer " << peer_id ;
|
||||||
|
|
||||||
const auto& it = mConnections.find(peer_id);
|
const auto& it = mConnections.find(peer_id);
|
||||||
|
@ -48,7 +48,6 @@ public:
|
|||||||
|
|
||||||
// basic functionality
|
// basic functionality
|
||||||
|
|
||||||
void closeConnection(const RsPeerId& peer_id);
|
|
||||||
void debugPrint();
|
void debugPrint();
|
||||||
|
|
||||||
// Implements PQInterface
|
// Implements PQInterface
|
||||||
@ -57,12 +56,15 @@ public:
|
|||||||
int SendItem(RsItem *item) override;
|
int SendItem(RsItem *item) override;
|
||||||
RsItem *GetItem() override;
|
RsItem *GetItem() override;
|
||||||
|
|
||||||
|
void closeConnection(const RsPeerId& peer_id);
|
||||||
|
|
||||||
// Implements RsTickingThread
|
// Implements RsTickingThread
|
||||||
|
|
||||||
void threadTick() override;
|
void threadTick() override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool checkForNewConnections();
|
bool checkForNewConnections();
|
||||||
|
void locked_closeConnection(const RsPeerId& peer_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RsMutex mFsNiMtx;
|
RsMutex mFsNiMtx;
|
||||||
|
Loading…
Reference in New Issue
Block a user