diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 17c0344..97fb747 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -148,8 +148,10 @@ class LXMPeer: RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) self.state = LXMPeer.REQUEST_SENT + else: RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) + else: RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG) if self.last_sync_attempt > self.last_heard: @@ -159,8 +161,8 @@ class LXMPeer: RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) if self.link != None: self.link.teardown() - else: - self.state = LXMPeer.IDLE + + self.state = LXMPeer.IDLE def offer_response(self, request_receipt): try: @@ -222,6 +224,7 @@ class LXMPeer: resource = RNS.Resource(data, self.link, callback = self.resource_concluded) resource.transferred_messages = wanted_message_ids self.state = LXMPeer.RESOURCE_TRANSFERRING + else: RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG) if self.link != None: @@ -261,6 +264,7 @@ class LXMPeer: if self.link != None: self.link.teardown() + self.link = None self.state = LXMPeer.IDLE def link_established(self, link): diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index e9a4836..04263a0 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -21,6 +21,7 @@ class LXMRouter: DELIVERY_RETRY_WAIT = 7 PATH_REQUEST_WAIT = 7 LINK_MAX_INACTIVITY = 10*60 + P_LINK_MAX_INACTIVITY = 3*60 MESSAGE_EXPIRY = 30*24*60*60 @@ -90,6 +91,7 @@ class LXMRouter: self.propagation_transfer_progress = 0.0 self.propagation_transfer_last_result = None self.propagation_transfer_max_messages = None + self.active_propagation_links = [] self.locally_delivered_transient_ids = {} self.locally_processed_transient_ids = {} @@ -471,6 +473,19 @@ class LXMRouter: cleaned_link = self.direct_links.pop(link_hash) RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) + try: + inactive_links = [] + for link in self.active_propagation_links: + if link.no_data_for() > LXMRouter.P_LINK_MAX_INACTIVITY: + inactive_links.append(link) + + for link in inactive_links: + self.active_propagation_links.remove(link) + link.teardown() + + except Exception as e: + RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR) + if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED: self.outbound_propagation_link = None if self.propagation_transfer_state == LXMRouter.PR_COMPLETE: @@ -953,9 +968,8 @@ class LXMRouter: peer.alive = True peer.sync_backoff = 0 peer.next_sync_attempt = 0 - - peer.peering_timebase = timestamp - peer.last_heard = time.time() + peer.peering_timebase = timestamp + peer.last_heard = time.time() else: peer = LXMPeer(self, destination_hash) @@ -1038,6 +1052,7 @@ class LXMRouter: link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_started_callback(self.resource_transfer_began) link.set_resource_concluded_callback(self.propagation_resource_concluded) + self.active_propagation_links.append(link) def propagation_packet(self, data, packet): try: @@ -1057,7 +1072,7 @@ class LXMRouter: RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) - def offer_request(self, path, data, request_id, remote_identity, requested_at): + def offer_request(self, path, data, request_id, link_id, remote_identity, requested_at): if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY else: @@ -1086,7 +1101,7 @@ class LXMRouter: RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG) if resource.status == RNS.Resource.COMPLETE: # TODO: The peer this was received from should - # have the transient id added to it's list of + # have the transient id added to its list of # already handled messages. try: data = msgpack.unpackb(resource.data.read()) @@ -1326,7 +1341,7 @@ class LXMRouter: else: # Simply wait for the link to become # active or close - RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active: "+str(self.outbound_propagation_link.status), RNS.LOG_DEBUG) + RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG) else: # No link exists, so we'll try to establish one, but # only if we've never tried before, or the retry wait