mirror of
https://github.com/markqvist/LXMF.git
synced 2025-02-24 17:09:52 -05:00
Optimise structure overhead
This commit is contained in:
parent
7bd3cf986d
commit
356cb6412f
@ -63,12 +63,13 @@ class LXMPeer:
|
|||||||
|
|
||||||
for transient_id in dictionary["handled_ids"]:
|
for transient_id in dictionary["handled_ids"]:
|
||||||
if transient_id in router.propagation_entries:
|
if transient_id in router.propagation_entries:
|
||||||
peer.handled_messages[transient_id] = router.propagation_entries[transient_id]
|
peer.handled_messages.append(transient_id)
|
||||||
|
|
||||||
for transient_id in dictionary["unhandled_ids"]:
|
for transient_id in dictionary["unhandled_ids"]:
|
||||||
if transient_id in router.propagation_entries:
|
if transient_id in router.propagation_entries:
|
||||||
peer.unhandled_messages[transient_id] = router.propagation_entries[transient_id]
|
peer.unhandled_messages.append(transient_id)
|
||||||
|
|
||||||
|
del dictionary
|
||||||
return peer
|
return peer
|
||||||
|
|
||||||
def to_bytes(self):
|
def to_bytes(self):
|
||||||
@ -108,8 +109,8 @@ class LXMPeer:
|
|||||||
self.link = None
|
self.link = None
|
||||||
self.state = LXMPeer.IDLE
|
self.state = LXMPeer.IDLE
|
||||||
|
|
||||||
self.unhandled_messages = {}
|
self.unhandled_messages = []
|
||||||
self.handled_messages = {}
|
self.handled_messages = []
|
||||||
self.last_offer = []
|
self.last_offer = []
|
||||||
|
|
||||||
self.router = router
|
self.router = router
|
||||||
@ -118,6 +119,7 @@ class LXMPeer:
|
|||||||
if self.identity != None:
|
if self.identity != None:
|
||||||
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||||
else:
|
else:
|
||||||
|
self.destination = None
|
||||||
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
RNS.log(f"Could not recall identity for LXMF propagation peer {RNS.prettyhexrep(self.destination_hash)}, will retry identity resolution on next sync", RNS.LOG_WARNING)
|
||||||
|
|
||||||
def sync(self):
|
def sync(self):
|
||||||
@ -171,7 +173,7 @@ class LXMPeer:
|
|||||||
|
|
||||||
for transient_id in purged_ids:
|
for transient_id in purged_ids:
|
||||||
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
RNS.log("Dropping unhandled message "+RNS.prettyhexrep(transient_id)+" for peer "+RNS.prettyhexrep(self.destination_hash)+" since it no longer exists in the message store.", RNS.LOG_DEBUG)
|
||||||
self.unhandled_messages.pop(transient_id)
|
self.unhandled_messages.remove(transient_id)
|
||||||
|
|
||||||
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
unhandled_entries.sort(key=lambda e: e[1], reverse=False)
|
||||||
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
|
||||||
@ -189,7 +191,7 @@ class LXMPeer:
|
|||||||
|
|
||||||
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||||
self.last_offer = unhandled_ids
|
self.last_offer = unhandled_ids
|
||||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, self.last_offer, response_callback=self.offer_response, failed_callback=self.request_failed)
|
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||||
self.state = LXMPeer.REQUEST_SENT
|
self.state = LXMPeer.REQUEST_SENT
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@ -226,13 +228,14 @@ class LXMPeer:
|
|||||||
# Peer already has all advertised messages
|
# Peer already has all advertised messages
|
||||||
for transient_id in self.last_offer:
|
for transient_id in self.last_offer:
|
||||||
if transient_id in self.unhandled_messages:
|
if transient_id in self.unhandled_messages:
|
||||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
self.handled_messages.append(transient_id)
|
||||||
|
self.unhandled_messages.remove(transient_id)
|
||||||
|
|
||||||
|
|
||||||
elif response == True:
|
elif response == True:
|
||||||
# Peer wants all advertised messages
|
# Peer wants all advertised messages
|
||||||
for transient_id in self.last_offer:
|
for transient_id in self.last_offer:
|
||||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||||
wanted_message_ids.append(transient_id)
|
wanted_message_ids.append(transient_id)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@ -242,10 +245,11 @@ class LXMPeer:
|
|||||||
# already received it from another peer.
|
# already received it from another peer.
|
||||||
if not transient_id in response:
|
if not transient_id in response:
|
||||||
if transient_id in self.unhandled_messages:
|
if transient_id in self.unhandled_messages:
|
||||||
self.handled_messages[transient_id] = self.unhandled_messages.pop(transient_id)
|
self.handled_messages.append(transient_id)
|
||||||
|
self.unhandled_messages.remove(transient_id)
|
||||||
|
|
||||||
for transient_id in response:
|
for transient_id in response:
|
||||||
wanted_messages.append(self.unhandled_messages[transient_id])
|
wanted_messages.append(self.router.propagation_entries[transient_id])
|
||||||
wanted_message_ids.append(transient_id)
|
wanted_message_ids.append(transient_id)
|
||||||
|
|
||||||
if len(wanted_messages) > 0:
|
if len(wanted_messages) > 0:
|
||||||
@ -288,8 +292,8 @@ class LXMPeer:
|
|||||||
def resource_concluded(self, resource):
|
def resource_concluded(self, resource):
|
||||||
if resource.status == RNS.Resource.COMPLETE:
|
if resource.status == RNS.Resource.COMPLETE:
|
||||||
for transient_id in resource.transferred_messages:
|
for transient_id in resource.transferred_messages:
|
||||||
message = self.unhandled_messages.pop(transient_id)
|
self.handled_messages.append(transient_id)
|
||||||
self.handled_messages[transient_id] = message
|
self.unhandled_messages.remove(transient_id)
|
||||||
|
|
||||||
if self.link != None:
|
if self.link != None:
|
||||||
self.link.teardown()
|
self.link.teardown()
|
||||||
@ -330,7 +334,7 @@ class LXMPeer:
|
|||||||
|
|
||||||
def handle_message(self, transient_id):
|
def handle_message(self, transient_id):
|
||||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
self.unhandled_messages.append(transient_id)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
if self.destination_hash:
|
if self.destination_hash:
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
import math
|
||||||
import random
|
import random
|
||||||
import base64
|
import base64
|
||||||
import atexit
|
import atexit
|
||||||
@ -427,6 +428,8 @@ class LXMRouter:
|
|||||||
os.makedirs(self.messagepath)
|
os.makedirs(self.messagepath)
|
||||||
|
|
||||||
self.propagation_entries = {}
|
self.propagation_entries = {}
|
||||||
|
|
||||||
|
st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE)
|
||||||
for filename in os.listdir(self.messagepath):
|
for filename in os.listdir(self.messagepath):
|
||||||
components = filename.split("_")
|
components = filename.split("_")
|
||||||
if len(components) == 2:
|
if len(components) == 2:
|
||||||
@ -452,9 +455,13 @@ class LXMRouter:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
et = time.time(); RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {math.floor(len(self.propagation_entries)/(et-st))} msgs/s", RNS.LOG_NOTICE)
|
||||||
|
st = time.time(); RNS.log("Loading propagation node peers...", RNS.LOG_NOTICE)
|
||||||
|
|
||||||
if os.path.isfile(self.storagepath+"/peers"):
|
if os.path.isfile(self.storagepath+"/peers"):
|
||||||
peers_file = open(self.storagepath+"/peers", "rb")
|
peers_file = open(self.storagepath+"/peers", "rb")
|
||||||
peers_data = peers_file.read()
|
peers_data = peers_file.read()
|
||||||
|
peers_file.close()
|
||||||
|
|
||||||
if len(peers_data) > 0:
|
if len(peers_data) > 0:
|
||||||
serialised_peers = msgpack.unpackb(peers_data)
|
serialised_peers = msgpack.unpackb(peers_data)
|
||||||
@ -468,8 +475,13 @@ class LXMRouter:
|
|||||||
lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
|
lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
|
||||||
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
|
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
|
||||||
else:
|
else:
|
||||||
|
del peer
|
||||||
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
|
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
del serialised_peers
|
||||||
|
del peers_data
|
||||||
|
|
||||||
|
RNS.log(f"Loaded {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
|
||||||
|
|
||||||
self.propagation_node = True
|
self.propagation_node = True
|
||||||
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
|
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
|
||||||
@ -1676,7 +1688,7 @@ class LXMRouter:
|
|||||||
if remote_hash != None and remote_hash in self.peers:
|
if remote_hash != None and remote_hash in self.peers:
|
||||||
transient_id = RNS.Identity.full_hash(lxmf_data)
|
transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||||
peer = self.peers[remote_hash]
|
peer = self.peers[remote_hash]
|
||||||
peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data]
|
peer.handled_messages.append(transient_id)
|
||||||
|
|
||||||
self.lxmf_propagation(lxmf_data)
|
self.lxmf_propagation(lxmf_data)
|
||||||
else:
|
else:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user