Restructured LXMRouter for legibility

This commit is contained in:
Mark Qvist 2022-06-17 09:15:56 +02:00
parent 039df2bd46
commit ca84f4c8fe

View File

@ -41,6 +41,10 @@ class LXMRouter:
PR_ALL_MESSAGES = 0x00
### Developer-facing API ##############################
#######################################################
def __init__(self, identity = None, storagepath = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH):
random.seed(os.urandom(10))
@ -110,34 +114,20 @@ class LXMRouter:
job_thread.setDaemon(True)
job_thread.start()
def exit_handler(self):
if self.propagation_node:
try:
serialised_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
serialised_peers.append(peer.to_bytes())
def announce(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
delivery_destination.announce(delivery_destination.display_name.encode("utf-8"))
peers_file = open(self.storagepath+"/peers", "wb")
peers_file.write(msgpack.packb(serialised_peers))
peers_file.close()
RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
locally_delivered_file = open(self.storagepath+"/local_deliveries", "wb")
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
locally_delivered_file.close()
except Exception as e:
RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def announce_propagation_node(self):
def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
data = msgpack.packb([self.propagation_node, int(time.time())])
self.propagation_destination.announce(app_data=data)
da_thread = threading.Thread(target=delayed_announce)
da_thread.setDaemon(True)
da_thread.start()
def register_delivery_identity(self, identity, display_name = None):
delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, "lxmf", "delivery")
@ -167,20 +157,6 @@ class LXMRouter:
def get_outbound_propagation_node(self):
return self.outbound_propagation_node
def cancel_propagation_node_requests(self):
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.outbound_propagation_link = None
self.acknowledge_sync_completion()
def acknowledge_sync_completion(self):
self.propagation_transfer_state = LXMRouter.PR_IDLE
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES):
if max_messages == None:
max_messages = LXMRouter.PR_ALL_MESSAGES
@ -224,181 +200,13 @@ class LXMRouter:
else:
RNS.log("Cannot request LXMF propagation node sync, no default propagation node configured", RNS.LOG_WARNING)
def request_messages_path_job(self):
job_thread = threading.Thread(target=self.__request_messages_path_job)
job_thread.setDaemon(True)
job_thread.start()
def __request_messages_path_job(self):
while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < self.wants_download_on_path_available_timeout:
time.sleep(0.1)
if RNS.Transport.has_path(self.wants_download_on_path_available_from):
self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages)
else:
RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
self.acknowledge_sync_completion()
def has_message(self, transient_id):
if transient_id in self.locally_delivered_transient_ids:
return True
else:
return False
def message_get_failed(self, request_receipt):
RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
def cancel_propagation_node_requests(self):
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.outbound_propagation_link = None
def message_list_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
else:
if request_receipt.response != None:
haves = []
wants = []
if len(request_receipt.response) > 0:
for transient_id in request_receipt.response:
if self.has_message(transient_id):
haves.append(transient_id)
else:
if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages:
wants.append(transient_id)
self.acknowledge_sync_completion()
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[wants, haves],
response_callback=self.message_get_response,
failed_callback=self.message_get_failed,
progress_callback=self.message_get_progress
)
else:
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = 0
def message_get_progress(self, request_receipt):
self.propagation_transfer_state = LXMRouter.PR_RECEIVING
self.propagation_transfer_progress = request_receipt.get_progress()
def message_get_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
else:
if request_receipt.response != None and len(request_receipt.response) > 0:
haves = []
for lxmf_data in request_receipt.response:
self.lxmf_propagation(lxmf_data)
haves.append(RNS.Identity.full_hash(lxmf_data))
# Return a list of successfully received messages to the node.
# This deletes the messages on the propagation node.
# TODO: Add option to keep messages on node.
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[None, haves],
# response_callback=self.message_syncfinal_response,
failed_callback=self.message_get_failed,
# progress_callback=self.message_get_progress
)
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = len(request_receipt.response)
def announce(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
delivery_destination.announce(delivery_destination.display_name.encode("utf-8"))
def handle_outbound(self, lxmessage):
lxmessage.state = LXMessage.OUTBOUND
if not lxmessage.packed:
lxmessage.pack()
lxmessage.determine_transport_encryption()
while self.processing_outbound:
time.sleep(0.1)
self.pending_outbound.append(lxmessage)
self.process_outbound()
def lxmf_delivery(self, lxmf_data, destination_type = None):
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
if destination_type == RNS.Destination.SINGLE:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif destination_type == RNS.Destination.GROUP:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
elif destination_type == RNS.Destination.LINK:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
else:
message.transport_encrypted = False
message.transport_encryption = None
if message.source_hash in self.ignored_list:
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(message)
except Exception as e:
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
return True
except Exception as e:
RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False
def delivery_packet(self, data, packet):
try:
if packet.destination_type != RNS.Destination.LINK:
lxmf_data = b""
lxmf_data += packet.destination.hash
lxmf_data += data
else:
lxmf_data = data
if self.lxmf_delivery(lxmf_data, packet.destination_type):
packet.prove()
except Exception as e:
RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def delivery_link_established(self, link):
link.set_packet_callback(self.delivery_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.delivery_resource_concluded)
def delivery_link_closed(self, link):
pass
def resource_transfer_began(self, resource):
RNS.log("Transfer began for resource "+str(resource), RNS.LOG_DEBUG)
def delivery_resource_concluded(self, resource):
RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE:
self.lxmf_delivery(resource.data.read(), resource.link.type)
def enable_propagation(self):
try:
self.messagepath = self.storagepath+"/messagestore"
@ -459,41 +267,160 @@ class LXMRouter:
self.propagation_node = False
self.announce_propagation_node()
def announce_propagation_node(self):
def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
data = msgpack.packb([self.propagation_node, int(time.time())])
self.propagation_destination.announce(app_data=data)
def ignore_destination(self, destination_hash):
if not destination_hash in self.ignored_list:
self.ignored_list.append(destination_hash)
da_thread = threading.Thread(target=delayed_announce)
da_thread.setDaemon(True)
da_thread.start()
def unignore_destination(self, destination_hash):
if destination_hash in self.ignored_list:
self.ignored_list.remove(destination_hash)
def offer_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
### Utility & Maintenance #############################
#######################################################
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
# triggers can delay next run
self.jobs()
time.sleep(LXMRouter.PROCESSING_INTERVAL)
JOB_OUTBOUND_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 12
def jobs(self):
self.processing_count += 1
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound()
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links()
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
self.clean_transient_id_cache()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers()
def clean_links(self):
closed_links = []
for link_hash in self.direct_links:
link = self.direct_links[link_hash]
inactive_time = link.inactive_for()
if inactive_time > LXMRouter.LINK_MAX_INACTIVITY:
link.teardown()
closed_links.append(link_hash)
for link_hash in closed_links:
cleaned_link = self.direct_links.pop(link_hash)
RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG)
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED:
self.outbound_propagation_link = None
self.acknowledge_sync_completion()
RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
def clean_transient_id_cache(self):
now = time.time()
removed_entries = []
for transient_id in self.locally_delivered_transient_ids:
timestamp = self.locally_delivered_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.1:
removed_entries.append(transient_id)
for transient_id in removed_entries:
self.locally_delivered_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
def clean_message_store(self):
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries:
entry = self.propagation_entries[transient_id]
filepath = entry[1]
components = filepath.split("_")
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
timestamp = float(components[1])
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG)
removed_entries[transient_id] = filepath
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
removed_entries[transient_id] = filepath
removed_count = 0
for transient_id in removed_entries:
try:
transient_ids = data
wanted_ids = []
filepath = removed_entries[transient_id]
self.propagation_entries.pop(transient_id)
if os.path.isfile(filepath):
os.unlink(filepath)
removed_count += 1
except Exception as e:
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
for transient_id in transient_ids:
if not transient_id in self.propagation_entries:
wanted_ids.append(transient_id)
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG)
if len(wanted_ids) == 0:
return False
def exit_handler(self):
if self.propagation_node:
try:
serialised_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
serialised_peers.append(peer.to_bytes())
elif len(wanted_ids) == len(transient_ids):
return True
peers_file = open(self.storagepath+"/peers", "wb")
peers_file.write(msgpack.packb(serialised_peers))
peers_file.close()
else:
return wanted_ids
RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
locally_delivered_file = open(self.storagepath+"/local_deliveries", "wb")
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
locally_delivered_file.close()
except Exception as e:
RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def __str__(self):
return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
### Message Download ##################################
#######################################################
def request_messages_path_job(self):
job_thread = threading.Thread(target=self.__request_messages_path_job)
job_thread.setDaemon(True)
job_thread.start()
def __request_messages_path_job(self):
while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < self.wants_download_on_path_available_timeout:
time.sleep(0.1)
if RNS.Transport.has_path(self.wants_download_on_path_available_from):
self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages)
else:
RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
self.acknowledge_sync_completion()
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
@ -559,13 +486,228 @@ class LXMRouter:
RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
def message_list_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
else:
if request_receipt.response != None:
haves = []
wants = []
if len(request_receipt.response) > 0:
for transient_id in request_receipt.response:
if self.has_message(transient_id):
haves.append(transient_id)
else:
if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages:
wants.append(transient_id)
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[wants, haves],
response_callback=self.message_get_response,
failed_callback=self.message_get_failed,
progress_callback=self.message_get_progress
)
else:
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = 0
def message_get_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
else:
if request_receipt.response != None and len(request_receipt.response) > 0:
haves = []
for lxmf_data in request_receipt.response:
self.lxmf_propagation(lxmf_data)
haves.append(RNS.Identity.full_hash(lxmf_data))
# Return a list of successfully received messages to the node.
# This deletes the messages on the propagation node.
# TODO: Add option to keep messages on node.
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[None, haves],
# response_callback=self.message_syncfinal_response,
failed_callback=self.message_get_failed,
# progress_callback=self.message_get_progress
)
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = len(request_receipt.response)
def message_get_progress(self, request_receipt):
self.propagation_transfer_state = LXMRouter.PR_RECEIVING
self.propagation_transfer_progress = request_receipt.get_progress()
def message_get_failed(self, request_receipt):
RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
def acknowledge_sync_completion(self):
self.propagation_transfer_state = LXMRouter.PR_IDLE
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
def has_message(self, transient_id):
if transient_id in self.locally_delivered_transient_ids:
return True
else:
return False
### Message Routing & Delivery ########################
#######################################################
def handle_outbound(self, lxmessage):
lxmessage.state = LXMessage.OUTBOUND
if not lxmessage.packed:
lxmessage.pack()
lxmessage.determine_transport_encryption()
while self.processing_outbound:
time.sleep(0.1)
self.pending_outbound.append(lxmessage)
self.process_outbound()
def lxmf_delivery(self, lxmf_data, destination_type = None):
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
if destination_type == RNS.Destination.SINGLE:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif destination_type == RNS.Destination.GROUP:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
elif destination_type == RNS.Destination.LINK:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
else:
message.transport_encrypted = False
message.transport_encryption = None
if message.source_hash in self.ignored_list:
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(message)
except Exception as e:
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
return True
except Exception as e:
RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False
def delivery_packet(self, data, packet):
try:
if packet.destination_type != RNS.Destination.LINK:
lxmf_data = b""
lxmf_data += packet.destination.hash
lxmf_data += data
else:
lxmf_data = data
if self.lxmf_delivery(lxmf_data, packet.destination_type):
packet.prove()
except Exception as e:
RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def delivery_link_established(self, link):
link.set_packet_callback(self.delivery_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.delivery_resource_concluded)
def delivery_link_closed(self, link):
pass
def resource_transfer_began(self, resource):
RNS.log("Transfer began for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG)
def delivery_resource_concluded(self, resource):
RNS.log("Transfer concluded for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE:
self.lxmf_delivery(resource.data.read(), resource.link.type)
### Peer Sync & Propagation ###########################
#######################################################
def peer(self, destination_hash, timestamp):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
peer.alive = True
peer.peering_timebase = timestamp
peer.last_heard = time.time()
else:
peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
self.peers[destination_hash] = peer
RNS.log("Peered with "+str(peer.destination))
def unpeer(self, destination_hash, timestamp = None):
if timestamp == None:
timestamp = int(time.time())
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp >= peer.peering_timebase:
self.peers.pop(destination_hash)
RNS.log("Broke peering with "+str(peer.destination))
def sync_peers(self):
culled_peers = []
waiting_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
waiting_peers.append(peer)
if len(waiting_peers) > 0:
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
selected_index = random.randint(0,len(waiting_peers)-1)
selected_peer = waiting_peers[selected_index]
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
selected_peer.sync()
for peer_id in culled_peers:
RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING)
try:
if peer_id in self.peers:
self.peers.pop(peer_id)
except Exception as e:
RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def propagation_link_established(self, link):
link.set_packet_callback(self.propagation_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.propagation_resource_concluded)
def propagation_packet(self, data, packet):
try:
if packet.destination_type != RNS.Destination.LINK:
@ -584,6 +726,30 @@ class LXMRouter:
RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def offer_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
try:
transient_ids = data
wanted_ids = []
for transient_id in transient_ids:
if not transient_id in self.propagation_entries:
wanted_ids.append(transient_id)
if len(wanted_ids) == 0:
return False
elif len(wanted_ids) == len(transient_ids):
return True
else:
return wanted_ids
except Exception as e:
RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
def propagation_resource_concluded(self, resource):
RNS.log("Transfer concluded for incoming propagation resource "+str(resource), RNS.LOG_DEBUG)
@ -621,7 +787,6 @@ class LXMRouter:
except Exception as e:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
def lxmf_propagation(self, lxmf_data):
try:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
@ -662,162 +827,6 @@ class LXMRouter:
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False
def peer(self, destination_hash, timestamp):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
peer.alive = True
peer.peering_timebase = timestamp
peer.last_heard = time.time()
else:
peer = LXMPeer(self, destination_hash)
peer.alive = True
peer.last_heard = time.time()
self.peers[destination_hash] = peer
RNS.log("Peered with "+str(peer.destination))
def unpeer(self, destination_hash, timestamp = None):
if timestamp == None:
timestamp = int(time.time())
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp >= peer.peering_timebase:
self.peers.pop(destination_hash)
RNS.log("Broke peering with "+str(peer.destination))
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
# triggers can delay next run
self.jobs()
time.sleep(LXMRouter.PROCESSING_INTERVAL)
JOB_OUTBOUND_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 12
def jobs(self):
self.processing_count += 1
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound()
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links()
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
self.clean_transient_id_cache()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
self.sync_peers()
def clean_links(self):
closed_links = []
for link_hash in self.direct_links:
link = self.direct_links[link_hash]
inactive_time = link.inactive_for()
if inactive_time > LXMRouter.LINK_MAX_INACTIVITY:
link.teardown()
closed_links.append(link_hash)
for link_hash in closed_links:
cleaned_link = self.direct_links.pop(link_hash)
RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG)
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED:
self.outbound_propagation_link = None
self.acknowledge_sync_completion()
RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
def clean_transient_id_cache(self):
now = time.time()
removed_entries = []
for transient_id in self.locally_delivered_transient_ids:
timestamp = self.locally_delivered_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*1.1:
removed_entries.append(transient_id)
for transient_id in removed_entries:
self.locally_delivered_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
def clean_message_store(self):
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries:
entry = self.propagation_entries[transient_id]
filepath = entry[1]
components = filepath.split("_")
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
timestamp = float(components[1])
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_DEBUG)
removed_entries[transient_id] = filepath
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
removed_entries[transient_id] = filepath
removed_count = 0
for transient_id in removed_entries:
try:
filepath = removed_entries[transient_id]
self.propagation_entries.pop(transient_id)
if os.path.isfile(filepath):
os.unlink(filepath)
removed_count += 1
except Exception as e:
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_DEBUG)
def sync_peers(self):
culled_peers = []
waiting_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
waiting_peers.append(peer)
if len(waiting_peers) > 0:
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
selected_index = random.randint(0,len(waiting_peers)-1)
selected_peer = waiting_peers[selected_index]
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
selected_peer.sync()
for peer_id in culled_peers:
RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING)
try:
if peer_id in self.peers:
self.peers.pop(peer_id)
except Exception as e:
RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def ignore_destination(self, destination_hash):
if not destination_hash in self.ignored_list:
self.ignored_list.append(destination_hash)
def unignore_destination(self, destination_hash):
if destination_hash in self.ignored_list:
self.ignored_list.remove(destination_hash)
def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
@ -828,9 +837,6 @@ class LXMRouter:
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
def __str__(self):
return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
def process_outbound(self, sender = None):
if self.processing_outbound:
return