diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index c69a321..7502fb6 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -197,6 +197,8 @@ class LXMessage: if self.timestamp == None: self.timestamp = time.time() + self.propagation_packed = None + self.payload = [self.timestamp, self.title, self.content, self.fields] hashed_part = b"" @@ -240,7 +242,7 @@ class LXMessage: self.representation = LXMessage.PACKET self.__delivery_destination = self.__destination - elif self.desired_method == LXMessage.DIRECT or self.desired_method == LXMessage.PROPAGATED: + elif self.desired_method == LXMessage.DIRECT: single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT if content_size <= single_packet_content_limit: self.method = self.desired_method @@ -248,6 +250,21 @@ class LXMessage: else: self.method = self.desired_method self.representation = LXMessage.RESOURCE + + elif self.desired_method == LXMessage.PROPAGATED: + single_packet_content_limit = LXMessage.LINK_PACKET_MAX_CONTENT + + encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:]) + self.propagation_packed = msgpack.packb([time.time(), [self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data]]) + + content_size = len(self.propagation_packed) + if content_size <= single_packet_content_limit: + self.method = self.desired_method + self.representation = LXMessage.PACKET + else: + self.method = self.desired_method + self.representation = LXMessage.RESOURCE + else: raise ValueError("Attempt to re-pack LXMessage "+str(self)+" that was already packed") @@ -270,12 +287,20 @@ class LXMessage: self.resource_representation = self.__as_resource() elif self.method == LXMessage.PROPAGATED: - # TODO: Implement propagation - pass + self.state = LXMessage.SENDING + + if self.representation == LXMessage.PACKET: + receipt = self.__as_packet().send() + receipt.set_delivery_callback(self.__mark_propagated) + receipt.set_timeout_callback(self.__link_packet_timed_out) + + elif self.representation == LXMessage.RESOURCE: + self.resource_representation = self.__as_resource() + def determine_transport_encryption(self): if self.method == LXMessage.OPPORTUNISTIC: - if self.destination.type == RNS.Destination.SINGLE: + if self.__destination.type == RNS.Destination.SINGLE: self.transport_encrypted = True self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC elif destination_type == RNS.Destination.GROUP: @@ -288,10 +313,10 @@ class LXMessage: self.transport_encrypted = True self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC elif self.method == LXMessage.PROPAGATED: - if self.destination.type == RNS.Destination.SINGLE: + if self.__destination.type == RNS.Destination.SINGLE: self.transport_encrypted = True self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC - elif destination_type == RNS.Destination.GROUP: + elif self.__destination.type == RNS.Destination.GROUP: self.transport_encrypted = True self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES else: @@ -308,6 +333,13 @@ class LXMessage: if self.__delivery_callback != None: self.__delivery_callback(self) + def __mark_propagated(self, receipt = None): + RNS.log("Received propagation success notification for "+str(self), RNS.LOG_DEBUG) + self.state = LXMessage.SENT + + if self.__delivery_callback != None: + self.__delivery_callback(self) + def __resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: self.__mark_delivered() @@ -315,6 +347,13 @@ class LXMessage: resource.link.teardown() self.state = LXMessage.OUTBOUND + def __propagation_resource_concluded(self, resource): + if resource.status == RNS.Resource.COMPLETE: + self.__mark_propagated() + else: + resource.link.teardown() + self.state = LXMessage.OUTBOUND + def __link_packet_timed_out(self, packet_receipt): packet_receipt.destination.teardown() self.state = LXMessage.OUTBOUND @@ -332,8 +371,10 @@ class LXMessage: if self.method == LXMessage.OPPORTUNISTIC: return RNS.Packet(self.__delivery_destination, self.packed[LXMessage.DESTINATION_LENGTH:]) - elif self.method == LXMessage.DIRECT or self.method == LXMessage.PROPAGATED: + elif self.method == LXMessage.DIRECT: return RNS.Packet(self.__delivery_destination, self.packed) + elif self.method == LXMessage.PROPAGATED: + return RNS.Packet(self.__delivery_destination, self.propagation_packed) def __as_resource(self): if not self.packed: @@ -349,7 +390,13 @@ class LXMessage: raise ConnectionError("Tried to synthesize resource for LXMF message on a link that was not active") self.progress = 0.0 - return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress) + + if self.method == LXMessage.DIRECT: + return RNS.Resource(self.packed, self.__delivery_destination, callback = self.__resource_concluded, progress_callback = self.__update_transfer_progress) + elif self.method == LXMessage.PROPAGATED: + return RNS.Resource(self.propagation_packed, self.__delivery_destination, callback = self.__propagation_resource_concluded, progress_callback = self.__update_transfer_progress) + else: + return None def write_to_directory(self, directory_path): file_name = RNS.hexrep(self.hash, delimit=False) @@ -484,14 +531,15 @@ class LXMFPropagationAnnounceHandler: def received_announce(self, destination_hash, announced_identity, app_data): try: - data = msgpack.unpackb(app_data) + if type(app_data) == bytes: + data = msgpack.unpackb(app_data) - if self.lxmrouter.autopeer: - if data[0] == True: - if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: - self.lxmrouter.peer(destination_hash, data[1]) - elif data[0] == False: - self.lxmrouter.unpeer(destination_hash, data[1]) + if self.lxmrouter.propagation_node and self.lxmrouter.autopeer: + if data[0] == True: + if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth: + self.lxmrouter.peer(destination_hash, data[1]) + elif data[0] == False: + self.lxmrouter.unpeer(destination_hash, data[1]) except Exception as e: RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG) @@ -655,17 +703,6 @@ class LXMRouter: AUTOPEER = True AUTOPEER_MAXDEPTH = 4 - # TODO: Remove - def ptest(self): - i1 = RNS.Identity() - i2 = RNS.Identity() - d1 = RNS.Destination(i1, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery") - d2 = RNS.Destination(i2, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery") - - lxm = LXMessage(d1, d2, content="No content here") - - return lxm - def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH): self.pending_inbound = [] self.pending_outbound = [] @@ -679,6 +716,9 @@ class LXMRouter: self.propagation_node = False + self.outbound_propagation_node = None + self.outbound_propagation_link = None + if identity == None: identity = RNS.Identity() @@ -717,6 +757,19 @@ class LXMRouter: def register_delivery_callback(self, callback): self.__delivery_callback = callback + def set_outbound_propagation_node(self, destination_hash): + if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes: + raise ValueError("Invalid destination hash for outbound propagation node") + else: + self.outbound_propagation_node = destination_hash + if self.outbound_propagation_link != None: + if self.outbound_propagation_link.destination.hash != destination_hash: + self.outbound_propagation_link.teardown() + self.outbound_propagation_link = None + + def get_outbound_propagation_node(self): + return self.outbound_propagation_node + def announce(self, destination_hash): if destination_hash in self.delivery_destinations: delivery_destination = self.delivery_destinations[destination_hash] @@ -808,6 +861,7 @@ class LXMRouter: self.propagation_node = True self.propagation_destination.set_link_established_callback(self.propagation_link_established) + self.propagation_destination.set_packet_callback(self.propagation_packet) self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL) @@ -857,11 +911,31 @@ class LXMRouter: return None def propagation_link_established(self, link): + link.set_packet_callback(self.propagation_packet) link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_started_callback(self.resource_transfer_began) link.set_resource_concluded_callback(self.propagation_resource_concluded) + def propagation_packet(self, data, packet): + try: + if packet.destination_type != RNS.Destination.LINK: + pass + else: + data = msgpack.unpackb(data) + remote_timebase = data[0] + + messages = data[1] + for lxmf_data in messages: + self.lxmf_propagation(lxmf_data) + + packet.prove() + + except Exception as e: + RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + + def propagation_resource_concluded(self, resource): RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG) if resource.status == RNS.Resource.COMPLETE: @@ -870,28 +944,33 @@ class LXMRouter: # already handled messages. try: data = msgpack.unpackb(resource.data.read()) - remote_timebase = data[0] - remote_hash = None - remote_identity = resource.link.get_remote_identity() - if remote_identity != None: - remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") - remote_hash = remote_destination.hash + if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list: + # This is a series of propagation messages from a peer or originator + remote_timebase = data[0] + remote_hash = None + remote_identity = resource.link.get_remote_identity() - if not remote_hash in self.peers: - self.peer(remote_hash, remote_timebase) + if remote_identity != None: + remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + remote_hash = remote_destination.hash - messages = data[1] - for lxmf_data in messages: - if remote_hash in self.peers: - transient_id = RNS.Identity.full_hash(lxmf_data) - peer = self.peers[remote_hash] - peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data] + if not remote_hash in self.peers: + self.peer(remote_hash, remote_timebase) - self.lxmf_propagation(lxmf_data) + messages = data[1] + for lxmf_data in messages: + if remote_hash != None and remote_hash in self.peers: + transient_id = RNS.Identity.full_hash(lxmf_data) + peer = self.peers[remote_hash] + peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data] + + self.lxmf_propagation(lxmf_data) + else: + RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) except Exception as e: - RNS.log("Error while unpacking received propagation messages", RNS.LOG_DEBUG) + RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) def lxmf_propagation(self, lxmf_data): @@ -901,15 +980,23 @@ class LXMRouter: if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids: received = time.time() propagation_entry = [transient_id, received, lxmf_data] + destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH] - self.propagation_entries[transient_id] = propagation_entry + if destination_hash in self.delivery_destinations: + delivery_destination = self.delivery_destinations[destination_hash] + encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:] + decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data) + delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data + self.lxmf_delivery(delivery_data, delivery_destination.type) - # TODO: Remove? - RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) + else: + if self.propagation_node: + self.propagation_entries[transient_id] = propagation_entry - for peer_id in self.peers: - peer = self.peers[peer_id] - peer.handle_message(transient_id) + RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG) + for peer_id in self.peers: + peer = self.peers[peer_id] + peer.handle_message(transient_id) return True @@ -995,6 +1082,9 @@ class LXMRouter: if lxmessage.state == LXMessage.DELIVERED: RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) self.pending_outbound.remove(lxmessage) + elif lxmessage.state == LXMessage.SENT: + RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG) + self.pending_outbound.remove(lxmessage) else: RNS.log("Starting outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) # Outbound handling for opportunistic messages @@ -1064,4 +1154,53 @@ class LXMRouter: # propagation to a LXMF router network. elif lxmessage.method == LXMessage.PROPAGATED: RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) - raise NotImplementedError("LXMF propagation is not implemented yet") \ No newline at end of file + + if self.outbound_propagation_node == None: + RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR) + self.fail_message(lxmessage) + else: + if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: + + if self.outbound_propagation_link != None: + # A link already exists, so we'll try to use it + # to deliver the message + if self.outbound_propagation_link.status == RNS.Link.ACTIVE: + if lxmessage.state != LXMessage.SENDING: + RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG) + lxmessage.set_delivery_destination(self.outbound_propagation_link) + lxmessage.send() + else: + if lxmessage.representation == LXMessage.RESOURCE: + RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG) + else: + RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG) + elif self.outbound_propagation_link.status == RNS.Link.CLOSED: + RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG) + self.outbound_propagation_link = None + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + else: + # Simply wait for the link to become + # active or close + RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active: "+str(self.outbound_propagation_link.status), RNS.LOG_DEBUG) + else: + # No link exists, so we'll try to establish one, but + # only if we've never tried before, or the retry wait + # period has elapsed. + if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt: + lxmessage.delivery_attempts += 1 + lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT + + if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS: + if RNS.Transport.has_path(self.outbound_propagation_node): + RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node) + propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation") + self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound) + else: + RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG) + RNS.Transport.request_path(self.outbound_propagation_node) + lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT + + else: + RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG) + self.fail_message(lxmessage) \ No newline at end of file