diff --git a/nomadnet/Directory.py b/nomadnet/Directory.py index 90dd714..53c8572 100644 --- a/nomadnet/Directory.py +++ b/nomadnet/Directory.py @@ -3,6 +3,7 @@ import RNS import LXMF import time import nomadnet +import threading import RNS.vendor.umsgpack as msgpack class PNAnnounceHandler: @@ -55,6 +56,7 @@ class Directory: self.directory_entries = {} self.announce_stream = [] self.app = app + self.announce_lock = threading.Lock() self.load_from_disk() self.pn_announce_handler = PNAnnounceHandler(self) @@ -124,91 +126,94 @@ class Directory: RNS.log("Could not load directory from disk. The contained exception was: "+str(e), RNS.LOG_ERROR) def lxmf_announce_received(self, source_hash, app_data): - if app_data != None: - if self.app.compact_stream: - try: - remove_announces = [] - for announce in self.announce_stream: - if announce[1] == source_hash: - remove_announces.append(announce) + with self.announce_lock: + if app_data != None: + if self.app.compact_stream: + try: + remove_announces = [] + for announce in self.announce_stream: + if announce[1] == source_hash: + remove_announces.append(announce) - for a in remove_announces: - self.announce_stream.remove(a) + for a in remove_announces: + self.announce_stream.remove(a) + + except Exception as e: + RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR) + + timestamp = time.time() + self.announce_stream.insert(0, (timestamp, source_hash, app_data, "peer")) + while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH: + self.announce_stream.pop() + + if hasattr(self.app, "ui") and self.app.ui != None: + if hasattr(self.app.ui, "main_display"): + self.app.ui.main_display.sub_displays.network_display.directory_change_callback() + + def node_announce_received(self, source_hash, app_data, associated_peer): + with self.announce_lock: + if app_data != None: + if self.app.compact_stream: + try: + remove_announces = [] + for announce in self.announce_stream: + if announce[1] == source_hash: + remove_announces.append(announce) + + for a in remove_announces: + self.announce_stream.remove(a) + + except Exception as e: + RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR) + + timestamp = time.time() + self.announce_stream.insert(0, (timestamp, source_hash, app_data, "node")) + while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH: + self.announce_stream.pop() + + if self.trust_level(associated_peer) == DirectoryEntry.TRUSTED: + existing_entry = self.find(source_hash) + if not existing_entry: + node_entry = DirectoryEntry(source_hash, display_name=app_data.decode("utf-8"), trust_level=DirectoryEntry.TRUSTED, hosts_node=True) + self.remember(node_entry) - except Exception as e: - RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR) - - timestamp = time.time() - self.announce_stream.insert(0, (timestamp, source_hash, app_data, "peer")) - while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH: - self.announce_stream.pop() - - if hasattr(self.app, "ui") and self.app.ui != None: if hasattr(self.app.ui, "main_display"): self.app.ui.main_display.sub_displays.network_display.directory_change_callback() - def node_announce_received(self, source_hash, app_data, associated_peer): - if app_data != None: - if self.app.compact_stream: - try: - remove_announces = [] - for announce in self.announce_stream: - if announce[1] == source_hash: - remove_announces.append(announce) - - for a in remove_announces: - self.announce_stream.remove(a) - - except Exception as e: - RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR) - - timestamp = time.time() - self.announce_stream.insert(0, (timestamp, source_hash, app_data, "node")) - while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH: - self.announce_stream.pop() - - if self.trust_level(associated_peer) == DirectoryEntry.TRUSTED: - existing_entry = self.find(source_hash) - if not existing_entry: - node_entry = DirectoryEntry(source_hash, display_name=app_data.decode("utf-8"), trust_level=DirectoryEntry.TRUSTED, hosts_node=True) - self.remember(node_entry) - - if hasattr(self.app.ui, "main_display"): - self.app.ui.main_display.sub_displays.network_display.directory_change_callback() - def pn_announce_received(self, source_hash, app_data, associated_peer, associated_node): - found_node = None - for sh in self.directory_entries: - if sh == associated_node: - found_node = True - break + with self.announce_lock: + found_node = None + for sh in self.directory_entries: + if sh == associated_node: + found_node = True + break - for e in self.announce_stream: - if e[1] == associated_node: - found_node = True - break + for e in self.announce_stream: + if e[1] == associated_node: + found_node = True + break - if not found_node: - if self.app.compact_stream: - try: - remove_announces = [] - for announce in self.announce_stream: - if announce[1] == source_hash: - remove_announces.append(announce) + if not found_node: + if self.app.compact_stream: + try: + remove_announces = [] + for announce in self.announce_stream: + if announce[1] == source_hash: + remove_announces.append(announce) - for a in remove_announces: - self.announce_stream.remove(a) + for a in remove_announces: + self.announce_stream.remove(a) + + except Exception as e: + RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR) + + timestamp = time.time() + self.announce_stream.insert(0, (timestamp, source_hash, app_data, "pn")) + while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH: + self.announce_stream.pop() - except Exception as e: - RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR) - - timestamp = time.time() - self.announce_stream.insert(0, (timestamp, source_hash, app_data, "pn")) - while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH: - self.announce_stream.pop() - - if hasattr(self.app.ui, "main_display"): - self.app.ui.main_display.sub_displays.network_display.directory_change_callback() + if hasattr(self.app.ui, "main_display"): + self.app.ui.main_display.sub_displays.network_display.directory_change_callback() def remove_announce_with_timestamp(self, timestamp): selected_announce = None