Implemented propagation fundamentals

This commit is contained in:
Mark Qvist 2021-10-03 18:49:40 +02:00
parent 354e1932dc
commit 52dc086a26

View File

@ -197,6 +197,8 @@ class LXMessage:
if self.timestamp == None: if self.timestamp == None:
self.timestamp = time.time() self.timestamp = time.time()
self.propagation_packed = None
self.payload = [self.timestamp, self.title, self.content, self.fields] self.payload = [self.timestamp, self.title, self.content, self.fields]
hashed_part = b"" hashed_part = b""
@ -240,7 +242,7 @@ class LXMessage:
self.representation = LXMessage.PACKET self.representation = LXMessage.PACKET
self.__delivery_destination = self.__destination self.__delivery_destination = self.__destination
elif self.desired_method == LXMessage.DIRECT or self.desired_method == LXMessage.PROPAGATED: elif self.desired_method == LXMessage.DIRECT:
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
if content_size <= single_packet_content_limit: if content_size <= single_packet_content_limit:
self.method = self.desired_method self.method = self.desired_method
@ -248,6 +250,21 @@ class LXMessage:
else: else:
self.method = self.desired_method self.method = self.desired_method
self.representation = LXMessage.RESOURCE self.representation = LXMessage.RESOURCE
elif self.desired_method == LXMessage.PROPAGATED:
single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]])
content_size = len(self.propagation_packed)
if content_size <= single_packet_content_limit:
self.method = self.desired_method
self.representation = LXMessage.PACKET
else:
self.method = self.desired_method
self.representation = LXMessage.RESOURCE
else: else:
raise ValueError("Attempt to re-pack LXMessage "+str(self)+" that was already packed") raise ValueError("Attempt to re-pack LXMessage "+str(self)+" that was already packed")
@ -270,12 +287,20 @@ class LXMessage:
self.resource_representation = self.__as_resource() self.resource_representation = self.__as_resource()
elif self.method == LXMessage.PROPAGATED: elif self.method == LXMessage.PROPAGATED:
# TODO: Implement propagation self.state = LXMessage.SENDING
pass
if self.representation == LXMessage.PACKET:
receipt = self.__as_packet().send()
receipt.set_delivery_callback(self.__mark_propagated)
receipt.set_timeout_callback(self.__link_packet_timed_out)
elif self.representation == LXMessage.RESOURCE:
self.resource_representation = self.__as_resource()
def determine_transport_encryption(self): def determine_transport_encryption(self):
if self.method == LXMessage.OPPORTUNISTIC: if self.method == LXMessage.OPPORTUNISTIC:
if self.destination.type == RNS.Destination.SINGLE: if self.__destination.type == RNS.Destination.SINGLE:
self.transport_encrypted = True self.transport_encrypted = True
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif destination_type == RNS.Destination.GROUP: elif destination_type == RNS.Destination.GROUP:
@ -288,10 +313,10 @@ class LXMessage:
self.transport_encrypted = True self.transport_encrypted = True
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif self.method == LXMessage.PROPAGATED: elif self.method == LXMessage.PROPAGATED:
if self.destination.type == RNS.Destination.SINGLE: if self.__destination.type == RNS.Destination.SINGLE:
self.transport_encrypted = True self.transport_encrypted = True
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif destination_type == RNS.Destination.GROUP: elif self.__destination.type == RNS.Destination.GROUP:
self.transport_encrypted = True self.transport_encrypted = True
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
else: else:
@ -308,6 +333,13 @@ class LXMessage:
if self.__delivery_callback != None: if self.__delivery_callback != None:
self.__delivery_callback(self) self.__delivery_callback(self)
def __mark_propagated(self, receipt = None):
RNS.log("Received propagation success notification for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.SENT
if self.__delivery_callback != None:
self.__delivery_callback(self)
def __resource_concluded(self, resource): def __resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
self.__mark_delivered() self.__mark_delivered()
@ -315,6 +347,13 @@ class LXMessage:
resource.link.teardown() resource.link.teardown()
self.state = LXMessage.OUTBOUND self.state = LXMessage.OUTBOUND
def __propagation_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
self.__mark_propagated()
else:
resource.link.teardown()
self.state = LXMessage.OUTBOUND
def __link_packet_timed_out(self, packet_receipt): def __link_packet_timed_out(self, packet_receipt):
packet_receipt.destination.teardown() packet_receipt.destination.teardown()
self.state = LXMessage.OUTBOUND self.state = LXMessage.OUTBOUND
@ -332,8 +371,10 @@ class LXMessage:
if self.method == LXMessage.OPPORTUNISTIC: if self.method == LXMessage.OPPORTUNISTIC:
return RNS.Packet(self.__delivery_destination, self.packed[LXMessage.DESTINATION_LENGTH:]) return RNS.Packet(self.__delivery_destination, self.packed[LXMessage.DESTINATION_LENGTH:])
elif self.method == LXMessage.DIRECT or self.method == LXMessage.PROPAGATED: elif self.method == LXMessage.DIRECT:
return RNS.Packet(self.__delivery_destination, self.packed) return RNS.Packet(self.__delivery_destination, self.packed)
elif self.method == LXMessage.PROPAGATED:
return RNS.Packet(self.__delivery_destination, self.propagation_packed)
def __as_resource(self): def __as_resource(self):
if not self.packed: if not self.packed:
@ -349,7 +390,13 @@ class LXMessage:
raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active") raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active")
self.progress = 0.0 self.progress = 0.0
if self.method == LXMessage.DIRECT:
return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress) return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress)
elif self.method == LXMessage.PROPAGATED:
return RNS.Resource(self.propagation_packed, self.__delivery_destination, callback = self.__propagation_resource_concluded, progress_callback = self.__update_transfer_progress)
else:
return None
def write_to_directory(self, directory_path): def write_to_directory(self, directory_path):
file_name = RNS.hexrep(self.hash, delimit=False) file_name = RNS.hexrep(self.hash, delimit=False)
@ -484,9 +531,10 @@ class LXMFPropagationAnnounceHandler:
def received_announce(self, destination_hash, announced_identity, app_data): def received_announce(self, destination_hash, announced_identity, app_data):
try: try:
if type(app_data) == bytes:
data = msgpack.unpackb(app_data) data = msgpack.unpackb(app_data)
if self.lxmrouter.autopeer: if self.lxmrouter.propagation_node and self.lxmrouter.autopeer:
if data[0] == True: if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, data[1]) self.lxmrouter.peer(destination_hash, data[1])
@ -655,17 +703,6 @@ class LXMRouter:
AUTOPEER = True AUTOPEER = True
AUTOPEER_MAXDEPTH = 4 AUTOPEER_MAXDEPTH = 4
# TODO: Remove
def ptest(self):
i1 = RNS.Identity()
i2 = RNS.Identity()
d1 = RNS.Destination(i1, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery")
d2 = RNS.Destination(i2, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery")
lxm = LXMessage(d1, d2, content="No content here")
return lxm
def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH): def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH):
self.pending_inbound = [] self.pending_inbound = []
self.pending_outbound = [] self.pending_outbound = []
@ -679,6 +716,9 @@ class LXMRouter:
self.propagation_node = False self.propagation_node = False
self.outbound_propagation_node = None
self.outbound_propagation_link = None
if identity == None: if identity == None:
identity = RNS.Identity() identity = RNS.Identity()
@ -717,6 +757,19 @@ class LXMRouter:
def register_delivery_callback(self, callback): def register_delivery_callback(self, callback):
self.__delivery_callback = callback self.__delivery_callback = callback
def set_outbound_propagation_node(self, destination_hash):
if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes:
raise ValueError("Invalid destination hash for outbound propagation node")
else:
self.outbound_propagation_node = destination_hash
if self.outbound_propagation_link != None:
if self.outbound_propagation_link.destination.hash != destination_hash:
self.outbound_propagation_link.teardown()
self.outbound_propagation_link = None
def get_outbound_propagation_node(self):
return self.outbound_propagation_node
def announce(self, destination_hash): def announce(self, destination_hash):
if destination_hash in self.delivery_destinations: if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash] delivery_destination = self.delivery_destinations[destination_hash]
@ -808,6 +861,7 @@ class LXMRouter:
self.propagation_node = True self.propagation_node = True
self.propagation_destination.set_link_established_callback(self.propagation_link_established) self.propagation_destination.set_link_established_callback(self.propagation_link_established)
self.propagation_destination.set_packet_callback(self.propagation_packet)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
@ -857,11 +911,31 @@ class LXMRouter:
return None return None
def propagation_link_established(self, link): def propagation_link_established(self, link):
link.set_packet_callback(self.propagation_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began) link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.propagation_resource_concluded) link.set_resource_concluded_callback(self.propagation_resource_concluded)
def propagation_packet(self, data, packet):
try:
if packet.destination_type != RNS.Destination.LINK:
pass
else:
data = msgpack.unpackb(data)
remote_timebase = data[0]
messages = data[1]
for lxmf_data in messages:
self.lxmf_propagation(lxmf_data)
packet.prove()
except Exception as e:
RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def propagation_resource_concluded(self, resource): def propagation_resource_concluded(self, resource):
RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG) RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
@ -870,6 +944,9 @@ class LXMRouter:
# already handled messages. # already handled messages.
try: try:
data = msgpack.unpackb(resource.data.read()) data = msgpack.unpackb(resource.data.read())
if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list:
# This is a series of propagation messages from a peer or originator
remote_timebase = data[0] remote_timebase = data[0]
remote_hash = None remote_hash = None
remote_identity = resource.link.get_remote_identity() remote_identity = resource.link.get_remote_identity()
@ -883,15 +960,17 @@ class LXMRouter:
messages = data[1] messages = data[1]
for lxmf_data in messages: for lxmf_data in messages:
if remote_hash in self.peers: if remote_hash != None and remote_hash in self.peers:
transient_id = RNS.Identity.full_hash(lxmf_data) transient_id = RNS.Identity.full_hash(lxmf_data)
peer = self.peers[remote_hash] peer = self.peers[remote_hash]
peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data] peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data]
self.lxmf_propagation(lxmf_data) self.lxmf_propagation(lxmf_data)
else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
except Exception as e: except Exception as e:
RNS.log("Error while unpacking received propagation messages", RNS.LOG_DEBUG) RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
def lxmf_propagation(self, lxmf_data): def lxmf_propagation(self, lxmf_data):
@ -901,12 +980,20 @@ class LXMRouter:
if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids: if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids:
received = time.time() received = time.time()
propagation_entry = [transient_id, received, lxmf_data] propagation_entry = [transient_id, received, lxmf_data]
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:]
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
self.lxmf_delivery(delivery_data, delivery_destination.type)
else:
if self.propagation_node:
self.propagation_entries[transient_id] = propagation_entry self.propagation_entries[transient_id] = propagation_entry
# TODO: Remove?
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
for peer_id in self.peers: for peer_id in self.peers:
peer = self.peers[peer_id] peer = self.peers[peer_id]
peer.handle_message(transient_id) peer.handle_message(transient_id)
@ -995,6 +1082,9 @@ class LXMRouter:
if lxmessage.state == LXMessage.DELIVERED: if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage) self.pending_outbound.remove(lxmessage)
elif lxmessage.state == LXMessage.SENT:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
else: else:
RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
# Outbound handling for opportunistic messages # Outbound handling for opportunistic messages
@ -1064,4 +1154,53 @@ class LXMRouter:
# propagation to a LXMF router network. # propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED: elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
raise NotImplementedError("LXMF propagation is not implemented yet")
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if self.outbound_propagation_link != None:
# A link already exists, so we'll try to use it
# to deliver the message
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
self.outbound_propagation_link = None
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become
# active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active: "+str(self.outbound_propagation_link.status), RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)