Improved propagation node sync and memory consumption

This commit is contained in:
Mark Qvist 2024-02-29 23:02:16 +01:00
parent 7aea4ea209
commit 696c78ecea
2 changed files with 27 additions and 8 deletions

View File

@ -148,8 +148,10 @@ class LXMPeer:
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG) RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed) self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
self.state = LXMPeer.REQUEST_SENT self.state = LXMPeer.REQUEST_SENT
else: else:
RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR) RNS.log("Could not request sync to peer "+RNS.prettyhexrep(self.destination_hash)+" since its identity could not be recalled.", RNS.LOG_ERROR)
else: else:
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG) RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
if self.last_sync_attempt > self.last_heard: if self.last_sync_attempt > self.last_heard:
@ -159,7 +161,7 @@ class LXMPeer:
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
if self.link != None: if self.link != None:
self.link.teardown() self.link.teardown()
else:
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def offer_response(self, request_receipt): def offer_response(self, request_receipt):
@ -222,6 +224,7 @@ class LXMPeer:
resource = RNS.Resource(data, self.link, callback = self.resource_concluded) resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
resource.transferred_messages = wanted_message_ids resource.transferred_messages = wanted_message_ids
self.state = LXMPeer.RESOURCE_TRANSFERRING self.state = LXMPeer.RESOURCE_TRANSFERRING
else: else:
RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG) RNS.log("Peer "+RNS.prettyhexrep(self.destination_hash)+" did not request any of the available messages, sync completed", RNS.LOG_DEBUG)
if self.link != None: if self.link != None:
@ -261,6 +264,7 @@ class LXMPeer:
if self.link != None: if self.link != None:
self.link.teardown() self.link.teardown()
self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def link_established(self, link): def link_established(self, link):

View File

@ -21,6 +21,7 @@ class LXMRouter:
DELIVERY_RETRY_WAIT = 7 DELIVERY_RETRY_WAIT = 7
PATH_REQUEST_WAIT = 7 PATH_REQUEST_WAIT = 7
LINK_MAX_INACTIVITY = 10*60 LINK_MAX_INACTIVITY = 10*60
P_LINK_MAX_INACTIVITY = 3*60
MESSAGE_EXPIRY = 30*24*60*60 MESSAGE_EXPIRY = 30*24*60*60
@ -90,6 +91,7 @@ class LXMRouter:
self.propagation_transfer_progress = 0.0 self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None self.propagation_transfer_last_result = None
self.propagation_transfer_max_messages = None self.propagation_transfer_max_messages = None
self.active_propagation_links = []
self.locally_delivered_transient_ids = {} self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {} self.locally_processed_transient_ids = {}
@ -471,6 +473,19 @@ class LXMRouter:
cleaned_link = self.direct_links.pop(link_hash) cleaned_link = self.direct_links.pop(link_hash)
RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG) RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG)
try:
inactive_links = []
for link in self.active_propagation_links:
if link.no_data_for() > LXMRouter.P_LINK_MAX_INACTIVITY:
inactive_links.append(link)
for link in inactive_links:
self.active_propagation_links.remove(link)
link.teardown()
except Exception as e:
RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR)
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED: if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED:
self.outbound_propagation_link = None self.outbound_propagation_link = None
if self.propagation_transfer_state == LXMRouter.PR_COMPLETE: if self.propagation_transfer_state == LXMRouter.PR_COMPLETE:
@ -953,7 +968,6 @@ class LXMRouter:
peer.alive = True peer.alive = True
peer.sync_backoff = 0 peer.sync_backoff = 0
peer.next_sync_attempt = 0 peer.next_sync_attempt = 0
peer.peering_timebase = timestamp peer.peering_timebase = timestamp
peer.last_heard = time.time() peer.last_heard = time.time()
@ -1038,6 +1052,7 @@ class LXMRouter:
link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began) link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.propagation_resource_concluded) link.set_resource_concluded_callback(self.propagation_resource_concluded)
self.active_propagation_links.append(link)
def propagation_packet(self, data, packet): def propagation_packet(self, data, packet):
try: try:
@ -1057,7 +1072,7 @@ class LXMRouter:
RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def offer_request(self, path, data, request_id, remote_identity, requested_at): def offer_request(self, path, data, request_id, link_id, remote_identity, requested_at):
if remote_identity == None: if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY return LXMPeer.ERROR_NO_IDENTITY
else: else:
@ -1086,7 +1101,7 @@ class LXMRouter:
RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG) RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
# TODO: The peer this was received from should # TODO: The peer this was received from should
# have the transient id added to it's list of # have the transient id added to its list of
# already handled messages. # already handled messages.
try: try:
data = msgpack.unpackb(resource.data.read()) data = msgpack.unpackb(resource.data.read())
@ -1326,7 +1341,7 @@ class LXMRouter:
else: else:
# Simply wait for the link to become # Simply wait for the link to become
# active or close # active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active: "+str(self.outbound_propagation_link.status), RNS.LOG_DEBUG) RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else: else:
# No link exists, so we'll try to establish one, but # No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait # only if we've never tried before, or the retry wait