mirror of
https://github.com/markqvist/NomadNet.git
synced 2025-04-06 14:03:50 -04:00
Use lock on announce stream updates
This commit is contained in:
parent
9ef34fc774
commit
eeb15dcb43
@ -3,6 +3,7 @@ import RNS
|
||||
import LXMF
|
||||
import time
|
||||
import nomadnet
|
||||
import threading
|
||||
import RNS.vendor.umsgpack as msgpack
|
||||
|
||||
class PNAnnounceHandler:
|
||||
@ -55,6 +56,7 @@ class Directory:
|
||||
self.directory_entries = {}
|
||||
self.announce_stream = []
|
||||
self.app = app
|
||||
self.announce_lock = threading.Lock()
|
||||
self.load_from_disk()
|
||||
|
||||
self.pn_announce_handler = PNAnnounceHandler(self)
|
||||
@ -124,91 +126,94 @@ class Directory:
|
||||
RNS.log("Could not load directory from disk. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def lxmf_announce_received(self, source_hash, app_data):
|
||||
if app_data != None:
|
||||
if self.app.compact_stream:
|
||||
try:
|
||||
remove_announces = []
|
||||
for announce in self.announce_stream:
|
||||
if announce[1] == source_hash:
|
||||
remove_announces.append(announce)
|
||||
with self.announce_lock:
|
||||
if app_data != None:
|
||||
if self.app.compact_stream:
|
||||
try:
|
||||
remove_announces = []
|
||||
for announce in self.announce_stream:
|
||||
if announce[1] == source_hash:
|
||||
remove_announces.append(announce)
|
||||
|
||||
for a in remove_announces:
|
||||
self.announce_stream.remove(a)
|
||||
for a in remove_announces:
|
||||
self.announce_stream.remove(a)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR)
|
||||
|
||||
timestamp = time.time()
|
||||
self.announce_stream.insert(0, (timestamp, source_hash, app_data, "peer"))
|
||||
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
|
||||
self.announce_stream.pop()
|
||||
|
||||
if hasattr(self.app, "ui") and self.app.ui != None:
|
||||
if hasattr(self.app.ui, "main_display"):
|
||||
self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
|
||||
|
||||
def node_announce_received(self, source_hash, app_data, associated_peer):
|
||||
with self.announce_lock:
|
||||
if app_data != None:
|
||||
if self.app.compact_stream:
|
||||
try:
|
||||
remove_announces = []
|
||||
for announce in self.announce_stream:
|
||||
if announce[1] == source_hash:
|
||||
remove_announces.append(announce)
|
||||
|
||||
for a in remove_announces:
|
||||
self.announce_stream.remove(a)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR)
|
||||
|
||||
timestamp = time.time()
|
||||
self.announce_stream.insert(0, (timestamp, source_hash, app_data, "node"))
|
||||
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
|
||||
self.announce_stream.pop()
|
||||
|
||||
if self.trust_level(associated_peer) == DirectoryEntry.TRUSTED:
|
||||
existing_entry = self.find(source_hash)
|
||||
if not existing_entry:
|
||||
node_entry = DirectoryEntry(source_hash, display_name=app_data.decode("utf-8"), trust_level=DirectoryEntry.TRUSTED, hosts_node=True)
|
||||
self.remember(node_entry)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR)
|
||||
|
||||
timestamp = time.time()
|
||||
self.announce_stream.insert(0, (timestamp, source_hash, app_data, "peer"))
|
||||
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
|
||||
self.announce_stream.pop()
|
||||
|
||||
if hasattr(self.app, "ui") and self.app.ui != None:
|
||||
if hasattr(self.app.ui, "main_display"):
|
||||
self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
|
||||
|
||||
def node_announce_received(self, source_hash, app_data, associated_peer):
|
||||
if app_data != None:
|
||||
if self.app.compact_stream:
|
||||
try:
|
||||
remove_announces = []
|
||||
for announce in self.announce_stream:
|
||||
if announce[1] == source_hash:
|
||||
remove_announces.append(announce)
|
||||
|
||||
for a in remove_announces:
|
||||
self.announce_stream.remove(a)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR)
|
||||
|
||||
timestamp = time.time()
|
||||
self.announce_stream.insert(0, (timestamp, source_hash, app_data, "node"))
|
||||
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
|
||||
self.announce_stream.pop()
|
||||
|
||||
if self.trust_level(associated_peer) == DirectoryEntry.TRUSTED:
|
||||
existing_entry = self.find(source_hash)
|
||||
if not existing_entry:
|
||||
node_entry = DirectoryEntry(source_hash, display_name=app_data.decode("utf-8"), trust_level=DirectoryEntry.TRUSTED, hosts_node=True)
|
||||
self.remember(node_entry)
|
||||
|
||||
if hasattr(self.app.ui, "main_display"):
|
||||
self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
|
||||
|
||||
def pn_announce_received(self, source_hash, app_data, associated_peer, associated_node):
|
||||
found_node = None
|
||||
for sh in self.directory_entries:
|
||||
if sh == associated_node:
|
||||
found_node = True
|
||||
break
|
||||
with self.announce_lock:
|
||||
found_node = None
|
||||
for sh in self.directory_entries:
|
||||
if sh == associated_node:
|
||||
found_node = True
|
||||
break
|
||||
|
||||
for e in self.announce_stream:
|
||||
if e[1] == associated_node:
|
||||
found_node = True
|
||||
break
|
||||
for e in self.announce_stream:
|
||||
if e[1] == associated_node:
|
||||
found_node = True
|
||||
break
|
||||
|
||||
if not found_node:
|
||||
if self.app.compact_stream:
|
||||
try:
|
||||
remove_announces = []
|
||||
for announce in self.announce_stream:
|
||||
if announce[1] == source_hash:
|
||||
remove_announces.append(announce)
|
||||
if not found_node:
|
||||
if self.app.compact_stream:
|
||||
try:
|
||||
remove_announces = []
|
||||
for announce in self.announce_stream:
|
||||
if announce[1] == source_hash:
|
||||
remove_announces.append(announce)
|
||||
|
||||
for a in remove_announces:
|
||||
self.announce_stream.remove(a)
|
||||
for a in remove_announces:
|
||||
self.announce_stream.remove(a)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR)
|
||||
|
||||
timestamp = time.time()
|
||||
self.announce_stream.insert(0, (timestamp, source_hash, app_data, "pn"))
|
||||
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
|
||||
self.announce_stream.pop()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An error occurred while compacting the announce stream. The contained exception was:"+str(e), RNS.LOG_ERROR)
|
||||
|
||||
timestamp = time.time()
|
||||
self.announce_stream.insert(0, (timestamp, source_hash, app_data, "pn"))
|
||||
while len(self.announce_stream) > Directory.ANNOUNCE_STREAM_MAXLENGTH:
|
||||
self.announce_stream.pop()
|
||||
|
||||
if hasattr(self.app.ui, "main_display"):
|
||||
self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
|
||||
if hasattr(self.app.ui, "main_display"):
|
||||
self.app.ui.main_display.sub_displays.network_display.directory_change_callback()
|
||||
|
||||
def remove_announce_with_timestamp(self, timestamp):
|
||||
selected_announce = None
|
||||
|
Loading…
x
Reference in New Issue
Block a user