mirror of
https://github.com/markqvist/LXMF.git
synced 2025-11-19 15:11:54 -05:00
Handle client propagation transfer limits separately from PN peers
This commit is contained in:
parent
99830b6e8b
commit
bc3f4ecff5
1 changed files with 56 additions and 36 deletions
|
|
@ -154,6 +154,7 @@ class LXMRouter:
|
|||
self.propagation_transfer_max_messages = None
|
||||
self.prioritise_rotating_unreachable_peers = False
|
||||
self.active_propagation_links = []
|
||||
self.validated_peer_links = {}
|
||||
self.locally_delivered_transient_ids = {}
|
||||
self.locally_processed_transient_ids = {}
|
||||
self.outbound_stamp_costs = {}
|
||||
|
|
@ -910,6 +911,8 @@ class LXMRouter:
|
|||
if inactive_time > LXMRouter.LINK_MAX_INACTIVITY:
|
||||
link.teardown()
|
||||
closed_links.append(link_hash)
|
||||
if link.link_id in self.validated_peer_links:
|
||||
self.validated_peer_links.pop(link.link_id)
|
||||
|
||||
for link_hash in closed_links:
|
||||
cleaned_link = self.direct_links.pop(link_hash)
|
||||
|
|
@ -2101,7 +2104,10 @@ class LXMRouter:
|
|||
self.lxmf_propagation(lxmf_data, stamp_value=stamp_value, stamp_data=stamp_data)
|
||||
self.client_propagation_messages_received += 1
|
||||
|
||||
if len(validated_messages) == len(messages): packet.prove()
|
||||
if len(validated_messages) == len(messages):
|
||||
ms = "" if len(messages) == 1 else "s"
|
||||
RNS.log(f"Received {len(messages)} propagation message{ms} from client with valid stamp{ms}", RNS.LOG_DEBUG)
|
||||
packet.prove()
|
||||
else:
|
||||
RNS.log("Propagation transfer from client contained messages with invalid stamps", RNS.LOG_NOTICE)
|
||||
reject_data = msgpack.packb([LXMPeer.ERROR_INVALID_STAMP])
|
||||
|
|
@ -2151,6 +2157,7 @@ class LXMRouter:
|
|||
|
||||
else:
|
||||
RNS.log(f"Peering key validated for incoming offer in {RNS.prettytime(td)}", RNS.LOG_DEBUG)
|
||||
self.validated_peer_links[link_id] = True
|
||||
for transient_id in transient_ids:
|
||||
if not transient_id in self.propagation_entries: wanted_ids.append(transient_id)
|
||||
|
||||
|
|
@ -2174,7 +2181,7 @@ class LXMRouter:
|
|||
remote_timebase = data[0]
|
||||
messages = data[1]
|
||||
remote_hash = None
|
||||
remote_str = "unknown peer"
|
||||
remote_str = "unknown client"
|
||||
|
||||
if remote_identity != None:
|
||||
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||
|
|
@ -2203,42 +2210,55 @@ class LXMRouter:
|
|||
RNS.log(f"Auto-peering with {remote_str} discovered via incoming sync", RNS.LOG_DEBUG) # TODO: Remove debug
|
||||
self.peer(remote_hash, remote_timebase, remote_transfer_limit, remote_sync_limit, remote_stamp_cost, remote_stamp_flex, remote_peering_cost, remote_metadata)
|
||||
|
||||
ms = "" if len(messages) == 1 else "s"
|
||||
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE)
|
||||
peering_key_valid = False
|
||||
if remote_identity != None:
|
||||
if resource.link.link_id in self.validated_peer_links and self.validated_peer_links[resource.link.link_id] == True:
|
||||
peering_key_valid = True
|
||||
|
||||
min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
|
||||
validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
|
||||
if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE)
|
||||
else: RNS.log(f"Transfer from {remote_str} contained {len(messages)-len(validated_messages)} invalid stamps", RNS.LOG_WARNING)
|
||||
|
||||
for validated_entry in validated_messages:
|
||||
transient_id = validated_entry[0]
|
||||
lxmf_data = validated_entry[1]
|
||||
stamp_value = validated_entry[2]
|
||||
stamp_data = validated_entry[3]
|
||||
peer = None
|
||||
|
||||
if remote_hash != None and remote_hash in self.peers:
|
||||
peer = self.peers[remote_hash]
|
||||
peer.incoming += 1
|
||||
peer.rx_bytes += len(lxmf_data)
|
||||
else:
|
||||
if remote_identity != None:
|
||||
self.unpeered_propagation_incoming += 1
|
||||
self.unpeered_propagation_rx_bytes += len(lxmf_data)
|
||||
else:
|
||||
self.client_propagation_messages_received += 1
|
||||
|
||||
self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data)
|
||||
if peer != None: peer.queue_handled_message(transient_id)
|
||||
|
||||
invalid_message_count = len(messages) - len(validated_messages)
|
||||
if invalid_message_count > 0:
|
||||
if not peering_key_valid and len(messages) > 1:
|
||||
resource.link.teardown()
|
||||
throttle_time = LXMRouter.PN_STAMP_THROTTLE
|
||||
self.throttled_peers[remote_hash] = time.time()+throttle_time
|
||||
ms = "" if invalid_message_count == 1 else "s"
|
||||
RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE)
|
||||
RNS.log(f"Received multiple propagation messages from {remote_str} without valid peering key presentation. This is not supposed to happen, ignoring.", RNS.LOG_WARNING)
|
||||
RNS.log(f"Clients and peers without a valid peering key can only deliver 1 message per transfer.", RNS.LOG_WARNING)
|
||||
else:
|
||||
ms = "" if len(messages) == 1 else "s"
|
||||
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}, validating stamps...", RNS.LOG_VERBOSE)
|
||||
|
||||
min_accepted_cost = max(0, self.propagation_stamp_cost-self.propagation_stamp_cost_flexibility)
|
||||
validated_messages = LXStamper.validate_pn_stamps(messages, min_accepted_cost)
|
||||
invalid_stamps = len(messages)-len(validated_messages)
|
||||
ms = "" if invalid_stamps == 1 else "s"
|
||||
if len(validated_messages) == len(messages): RNS.log(f"All message stamps validated from {remote_str}", RNS.LOG_VERBOSE)
|
||||
else: RNS.log(f"Transfer from {remote_str} contained {invalid_stamps} invalid stamp{ms}", RNS.LOG_WARNING)
|
||||
|
||||
for validated_entry in validated_messages:
|
||||
transient_id = validated_entry[0]
|
||||
lxmf_data = validated_entry[1]
|
||||
stamp_value = validated_entry[2]
|
||||
stamp_data = validated_entry[3]
|
||||
peer = None
|
||||
|
||||
if remote_hash != None and remote_hash in self.peers:
|
||||
peer = self.peers[remote_hash]
|
||||
peer.incoming += 1
|
||||
peer.rx_bytes += len(lxmf_data)
|
||||
else:
|
||||
if remote_identity != None:
|
||||
self.unpeered_propagation_incoming += 1
|
||||
self.unpeered_propagation_rx_bytes += len(lxmf_data)
|
||||
else:
|
||||
self.client_propagation_messages_received += 1
|
||||
|
||||
self.lxmf_propagation(lxmf_data, from_peer=peer, stamp_value=stamp_value, stamp_data=stamp_data)
|
||||
if peer != None: peer.queue_handled_message(transient_id)
|
||||
|
||||
invalid_message_count = len(messages) - len(validated_messages)
|
||||
if invalid_message_count > 0:
|
||||
resource.link.teardown()
|
||||
if remote_identity != None:
|
||||
throttle_time = LXMRouter.PN_STAMP_THROTTLE
|
||||
self.throttled_peers[remote_hash] = time.time()+throttle_time
|
||||
ms = "" if invalid_message_count == 1 else "s"
|
||||
RNS.log(f"Propagation transfer from {remote_str} contained {invalid_message_count} message{ms} with invalid stamps, throttled for {RNS.prettytime(throttle_time)}", RNS.LOG_NOTICE)
|
||||
|
||||
else:
|
||||
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue