From dee3a33907f0bc1fada5e67329f2e17bfe129ced Mon Sep 17 00:00:00 2001 From: Pavol Rusnak Date: Mon, 7 Oct 2024 10:47:57 +0200 Subject: [PATCH] remove dangling spaces --- LXMF/LXMPeer.py | 12 ++++---- LXMF/LXMRouter.py | 64 +++++++++++++++++++++--------------------- LXMF/LXMessage.py | 24 ++++++++-------- LXMF/LXStamper.py | 14 ++++----- LXMF/Utilities/lxmd.py | 18 ++++++------ README.md | 2 +- docs/example_sender.py | 2 +- 7 files changed, 68 insertions(+), 68 deletions(-) diff --git a/LXMF/LXMPeer.py b/LXMF/LXMPeer.py index 818ee0d..377dfa2 100644 --- a/LXMF/LXMPeer.py +++ b/LXMF/LXMPeer.py @@ -104,7 +104,7 @@ class LXMPeer: self.unhandled_messages = {} self.handled_messages = {} self.last_offer = [] - + self.router = router self.destination_hash = destination_hash self.identity = RNS.Identity.recall(destination_hash) @@ -125,7 +125,7 @@ class LXMPeer: if not RNS.Transport.has_path(self.destination_hash): RNS.log(f"Path request was not answered, retrying sync with peer {RNS.prettyhexrep(self.destination_hash)} later", RNS.LOG_DEBUG) - + else: if self.identity == None: self.identity = RNS.Identity.recall(destination_hash) @@ -197,7 +197,7 @@ class LXMPeer: RNS.log(f"Sync request to peer {self.destination} failed", RNS.LOG_DEBUG) if self.link != None: self.link.teardown() - + self.state = LXMPeer.IDLE def offer_response(self, request_receipt): @@ -220,7 +220,7 @@ class LXMPeer: for transient_id in self.last_offer: if transient_id in self.unhandled_messages: self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id) - + elif response == True: # Peer wants all advertised messages @@ -282,7 +282,7 @@ class LXMPeer: for transient_id in resource.transferred_messages: message = self.unhandled_messages.pop(transient_id) self.handled_messages[transient_id] = message - + if self.link != None: self.link.teardown() @@ -292,7 +292,7 @@ class LXMPeer: RNS.log(f"Sync to peer {RNS.prettyhexrep(self.destination_hash)} completed", RNS.LOG_DEBUG) self.alive = True self.last_heard = time.time() - + else: RNS.log(f"Resource transfer for LXMF peer sync failed to {self.destination}", RNS.LOG_DEBUG) if self.link != None: diff --git a/LXMF/LXMRouter.py b/LXMF/LXMRouter.py index ef95480..7c607ad 100644 --- a/LXMF/LXMRouter.py +++ b/LXMF/LXMRouter.py @@ -276,7 +276,7 @@ class LXMRouter: delivery_destination.stamp_cost = stamp_cost else: return False - + return True return False @@ -439,7 +439,7 @@ class LXMRouter: except Exception as e: RNS.log(f"Could not read LXM from message store. The contained exception was: {e}", RNS.LOG_ERROR) - + if os.path.isfile(f"{self.storagepath}/peers"): peers_file = open(f"{self.storagepath}/peers", "rb") peers_data = peers_file.read() @@ -518,7 +518,7 @@ class LXMRouter: self.message_storage_limit = int(limit_bytes) else: raise ValueError(f"Cannot set LXMF information storage limit to {limit_bytes}") - + except Exception as e: raise ValueError(f"Cannot set LXMF information storage limit to {limit_bytes}") @@ -551,7 +551,7 @@ class LXMRouter: self.information_storage_limit = int(limit_bytes) else: raise ValueError(f"Cannot set LXMF information storage limit to {limit_bytes}") - + except Exception as e: raise ValueError(f"Cannot set LXMF information storage limit to {limit_bytes}") @@ -633,7 +633,7 @@ class LXMRouter: for link in inactive_links: self.active_propagation_links.remove(link) link.teardown() - + except Exception as e: RNS.log(f"An error occurred while cleaning inbound propagation links. The contained exception was: {e}", RNS.LOG_ERROR) @@ -676,7 +676,7 @@ class LXMRouter: def update_stamp_cost(self, destination_hash, stamp_cost): RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG) self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost] - + def job(): self.save_outbound_stamp_costs() threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start() @@ -684,7 +684,7 @@ class LXMRouter: def get_announce_app_data(self, destination_hash): if destination_hash in self.delivery_destinations: delivery_destination = self.delivery_destinations[destination_hash] - + display_name = None if delivery_destination.display_name != None: display_name = delivery_destination.display_name.encode("utf-8") @@ -710,7 +710,7 @@ class LXMRouter: priority_weight = 0.1 else: priority_weight = 1.0 - + weight = priority_weight * age_weight * lxm_size return weight @@ -732,7 +732,7 @@ class LXMRouter: if validity_left > LXMessage.TICKET_RENEW: RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG) return [expires, ticket] - + else: self.available_tickets["inbound"][destination_hash] = {} @@ -800,7 +800,7 @@ class LXMRouter: else: RNS.log(f"Purging message {RNS.prettyhexrep(transient_id)} due to invalid file path", RNS.LOG_WARNING) removed_entries[transient_id] = filepath - + removed_count = 0 for transient_id in removed_entries: try: @@ -844,15 +844,15 @@ class LXMRouter: if os.path.isfile(filepath): os.unlink(filepath) - + self.propagation_entries.pop(transient_id) bytes_cleaned += entry[3] RNS.log(f"Removed {RNS.prettyhexrep(transient_id)} with weight {w[1]} to clear up {RNS.prettysize(entry[3])}, now cleaned {RNS.prettysize(bytes_cleaned)} out of {RNS.prettysize(bytes_needed)} needed", RNS.LOG_EXTREME) - + except Exception as e: RNS.log(f"Error while cleaning LXMF message from message store. The contained exception was: {e}", RNS.LOG_ERROR) - + finally: i += 1 @@ -894,7 +894,7 @@ class LXMRouter: for destination_hash in expired: self.outbound_stamp_costs.pop(destination_hash) - + except Exception as e: RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR) RNS.trace_exception(e) @@ -935,7 +935,7 @@ class LXMRouter: for inbound_ticket in expired_inbound: self.available_tickets["inbound"][destination_hash].pop(inbound_ticket) - + except Exception as e: RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR) RNS.trace_exception(e) @@ -972,7 +972,7 @@ class LXMRouter: if not "last_deliveries" in self.available_tickets: RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR) self.available_tickets["last_deliveries"] = {} - + except Exception as e: RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR) @@ -1002,7 +1002,7 @@ class LXMRouter: ### Message Download ################################## ####################################################### - + def request_messages_path_job(self): job_thread = threading.Thread(target=self.__request_messages_path_job) job_thread.setDaemon(True) @@ -1018,21 +1018,21 @@ class LXMRouter: else: RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG) self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH) - + def identity_allowed(self, identity): if self.auth_required: if identity.hash in self.allowed_list: return True else: return False - + else: return True def message_get_request(self, path, data, request_id, remote_identity, requested_at): if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY - + elif not self.identity_allowed(remote_identity): return LXMPeer.ERROR_NO_ACCESS @@ -1069,7 +1069,7 @@ class LXMRouter: self.propagation_entries.pop(transient_id) os.unlink(filepath) RNS.log(f"Client {RNS.prettyhexrep(remote_destination.hash)} purged message {RNS.prettyhexrep(transient_id)} at {filepath}", RNS.LOG_DEBUG) - + except Exception as e: RNS.log(f"Error while processing message purge request from {RNS.prettyhexrep(remote_destination.hash)}. The contained exception was: {e}", RNS.LOG_ERROR) @@ -1111,7 +1111,7 @@ class LXMRouter: return response_messages - + except Exception as e: RNS.log(f"Error occurred while generating response for download request, the contained exception was: {e}", RNS.LOG_DEBUG) return None @@ -1223,7 +1223,7 @@ class LXMRouter: return True else: return False - + def handle_outbound(self, lxmessage): destination_hash = lxmessage.get_destination().hash @@ -1284,7 +1284,7 @@ class LXMRouter: for lxm_id in self.pending_deferred_stamps: if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: return self.pending_deferred_stamps[lxm_id].progress - + return None def get_outbound_lxm_stamp_cost(self, lxm_hash): @@ -1295,7 +1295,7 @@ class LXMRouter: for lxm_id in self.pending_deferred_stamps: if self.pending_deferred_stamps[lxm_id].hash == lxm_hash: return self.pending_deferred_stamps[lxm_id].stamp_cost - + return None @@ -1471,7 +1471,7 @@ class LXMRouter: peer.peering_timebase = timestamp peer.last_heard = time.time() peer.propagation_transfer_limit = propagation_transfer_limit - + else: peer = LXMPeer(self, destination_hash) peer.alive = True @@ -1518,7 +1518,7 @@ class LXMRouter: reverse=True )[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))] peer_pool.extend(fastest_peers) - + unknown_speed_peers = [p for p in waiting_peers if p.link_establishment_rate == 0] if len(unknown_speed_peers) > 0: peer_pool.extend( @@ -1530,11 +1530,11 @@ class LXMRouter: ) RNS.log(f"Selecting peer to sync from {len(waiting_peers)} waiting peers.", RNS.LOG_DEBUG) - + elif len(unresponsive_peers) > 0: RNS.log(f"No active peers available, randomly selecting peer to sync from {len(unresponsive_peers)} unresponsive peers.", RNS.LOG_DEBUG) peer_pool = unresponsive_peers - + if len(peer_pool) > 0: selected_index = random.randint(0,len(peer_pool)-1) selected_peer = peer_pool[selected_index] @@ -1642,7 +1642,7 @@ class LXMRouter: self.lxmf_propagation(lxmf_data) else: RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG) - + except Exception as e: RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG) @@ -1717,7 +1717,7 @@ class LXMRouter: else: lxmf_data = base64.urlsafe_b64decode(f"{uri.replace(f'{LXMessage.URI_SCHEMA}://', '').replace('/', '')}==") transient_id = RNS.Identity.full_hash(lxmf_data) - + router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, is_paper_message=True) if router_propagation_result != False: RNS.log(f"LXM with transient ID {RNS.prettyhexrep(transient_id)} was ingested.", RNS.LOG_DEBUG) @@ -1851,7 +1851,7 @@ class LXMRouter: if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS: delivery_destination_hash = lxmessage.get_destination().hash direct_link = None - + if delivery_destination_hash in self.direct_links: # An established direct link already exists to # the destination, so we'll try to use it for diff --git a/LXMF/LXMessage.py b/LXMF/LXMessage.py index 2865726..8213f2f 100644 --- a/LXMF/LXMessage.py +++ b/LXMF/LXMessage.py @@ -63,7 +63,7 @@ class LXMessage: # we can send in a single encrypted packet is # 391 bytes. ENCRYPTED_PACKET_MDU = RNS.Packet.ENCRYPTED_MDU + TIMESTAMP_SIZE - + # The max content length we can fit in LXMF message # inside a single RNS packet is the encrypted MDU, minus # the LXMF overhead. We can optimise a bit though, by @@ -74,7 +74,7 @@ class LXMessage: # LXMF message we can send is 295 bytes. If a message # is larger than that, a Reticulum link will be used. ENCRYPTED_PACKET_MAX_CONTENT = ENCRYPTED_PACKET_MDU - LXMF_OVERHEAD + DESTINATION_LENGTH - + # Links can carry a larger MDU, due to less overhead per # packet. The link MDU with default Reticulum parameters # is 431 bytes. @@ -183,7 +183,7 @@ class LXMessage: self.__delivery_destination = None self.__delivery_callback = None self.failed_callback = None - + self.deferred_stamp_generating = False def set_title_from_string(self, title_string): @@ -327,7 +327,7 @@ class LXMessage: self.stamp_value = value self.stamp_valid = True return generated_stamp - + else: return None @@ -352,7 +352,7 @@ class LXMessage: self.stamp = self.get_stamp() if self.stamp != None: self.payload.append(self.stamp) - + signed_part = b"" signed_part += hashed_part signed_part += self.hash @@ -372,7 +372,7 @@ class LXMessage: # one will be chosen according to these rules: if self.desired_method == None: self.desired_method = LXMessage.DIRECT - + # If opportunistic delivery was requested, check # that message will fit within packet size limits if self.desired_method == LXMessage.OPPORTUNISTIC: @@ -445,7 +445,7 @@ class LXMessage: self.progress = 0.50 self.ratchet_id = lxm_packet.ratchet_id self.state = LXMessage.SENT - + elif self.method == LXMessage.DIRECT: self.state = LXMessage.SENDING @@ -577,7 +577,7 @@ class LXMessage: def __link_packet_timed_out(self, packet_receipt): if packet_receipt: packet_receipt.destination.teardown() - + self.state = LXMessage.OUTBOUND @@ -662,7 +662,7 @@ class LXMessage: if finalise: self.determine_transport_encryption() self.__mark_paper_generated() - + return lxm_uri else: @@ -703,7 +703,7 @@ class LXMessage: signature = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH:2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH] packed_payload = lxmf_bytes[2*LXMessage.DESTINATION_LENGTH+LXMessage.SIGNATURE_LENGTH:] unpacked_payload = msgpack.unpackb(packed_payload) - + # Extract stamp from payload if included if len(unpacked_payload) > 4: stamp = unpacked_payload[4] @@ -725,7 +725,7 @@ class LXMessage: destination = RNS.Destination(destination_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") else: destination = None - + source_identity = RNS.Identity.recall(source_hash) if source_identity != None: source = RNS.Destination(source_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") @@ -769,7 +769,7 @@ class LXMessage: RNS.log(f"Error while validating LXMF message signature. The contained exception was: {e}", RNS.LOG_ERROR) return message - + @staticmethod def unpack_from_file(lxmf_file_handle): try: diff --git a/LXMF/LXStamper.py b/LXMF/LXStamper.py index 2023ec0..a62b719 100644 --- a/LXMF/LXStamper.py +++ b/LXMF/LXStamper.py @@ -31,13 +31,13 @@ def stamp_value(workblock, stamp): while ((i & (1 << (bits - 1))) == 0): i = (i << 1) value += 1 - + return value def generate_stamp(message_id, stamp_cost): RNS.log(f"Generating stamp with cost {stamp_cost} for {RNS.prettyhexrep(message_id)}...", RNS.LOG_DEBUG) workblock = stamp_workblock(message_id) - + start_time = time.time() stamp = None rounds = 0 @@ -51,7 +51,7 @@ def generate_stamp(message_id, stamp_cost): else: stamp, rounds = job_linux(stamp_cost, workblock) - + duration = time.time() - start_time speed = rounds/duration value = stamp_value(workblock, stamp) @@ -118,7 +118,7 @@ def job_linux(stamp_cost, workblock): stop_event.set() result_queue.put(pstamp) rounds_queue.put(rounds) - + job_procs = [] RNS.log(f"Starting {jobs} stamp generation workers", RNS.LOG_DEBUG) for jpn in range(jobs): @@ -175,12 +175,12 @@ def job_android(stamp_cost, workblock): # Android, so we need to manually dispatch and # manage workloads here, while periodically # checking in on the progress. - + stamp = None start_time = time.time() total_rounds = 0 rounds_per_worker = 1000 - + use_nacl = False try: import nacl.encoding @@ -255,7 +255,7 @@ def job_android(stamp_cost, workblock): elapsed = time.time() - start_time speed = total_rounds/elapsed RNS.log(f"Stamp generation running. {total_rounds} rounds completed so far, {int(speed)} rounds per second", RNS.LOG_DEBUG) - + except Exception as e: RNS.log(f"Stamp generation job error: {e}") RNS.trace_exception(e) diff --git a/LXMF/Utilities/lxmd.py b/LXMF/Utilities/lxmd.py index 6c43254..33e8e44 100644 --- a/LXMF/Utilities/lxmd.py +++ b/LXMF/Utilities/lxmd.py @@ -77,7 +77,7 @@ def apply_config(): active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60 else: active_configuration["peer_announce_interval"] = None - + if "lxmf" in lxmd_config and "delivery_transfer_max_accepted_size" in lxmd_config["lxmf"]: active_configuration["delivery_transfer_max_accepted_size"] = lxmd_config["lxmf"].as_float("delivery_transfer_max_accepted_size") if active_configuration["delivery_transfer_max_accepted_size"] < 0.38: @@ -127,14 +127,14 @@ def apply_config(): active_configuration["message_storage_limit"] = 0.005 else: active_configuration["message_storage_limit"] = 2000 - + if "propagation" in lxmd_config and "propagation_transfer_max_accepted_size" in lxmd_config["propagation"]: active_configuration["propagation_transfer_max_accepted_size"] = lxmd_config["propagation"].as_float("propagation_transfer_max_accepted_size") if active_configuration["propagation_transfer_max_accepted_size"] < 0.38: active_configuration["propagation_transfer_max_accepted_size"] = 0.38 else: active_configuration["propagation_transfer_max_accepted_size"] = 256 - + if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]: active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations") else: @@ -259,7 +259,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo RNS.log(f"Could not parse the configuration at {configpath}", RNS.LOG_ERROR) RNS.log("Check your configuration file for errors!", RNS.LOG_ERROR) RNS.panic() - + apply_config() RNS.log(f"Configuration loaded from {configpath}", RNS.LOG_VERBOSE) @@ -268,7 +268,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo if verbosity != 0 or quietness != 0: targetloglevel = targetloglevel+verbosity-quietness - + # Start Reticulum RNS.log("Substantiating Reticulum...") reticulum = RNS.Reticulum(configdir=rnsconfigdir, loglevel=targetloglevel, logdest=targetlogdest) @@ -296,7 +296,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo RNS.log("Could not create and save a new Primary Identity", RNS.LOG_ERROR) RNS.log(f"The contained exception was: {e}", RNS.LOG_ERROR) exit(2) - + # Start LXMF message_router = LXMF.LXMRouter( identity = identity, @@ -326,7 +326,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo if len(active_configuration["allowed_identities"]) == 0: RNS.log(f"Clint authentication was enabled, but no identity hashes could be loaded from {allowedpath}. Nobody will be able to sync messages from this propagation node.", RNS.LOG_WARNING) - + for identity_hash in active_configuration["allowed_identities"]: message_router.allow(identity_hash) @@ -357,7 +357,7 @@ def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbo def jobs(): global active_configuration, last_peer_announce, last_node_announce global message_router, lxmf_destination - + while True: try: if "peer_announce_interval" in active_configuration and active_configuration["peer_announce_interval"] != None: @@ -406,7 +406,7 @@ def main(): parser.add_argument("-s", "--service", action="store_true", default=False, help="lxmd is running as a service and should log to file") parser.add_argument("--exampleconfig", action="store_true", default=False, help="print verbose configuration example to stdout and exit") parser.add_argument("--version", action="version", version=f"lxmd {__version__}") - + args = parser.parse_args() if args.exampleconfig: diff --git a/README.md b/README.md index faced95..550ca1e 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,6 @@ LXMF is actively being developed, and the following improvements and features ar - ~~Update examples in readme to actually work~~ - ~~Sync affinity based on link speeds and distances, for more intelligently choosing peer sync order~~ - Sneakernet and physical transport functionality -- Content Destinations, and easy to use API for group messaging and discussion threads +- Content Destinations, and easy to use API for group messaging and discussion threads - Write and release full API and protocol documentation - Documenting and possibly expanding LXMF limits and priorities diff --git a/docs/example_sender.py b/docs/example_sender.py index bcb8d36..3a227c5 100644 --- a/docs/example_sender.py +++ b/docs/example_sender.py @@ -62,7 +62,7 @@ while True: # Finally dispatch the message to the LXMF message # router, which will handle the delivery according # to the specified message parameters and options: - + router.handle_outbound(lxm) # Wait for user input before starting over