This commit is contained in:
Mark Qvist 2025-10-30 19:45:40 +01:00
parent 6446db4f11
commit a62ffa12b1
2 changed files with 45 additions and 63 deletions

View file

@ -8,13 +8,13 @@ from .LXMessage import LXMessage
class LXMFDeliveryAnnounceHandler: class LXMFDeliveryAnnounceHandler:
def __init__(self, lxmrouter): def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".delivery" self.aspect_filter = APP_NAME+".delivery"
self.receive_path_responses = True self.receive_path_responses = True
self.lxmrouter = lxmrouter self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data): def received_announce(self, destination_hash, announced_identity, app_data):
for lxmessage in self.lxmrouter.pending_outbound: for lxmessage in self.lxmrouter.pending_outbound:
if destination_hash == lxmessage.destination_hash: if destination_hash == lxmessage.destination_hash:
if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC: if lxmessage.method == LXMessage.DIRECT or lxmessage.method == LXMessage.OPPORTUNISTIC:
lxmessage.next_delivery_attempt = time.time() lxmessage.next_delivery_attempt = time.time()
@ -34,9 +34,9 @@ class LXMFDeliveryAnnounceHandler:
class LXMFPropagationAnnounceHandler: class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter): def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".propagation" self.aspect_filter = APP_NAME+".propagation"
self.receive_path_responses = True self.receive_path_responses = True
self.lxmrouter = lxmrouter self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash, is_path_response): def received_announce(self, destination_hash, announced_identity, app_data, announce_packet_hash, is_path_response):
if not is_path_response: if not is_path_response:
@ -51,7 +51,7 @@ class LXMFPropagationAnnounceHandler:
propagation_sync_limit = int(data[4]) propagation_sync_limit = int(data[4])
propagation_stamp_cost = int(data[5][0]) propagation_stamp_cost = int(data[5][0])
propagation_stamp_cost_flexibility = int(data[5][1]) propagation_stamp_cost_flexibility = int(data[5][1])
metadata = data[6] metadata = data[6]
if destination_hash in self.lxmrouter.static_peers: if destination_hash in self.lxmrouter.static_peers:
self.lxmrouter.peer(destination_hash=destination_hash, self.lxmrouter.peer(destination_hash=destination_hash,
@ -84,4 +84,4 @@ class LXMFPropagationAnnounceHandler:
except Exception as e: except Exception as e:
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG) RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG) RNS.log(f"The contained exception was: {str(e)}", RNS.LOG_DEBUG)

View file

