Added automatic signalling of stamp costs, stamp generation and validation

This commit is contained in:
Mark Qvist 2024-09-06 20:34:09 +02:00
parent 015dcc5631
commit dcb0a18cd7
2 changed files with 111 additions and 5 deletions

View File

@ -24,6 +24,7 @@ class LXMRouter:
P_LINK_MAX_INACTIVITY = 3*60
MESSAGE_EXPIRY = 30*24*60*60
STAMP_COST_EXPIRY = 45*24*60*60
NODE_ANNOUNCE_DELAY = 20
@ -58,7 +59,8 @@ class LXMRouter:
#######################################################
def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = None,
propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False):
propagation_limit = PROPAGATION_LIMIT, delivery_limit = DELIVERY_LIMIT, enforce_ratchets = False,
enforce_stamps = False):
random.seed(os.urandom(10))
@ -94,6 +96,7 @@ class LXMRouter:
self.propagation_per_transfer_limit = propagation_limit
self.delivery_per_transfer_limit = delivery_limit
self.enforce_ratchets = enforce_ratchets
self._enforce_stamps = enforce_stamps
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
@ -104,6 +107,9 @@ class LXMRouter:
self.active_propagation_links = []
self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {}
self.outbound_stamp_costs = {}
self.cost_file_lock = threading.Lock()
if identity == None:
identity = RNS.Identity()
@ -148,6 +154,19 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
try:
if os.path.isfile(self.storagepath+"/outbound_stamp_costs"):
with self.cost_file_lock:
with open(self.storagepath+"/outbound_stamp_costs", "rb") as outbound_stamp_cost_file:
data = outbound_stamp_cost_file.read()
self.outbound_stamp_costs = msgpack.unpackb(data)
self.clean_outbound_stamp_costs()
self.save_outbound_stamp_costs()
except Exception as e:
RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
atexit.register(self.exit_handler)
job_thread = threading.Thread(target=self.jobloop)
@ -156,8 +175,7 @@ class LXMRouter:
def announce(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
delivery_destination.announce(delivery_destination.display_name.encode("utf-8"))
self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash))
def announce_propagation_node(self):
def delayed_announce():
@ -175,7 +193,7 @@ class LXMRouter:
da_thread.setDaemon(True)
da_thread.start()
def register_delivery_identity(self, identity, display_name = None):
def register_delivery_identity(self, identity, display_name = None, stamp_cost = None):
if not os.path.isdir(self.ratchetpath):
os.makedirs(self.ratchetpath)
@ -184,11 +202,16 @@ class LXMRouter:
delivery_destination.set_packet_callback(self.delivery_packet)
delivery_destination.set_link_established_callback(self.delivery_link_established)
delivery_destination.display_name = display_name
delivery_destination.stamp_cost = stamp_cost
if self.enforce_ratchets:
delivery_destination.enforce_ratchets()
if display_name != None:
delivery_destination.set_default_app_data(display_name.encode("utf-8"))
def get_app_data():
return self.get_announce_app_data(delivery_destination)
delivery_destination.set_default_app_data(get_app_data)
self.delivery_destinations[delivery_destination.hash] = delivery_destination
return delivery_destination
@ -380,6 +403,12 @@ class LXMRouter:
self.propagation_node = False
self.announce_propagation_node()
def enforce_stamps(self):
self._enforce_stamps = True
def ignore_stamps(self):
self._enforce_stamps = False
def ignore_destination(self, destination_hash):
if not destination_hash in self.ignored_list:
self.ignored_list.append(destination_hash)
@ -553,6 +582,31 @@ class LXMRouter:
self.locally_processed_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG)
def update_stamp_cost(self, destination_hash, stamp_cost):
RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG)
self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost]
def job():
self.save_outbound_stamp_costs()
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
def get_announce_app_data(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
display_name = None
if delivery_destination.display_name != None:
display_name = delivery_destination.display_name.encode("utf-8")
stamp_cost = None
if delivery_destination.stamp_cost != None and type(delivery_destination.stamp_cost) == int:
if delivery_destination.stamp_cost > 0 and delivery_destination.stamp_cost < 255:
stamp_cost = delivery_destination.stamp_cost
peer_data = [display_name, stamp_cost]
return msgpack.packb(peer_data)
def get_weight(self, transient_id):
dst_hash = self.propagation_entries[transient_id][0]
lxm_rcvd = self.propagation_entries[transient_id][2]
@ -678,6 +732,36 @@ class LXMRouter:
except Exception as e:
RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_outbound_stamp_costs(self):
try:
expired = []
for destination_hash in self.outbound_stamp_costs:
entry = self.outbound_stamp_costs[destination_hash]
if time.time() > entry[0] + LXMRouter.STAMP_COST_EXPIRY:
expired.append(destination_hash)
for destination_hash in expired:
RNS.log(f"Cleaning expired stamp cost for {destination_hash}") # TODO: Remove
self.outbound_stamp_costs.pop(destination_hash)
except Exception as e:
RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def save_outbound_stamp_costs(self):
with self.cost_file_lock:
try:
RNS.log("Saving outbound stamp costs...", RNS.LOG_DEBUG) # TODO: Remove
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
locally_processed_file = open(self.storagepath+"/outbound_stamp_costs", "wb")
locally_processed_file.write(msgpack.packb(self.outbound_stamp_costs))
locally_processed_file.close()
except Exception as e:
RNS.log("Could not save locally processed message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def exit_handler(self):
if self.propagation_node:
try:
@ -919,6 +1003,13 @@ class LXMRouter:
return False
def handle_outbound(self, lxmessage):
if lxmessage.stamp_cost == None:
destination_hash = lxmessage.get_destination().hash
if destination_hash in self.outbound_stamp_costs:
stamp_cost = self.outbound_stamp_costs[destination_hash][1]
lxmessage.stamp_cost = stamp_cost
RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG)
lxmessage.state = LXMessage.OUTBOUND
if not lxmessage.packed:
lxmessage.pack()
@ -946,6 +1037,20 @@ class LXMRouter:
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost
if required_stamp_cost != None:
if message.validate_stamp(required_stamp_cost):
message.stamp_valid = True
else:
message.stamp_valid = False
if not message.stamp_valid:
if self._enforce_stamps:
RNS.log(f"Dropping {message} with invalid stamp", RNS.LOG_NOTICE)
return False
else:
RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement is disabled", RNS.LOG_NOTICE)
if phy_stats != None:
if "rssi" in phy_stats: message.rssi = phy_stats["rssi"]
if "snr" in phy_stats: message.snr = phy_stats["snr"]

View File

@ -125,6 +125,7 @@ class LXMessage:
self.packed = None
self.stamp = None
self.stamp_cost = stamp_cost
self.stamp_valid = False
self.state = LXMessage.DRAFT
self.method = LXMessage.UNKNOWN
self.progress = 0.0