Added allow_duplicate option to message ingest API

This commit is contained in:
Mark Qvist 2025-01-31 13:38:56 +01:00
parent a6cf585109
commit d5540b927f

@ -1618,7 +1618,7 @@ class LXMRouter:
### Message Routing & Delivery ########################
#######################################################
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False):
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False):
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
if ratchet_id and not message.ratchet_id:
@ -1685,7 +1685,7 @@ class LXMRouter:
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
if self.has_message(message.hash):
if not allow_duplicate and self.has_message(message.hash):
RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
else:
@ -2107,7 +2107,7 @@ class LXMRouter:
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, is_paper_message=False, from_peer=None):
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None):
no_stamp_enforcement = False
if is_paper_message:
no_stamp_enforcement = True
@ -2116,9 +2116,10 @@ class LXMRouter:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
transient_id = RNS.Identity.full_hash(lxmf_data)
if not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids:
if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True:
received = time.time()
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
RNS.log("GOT MESSAGE FOR "+RNS.prettyhexrep(destination_hash))
self.locally_processed_transient_ids[transient_id] = received
@ -2128,7 +2129,7 @@ class LXMRouter:
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
if decrypted_lxmf_data != None:
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement)
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate)
self.locally_delivered_transient_ids[transient_id] = time.time()
if signal_local_delivery != None:
@ -2166,7 +2167,7 @@ class LXMRouter:
RNS.trace_exception(e)
return False
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None):
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False):
try:
if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"):
RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR)
@ -2176,7 +2177,7 @@ class LXMRouter:
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, is_paper_message=True)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
return router_propagation_result