@ -174,11 +174,11 @@ class LXMPeer:
self.handled_messages_queue = deque() self.handled_messages_queue = deque()
self.unhandled_messages_queue = deque() self.unhandled_messages_queue = deque()
self.offered = 0 # Messages offered to this peer self.offered = 0 # Messages offered to this peer
self.outgoing = 0 # Messages transferred to this peer self.outgoing = 0 # Messages transferred to this peer
self.incoming = 0 # Messages received from this peer self.incoming = 0 # Messages received from this peer
self.rx_bytes = 0 # Bytes received from this peer self.rx_bytes = 0 # Bytes received from this peer
self.tx_bytes = 0 # Bytes sent to this peer self.tx_bytes = 0 # Bytes sent to this peer
self._hm_count = 0 self._hm_count = 0
self._um_count = 0 self._um_count = 0
@ -243,29 +243,29 @@ class LXMPeer:
purged_ids = [] purged_ids = []
for transient_id in self.unhandled_messages: for transient_id in self.unhandled_messages:
if transient_id in self.router.propagation_entries: if transient_id in self.router.propagation_entries:
unhandled_entry = [ unhandled_entry = [ transient_id,
transient_id, self.router.get_weight(transient_id),
self.router.get_weight(transient_id), self.router.get_size(transient_id) ]
self.router.get_size(transient_id),
]
unhandled_entries.append(unhandled_entry) unhandled_entries.append(unhandled_entry)
else:
purged_ids.append(transient_id) else: purged_ids.append(transient_id)
for transient_id in purged_ids: for transient_id in purged_ids:
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG) RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
self.remove_unhandled_message(transient_id) self.remove_unhandled_message(transient_id)
unhandled_entries.sort(key=lambda e: e[1], reverse=False) unhandled_entries.sort(key=lambda e: e[1], reverse=False)
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug RNS.log(f"Syncing to peer with per-message limit {RNS.prettysize(self.propagation_transfer_limit*1000)} and sync limit {RNS.prettysize(self.propagation_sync_limit*1000)}") # TODO: Remove debug
for unhandled_entry in unhandled_entries: for unhandled_entry in unhandled_entries:
transient_id = unhandled_entry[0] transient_id = unhandled_entry[0]
weight = unhandled_entry[1] weight = unhandled_entry[1]
lxm_size = unhandled_entry[2] lxm_size = unhandled_entry[2]
lxm_transfer_size = lxm_size+per_message_overhead lxm_transfer_size = lxm_size+per_message_overhead
next_size = cumulative_size + lxm_transfer_size next_size = cumulative_size + lxm_transfer_size
if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000): if self.propagation_transfer_limit != None and lxm_transfer_size > (self.propagation_transfer_limit*1000):
self.remove_unhandled_message(transient_id) self.remove_unhandled_message(transient_id)
@ -288,14 +288,11 @@ class LXMPeer:
else: else:
RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG) RNS.log("Postponing sync with peer "+RNS.prettyhexrep(self.destination_hash)+" for "+RNS.prettytime(self.next_sync_attempt-time.time())+" due to previous failures", RNS.LOG_DEBUG)
if self.last_sync_attempt > self.last_heard: if self.last_sync_attempt > self.last_heard: self.alive = False
self.alive = False
def request_failed(self, request_receipt): def request_failed(self, request_receipt):
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG)
if self.link != None: if self.link != None: self.link.teardown()
self.link.teardown()
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def offer_response(self, request_receipt): def offer_response(self, request_receipt):
@ -389,18 +386,16 @@ class LXMPeer:
if self.currently_transferring_messages == None: if self.currently_transferring_messages == None:
RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. Aborting.", RNS.LOG_ERROR) RNS.log(f"Sync transfer completed on {self}, but transferred message index was unavailable. Aborting.", RNS.LOG_ERROR)
if self.link != None: self.link.teardown() if self.link != None: self.link.teardown()
self.link = None self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
for transient_id in self.currently_transferring_messages: for transient_id in self.currently_transferring_messages:
self.add_handled_message(transient_id) self.add_handled_message(transient_id)
self.remove_unhandled_message(transient_id) self.remove_unhandled_message(transient_id)
if self.link != None: if self.link != None: self.link.teardown()
self.link.teardown() self.link = None
self.state = LXMPeer.IDLE
self.link = None
self.state = LXMPeer.IDLE
rate_str = "" rate_str = ""
if self.current_sync_transfer_started != None: if self.current_sync_transfer_started != None:
@ -408,14 +403,14 @@ class LXMPeer:
rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}" rate_str = f" at {RNS.prettyspeed(self.sync_transfer_rate)}"
RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE) RNS.log(f"Syncing {len(self.currently_transferring_messages)} messages to peer {RNS.prettyhexrep(self.destination_hash)} completed{rate_str}", RNS.LOG_VERBOSE)
self.alive = True self.alive = True
self.last_heard = time.time() self.last_heard = time.time()
self.offered += len(self.last_offer) self.offered += len(self.last_offer)
self.outgoing += len(self.currently_transferring_messages) self.outgoing += len(self.currently_transferring_messages)
self.tx_bytes += resource.get_data_size() self.tx_bytes += resource.get_data_size()
self.currently_transferring_messages = None self.currently_transferring_messages = None
self.current_sync_transfer_started = None self.current_sync_transfer_started = None
if self.sync_strategy == self.STRATEGY_PERSISTENT: if self.sync_strategy == self.STRATEGY_PERSISTENT:
if self.unhandled_message_count > 0: self.sync() if self.unhandled_message_count > 0: self.sync()
@ -423,10 +418,10 @@ class LXMPeer:
else: else:
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE) RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_VERBOSE)
if self.link != None: self.link.teardown() if self.link != None: self.link.teardown()
self.link = None self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
self.currently_transferring_messages = None self.currently_transferring_messages = None
self.current_sync_transfer_started = None self.current_sync_transfer_started = None
def link_established(self, link): def link_established(self, link):
self.link.identify(self.router.identity) self.link.identify(self.router.identity)
@ -439,7 +434,7 @@ class LXMPeer:
self.sync() self.sync()
def link_closed(self, link): def link_closed(self, link):
self.link = None self.link = None
self.state = LXMPeer.IDLE self.state = LXMPeer.IDLE
def queued_items(self): def queued_items(self):
@ -452,19 +447,14 @@ class LXMPeer:
self.handled_messages_queue.append(transient_id) self.handled_messages_queue.append(transient_id)
def process_queues(self): def process_queues(self):
if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0: if len(self.unhandled_messages_queue) > 0 or len(self.handled_messages_queue) > 0:
# TODO: Remove debug
# st = time.time(); lu = len(self.unhandled_messages_queue); lh = len(self.handled_messages_queue)
handled_messages = self.handled_messages handled_messages = self.handled_messages
unhandled_messages = self.unhandled_messages unhandled_messages = self.unhandled_messages
while len(self.handled_messages_queue) > 0: while len(self.handled_messages_queue) > 0:
transient_id = self.handled_messages_queue.pop() transient_id = self.handled_messages_queue.pop()
if not transient_id in handled_messages: if not transient_id in handled_messages: self.add_handled_message(transient_id)
self.add_handled_message(transient_id) if transient_id in unhandled_messages: self.remove_unhandled_message(transient_id)
if transient_id in unhandled_messages:
self.remove_unhandled_message(transient_id)
while len(self.unhandled_messages_queue) > 0: while len(self.unhandled_messages_queue) > 0:
transient_id = self.unhandled_messages_queue.pop() transient_id = self.unhandled_messages_queue.pop()
@ -472,8 +462,6 @@ class LXMPeer:
self.add_unhandled_message(transient_id) self.add_unhandled_message(transient_id)
del handled_messages, unhandled_messages del handled_messages, unhandled_messages
# TODO: Remove debug
# RNS.log(f"{self} processed {lh}/{lu} in {RNS.prettytime(time.time()-st)}")
@property @property
def handled_messages(self): def handled_messages(self):
@ -493,16 +481,12 @@ class LXMPeer:
@property @property
def handled_message_count(self): def handled_message_count(self):
if not self._hm_counts_synced: if not self._hm_counts_synced: self._update_counts()
self._update_counts()
return self._hm_count return self._hm_count
@property @property
def unhandled_message_count(self): def unhandled_message_count(self):
if not self._um_counts_synced: if not self._um_counts_synced: self._update_counts()
self._update_counts()
return self._um_count return self._um_count
@property @property
@ -541,7 +525,5 @@ class LXMPeer:
self._um_counts_synced = False self._um_counts_synced = False
def __str__(self): def __str__(self):
if self.destination_hash: if self.destination_hash: return RNS.prettyhexrep(self.destination_hash)
return RNS.prettyhexrep(self.destination_hash) else: return "<Unknown>"
else:
return "<Unknown>"