diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index f2daa2f..5e3bf36 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -24,6 +24,7 @@ class LXMRouter: P_LINK_MAX_INACTIVITY = 3*60 MESSAGE_EXPIRY = 30*24*60*60 + STAMP_COST_EXPIRY = 45*24*60*60 NODE_ANNOUNCE_DELAY = 20 @@ -58,7 +59,8 @@ class LXMRouter: ####################################################### def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None, - propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False): + propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False, + enforce_stamps = False): random.seed(os.urandom(10)) @@ -94,6 +96,7 @@ class LXMRouter: self.propagation_per_transfer_limit = propagation_limit self.delivery_per_transfer_limit = delivery_limit self.enforce_ratchets = enforce_ratchets + self._enforce_stamps = enforce_stamps self.wants_download_on_path_available_from = None self.wants_download_on_path_available_to = None @@ -104,6 +107,9 @@ class LXMRouter: self.active_propagation_links = [] self.locally_delivered_transient_ids = {} self.locally_processed_transient_ids = {} + self.outbound_stamp_costs = {} + + self.cost_file_lock = threading.Lock() if identity == None: identity = RNS.Identity() @@ -148,6 +154,19 @@ class LXMRouter: except Exception as e: RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + try: + if os.path.isfile(self.storagepath+"/outbound_stamp_costs"): + with self.cost_file_lock: + with open(self.storagepath+"/outbound_stamp_costs", "rb") as outbound_stamp_cost_file: + data = outbound_stamp_cost_file.read() + self.outbound_stamp_costs = msgpack.unpackb(data) + + self.clean_outbound_stamp_costs() + self.save_outbound_stamp_costs() + + except Exception as e: + RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + atexit.register(self.exit_handler) job_thread = threading.Thread(target=self.jobloop) @@ -156,8 +175,7 @@ class LXMRouter: def announce(self, destination_hash): if destination_hash in self.delivery_destinations: - delivery_destination = self.delivery_destinations[destination_hash] - delivery_destination.announce(delivery_destination.display_name.encode("utf-8")) + self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash)) def announce_propagation_node(self): def delayed_announce(): @@ -175,7 +193,7 @@ class LXMRouter: da_thread.setDaemon(True) da_thread.start() - def register_delivery_identity(self, identity, display_name = None): + def register_delivery_identity(self, identity, display_name = None, stamp_cost = None): if not os.path.isdir(self.ratchetpath): os.makedirs(self.ratchetpath) @@ -184,11 +202,16 @@ class LXMRouter: delivery_destination.set_packet_callback(self.delivery_packet) delivery_destination.set_link_established_callback(self.delivery_link_established) delivery_destination.display_name = display_name + delivery_destination.stamp_cost = stamp_cost + if self.enforce_ratchets: delivery_destination.enforce_ratchets() if display_name != None: - delivery_destination.set_default_app_data(display_name.encode("utf-8")) + def get_app_data(): + return self.get_announce_app_data(delivery_destination) + + delivery_destination.set_default_app_data(get_app_data) self.delivery_destinations[delivery_destination.hash] = delivery_destination return delivery_destination @@ -380,6 +403,12 @@ class LXMRouter: self.propagation_node = False self.announce_propagation_node() + def enforce_stamps(self): + self._enforce_stamps = True + + def ignore_stamps(self): + self._enforce_stamps = False + def ignore_destination(self, destination_hash): if not destination_hash in self.ignored_list: self.ignored_list.append(destination_hash) @@ -553,6 +582,31 @@ class LXMRouter: self.locally_processed_transient_ids.pop(transient_id) RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG) + def update_stamp_cost(self, destination_hash, stamp_cost): + RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG) + self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost] + + def job(): + self.save_outbound_stamp_costs() + threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start() + + def get_announce_app_data(self, destination_hash): + if destination_hash in self.delivery_destinations: + delivery_destination = self.delivery_destinations[destination_hash] + + display_name = None + if delivery_destination.display_name != None: + display_name = delivery_destination.display_name.encode("utf-8") + + stamp_cost = None + if delivery_destination.stamp_cost != None and type(delivery_destination.stamp_cost) == int: + if delivery_destination.stamp_cost > 0 and delivery_destination.stamp_cost < 255: + stamp_cost = delivery_destination.stamp_cost + + peer_data = [display_name, stamp_cost] + + return msgpack.packb(peer_data) + def get_weight(self, transient_id): dst_hash = self.propagation_entries[transient_id][0] lxm_rcvd = self.propagation_entries[transient_id][2] @@ -678,6 +732,36 @@ class LXMRouter: except Exception as e: RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def clean_outbound_stamp_costs(self): + try: + expired = [] + for destination_hash in self.outbound_stamp_costs: + entry = self.outbound_stamp_costs[destination_hash] + if time.time() > entry[0] + LXMRouter.STAMP_COST_EXPIRY: + expired.append(destination_hash) + + for destination_hash in expired: + RNS.log(f"Cleaning expired stamp cost for {destination_hash}") # TODO: Remove + self.outbound_stamp_costs.pop(destination_hash) + + except Exception as e: + RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR) + RNS.trace_exception(e) + + def save_outbound_stamp_costs(self): + with self.cost_file_lock: + try: + RNS.log("Saving outbound stamp costs...", RNS.LOG_DEBUG) # TODO: Remove + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + locally_processed_file = open(self.storagepath+"/outbound_stamp_costs", "wb") + locally_processed_file.write(msgpack.packb(self.outbound_stamp_costs)) + locally_processed_file.close() + + except Exception as e: + RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + def exit_handler(self): if self.propagation_node: try: @@ -919,6 +1003,13 @@ class LXMRouter: return False def handle_outbound(self, lxmessage): + if lxmessage.stamp_cost == None: + destination_hash = lxmessage.get_destination().hash + if destination_hash in self.outbound_stamp_costs: + stamp_cost = self.outbound_stamp_costs[destination_hash][1] + lxmessage.stamp_cost = stamp_cost + RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG) + lxmessage.state = LXMessage.OUTBOUND if not lxmessage.packed: lxmessage.pack() @@ -946,6 +1037,20 @@ class LXMRouter: try: message = LXMessage.unpack_from_bytes(lxmf_data) + required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost + if required_stamp_cost != None: + if message.validate_stamp(required_stamp_cost): + message.stamp_valid = True + else: + message.stamp_valid = False + + if not message.stamp_valid: + if self._enforce_stamps: + RNS.log(f"Dropping {message} with invalid stamp", RNS.LOG_NOTICE) + return False + else: + RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement is disabled", RNS.LOG_NOTICE) + if phy_stats != None: if "rssi" in phy_stats: message.rssi = phy_stats["rssi"] if "snr" in phy_stats: message.snr = phy_stats["snr"] diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index a79f600..76a8661 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -125,6 +125,7 @@ class LXMessage: self.packed = None self.stamp = None self.stamp_cost = stamp_cost + self.stamp_valid = False self.state = LXMessage.DRAFT self.method = LXMessage.UNKNOWN self.progress = 0.0