Added sync backoff for unresponsive peers. Improved sync peer selection.

This commit is contained in:
Mark Qvist 2022-12-20 23:58:09 +01:00
parent b10755e34f
commit 1440a0b162
3 changed files with 91 additions and 43 deletions

View File

@ -24,6 +24,17 @@ class LXMPeer:
# be unreachable before it is removed
MAX_UNREACHABLE = 4*24*60*60
# Everytime consecutive time a sync
# link fails to establish, add this
# amount off time to wait before the
# next sync is attempted.
SYNC_BACKOFF_STEP = 12*60
# How long to wait for an answer to
# peer path requests before deferring
# sync to later.
PATH_REQUEST_GRACE = 7.5
@staticmethod
def from_bytes(peer_bytes, router):
dictionary = msgpack.unpackb(peer_bytes)
@ -66,6 +77,9 @@ class LXMPeer:
def __init__(self, router, destination_hash):
self.alive = False
self.last_heard = 0
self.next_sync_attempt = 0
self.last_sync_attempt = 0
self.sync_backoff = 0
self.peering_timebase = 0
self.link = None
@ -80,48 +94,61 @@ class LXMPeer:
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
def sync(self):
RNS.log("Initiating LXMF Propagation Node sync with peer "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_DEBUG)
self.last_sync_attempt = time.time()
if not RNS.Transport.has_path(self.destination_hash):
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash)
RNS.log("Path requested, retrying sync later", RNS.LOG_DEBUG)
else:
if self.identity == None:
self.identity = RNS.Identity.recall(destination_hash)
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
if time.time() > self.next_sync_attempt:
if not RNS.Transport.has_path(self.destination_hash):
RNS.log("No path to peer "+RNS.prettyhexrep(self.destination_hash)+" exists, requesting...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.destination_hash)
time.sleep(LXMPeer.PATH_REQUEST_GRACE)
if self.identity != None:
if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed)
self.state = LXMPeer.LINK_ESTABLISHING
else:
if self.state == LXMPeer.LINK_READY:
self.alive = True
self.last_heard = time.time()
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)
for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.unhandled_messages.pop(transient_id)
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
if not RNS.Transport.has_path(self.destination_hash):
RNS.log("Path request was not answered, retrying sync with peer "+RNS.prettyhexrep(self.destination_hash)+" later", RNS.LOG_DEBUG)
else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
if self.identity == None:
self.identity = RNS.Identity.recall(destination_hash)
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
if self.identity != None:
if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE:
RNS.log("Establishing link for sync to peer "+RNS.prettyhexrep(self.destination_hash)+"...", RNS.LOG_DEBUG)
self.sync_backoff += LXMPeer.SYNC_BACKOFF_STEP
self.next_sync_attempt = time.time() + self.sync_backoff
self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed)
self.state = LXMPeer.LINK_ESTABLISHING
else:
if self.state == LXMPeer.LINK_READY:
self.alive = True
self.last_heard = time.time()
self.sync_backoff = 0
RNS.log("Synchronisation link to peer "+RNS.prettyhexrep(self.destination_hash)+" established, preparing request...", RNS.LOG_DEBUG)
unhandled_ids = []
purged_ids = []
for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries:
unhandled_ids.append(transient_id)
else:
purged_ids.append(transient_id)
for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.unhandled_messages.pop(transient_id)
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT
else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
else:
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
if self.last_sync_attempt > self.last_heard:
self.alive = False
def request_failed(self, request_receipt):
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
@ -191,6 +218,7 @@ class LXMPeer:
resource.transferred_messages = wanted_message_ids
self.state = LXMPeer.RESOURCE_TRANSFERRING
else:
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
self.state = LXMPeer.IDLE
except Exception as e:
@ -226,6 +254,7 @@ class LXMPeer:
def link_established(self, link):
self.link.identify(self.router.identity)
self.state = LXMPeer.LINK_READY
self.next_sync_attempt = 0
self.sync()
def link_closed(self, link):

View File

@ -910,7 +910,11 @@ class LXMRouter:
def peer(self, destination_hash, timestamp):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
peer.alive = True
if timestamp > peer.peering_timebase:
peer.alive = True
peer.sync_backoff = 0
peer.next_sync_attempt = 0
peer.peering_timebase = timestamp
peer.last_heard = time.time()
else:
@ -934,18 +938,33 @@ class LXMRouter:
def sync_peers(self):
culled_peers = []
waiting_peers = []
unresponsive_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
waiting_peers.append(peer)
if peer.alive:
waiting_peers.append(peer)
else:
if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt:
unresponsive_peers.append(peer)
else:
pass
# RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG)
peer_pool = []
if len(waiting_peers) > 0:
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
selected_index = random.randint(0,len(waiting_peers)-1)
selected_peer = waiting_peers[selected_index]
peer_pool = waiting_peers
elif len(unresponsive_peers) > 0:
RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG)
peer_pool = unresponsive_peers
if len(peer_pool) > 0:
selected_index = random.randint(0,len(peer_pool)-1)
selected_peer = peer_pool[selected_index]
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
selected_peer.sync()

View File

@ -1 +1 @@
__version__ = "0.2.6"
__version__ = "0.2.7"