LXMF/LXMF/LXMRouter.py

2482 lines
126 KiB
Python

import os
import sys
import time
import math
import random
import base64
import atexit
import signal
import threading
from collections import deque
import RNS
import RNS.vendor.umsgpack as msgpack
from .LXMF import APP_NAME
from .LXMF import FIELD_TICKET
from .LXMPeer import LXMPeer
from .LXMessage import LXMessage
from .Handlers import LXMFDeliveryAnnounceHandler
from .Handlers import LXMFPropagationAnnounceHandler
import LXMF.LXStamper as LXStamper
class LXMRouter:
MAX_DELIVERY_ATTEMPTS = 5
PROCESSING_INTERVAL = 4
DELIVERY_RETRY_WAIT = 10
PATH_REQUEST_WAIT = 7
MAX_PATHLESS_TRIES = 1
LINK_MAX_INACTIVITY = 10*60
P_LINK_MAX_INACTIVITY = 3*60
MESSAGE_EXPIRY = 30*24*60*60
STAMP_COST_EXPIRY = 45*24*60*60
NODE_ANNOUNCE_DELAY = 20
MAX_PEERS = 50
AUTOPEER = True
AUTOPEER_MAXDEPTH = 4
FASTEST_N_RANDOM_POOL = 2
ROTATION_HEADROOM_PCT = 10
ROTATION_AR_MAX = 0.5
PROPAGATION_COST = 12
PROPAGATION_COST_MIN = 10
PROPAGATION_COST_FLEX = 3
PROPAGATION_LIMIT = 256
SYNC_LIMIT = PROPAGATION_LIMIT*40
DELIVERY_LIMIT = 1000
PR_PATH_TIMEOUT = 10
PR_IDLE = 0x00
PR_PATH_REQUESTED = 0x01
PR_LINK_ESTABLISHING = 0x02
PR_LINK_ESTABLISHED = 0x03
PR_REQUEST_SENT = 0x04
PR_RECEIVING = 0x05
PR_RESPONSE_RECEIVED = 0x06
PR_COMPLETE = 0x07
PR_NO_PATH = 0xf0
PR_LINK_FAILED = 0xf1
PR_TRANSFER_FAILED = 0xf2
PR_NO_IDENTITY_RCVD = 0xf3
PR_NO_ACCESS = 0xf4
PR_FAILED = 0xfe
PR_ALL_MESSAGES = 0x00
STATS_GET_PATH = "/pn/get/stats"
### Developer-facing API ##############################
#######################################################
def __init__(self, identity=None, storagepath=None, autopeer=AUTOPEER, autopeer_maxdepth=None,
propagation_limit=PROPAGATION_LIMIT, delivery_limit=DELIVERY_LIMIT, sync_limit=SYNC_LIMIT,
enforce_ratchets=False, enforce_stamps=False, static_peers = [], max_peers=None,
from_static_only=False, sync_strategy=LXMPeer.STRATEGY_PERSISTENT,
propagation_cost=PROPAGATION_COST, propagation_cost_flexibility=PROPAGATION_COST_FLEX):
random.seed(os.urandom(10))
self.pending_inbound = []
self.pending_outbound = []
self.failed_outbound = []
self.direct_links = {}
self.backchannel_links = {}
self.delivery_destinations = {}
self.prioritised_list = []
self.ignored_list = []
self.allowed_list = []
self.auth_required = False
self.retain_synced_on_node = False
self.default_sync_strategy = sync_strategy
self.processing_outbound = False
self.processing_inbound = False
self.processing_count = 0
self.propagation_node = False
self.propagation_node_start_time = None
if storagepath == None: raise ValueError("LXMF cannot be initialised without a storage path")
else:
self.storagepath = storagepath+"/lxmf"
self.ratchetpath = self.storagepath+"/ratchets"
self.outbound_propagation_node = None
self.outbound_propagation_link = None
if delivery_limit == None: delivery_limit = LXMRouter.DELIVERY_LIMIT
self.message_storage_limit = None
self.information_storage_limit = None
self.propagation_per_transfer_limit = propagation_limit
self.propagation_per_sync_limit = sync_limit
self.delivery_per_transfer_limit = delivery_limit
self.propagation_stamp_cost = propagation_cost
self.propagation_stamp_cost_flexibility = propagation_cost_flexibility
self.enforce_ratchets = enforce_ratchets
self._enforce_stamps = enforce_stamps
self.pending_deferred_stamps = {}
if sync_limit == None or self.propagation_per_sync_limit < self.propagation_per_transfer_limit:
self.propagation_per_sync_limit = self.propagation_per_transfer_limit
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
self.propagation_transfer_state = LXMRouter.PR_IDLE
self.propagation_transfer_progress = 0.0
self.propagation_transfer_last_result = None
self.propagation_transfer_max_messages = None
self.prioritise_rotating_unreachable_peers = False
self.active_propagation_links = []
self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {}
self.outbound_stamp_costs = {}
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
self.cost_file_lock = threading.Lock()
self.ticket_file_lock = threading.Lock()
self.stamp_gen_lock = threading.Lock()
self.exit_handler_running = False
if identity == None:
identity = RNS.Identity()
self.identity = identity
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.control_destination = None
self.client_propagation_messages_received = 0
self.client_propagation_messages_served = 0
self.unpeered_propagation_incoming = 0
self.unpeered_propagation_rx_bytes = 0
if autopeer != None: self.autopeer = autopeer
else: self.autopeer = LXMRouter.AUTOPEER
if autopeer_maxdepth != None: self.autopeer_maxdepth = autopeer_maxdepth
else: self.autopeer_maxdepth = LXMRouter.AUTOPEER_MAXDEPTH
if max_peers == None: self.max_peers = LXMRouter.MAX_PEERS
else:
if type(max_peers) == int and max_peers >= 0: self.max_peers = max_peers
else: raise ValueError(f"Invalid value for max_peers: {max_peers}")
self.from_static_only = from_static_only
if type(static_peers) != list: raise ValueError(f"Invalid type supplied for static peer list: {type(static_peers)}")
else:
for static_peer in static_peers:
if type(static_peer) != bytes: raise ValueError(f"Invalid static peer destination hash: {static_peer}")
else:
if len(static_peer) != RNS.Reticulum.TRUNCATED_HASHLENGTH//8: raise ValueError(f"Invalid static peer destination hash: {static_peer}")
self.static_peers = static_peers
self.peers = {}
self.propagation_entries = {}
self.peer_distribution_queue = deque()
RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
self.__delivery_callback = None
try:
if os.path.isfile(self.storagepath+"/local_deliveries"):
locally_delivered_file = open(self.storagepath+"/local_deliveries", "rb")
data = locally_delivered_file.read()
locally_delivered_file.close()
self.locally_delivered_transient_ids = msgpack.unpackb(data)
if not type(self.locally_delivered_transient_ids) == dict:
RNS.log("Invalid data format for loaded locally delivered transient IDs, recreating...", RNS.LOG_ERROR)
self.locally_delivered_transient_ids = {}
except Exception as e:
RNS.log("Could not load locally delivered message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.locally_delivered_transient_ids = {}
try:
if os.path.isfile(self.storagepath+"/locally_processed"):
locally_processed_file = open(self.storagepath+"/locally_processed", "rb")
data = locally_processed_file.read()
locally_processed_file.close()
self.locally_processed_transient_ids = msgpack.unpackb(data)
if not type(self.locally_processed_transient_ids) == dict:
RNS.log("Invalid data format for loaded locally processed transient IDs, recreating...", RNS.LOG_ERROR)
self.locally_processed_transient_ids = {}
except Exception as e:
RNS.log("Could not load locally processed message ID cache from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.locally_processed_transient_ids = {}
try:
self.clean_transient_id_caches()
except Exception as e:
RNS.log("Could not clean transient ID caches. The contained exception was : "+str(e), RNS.LOG_ERROR)
self.locally_delivered_transient_ids = {}
self.locally_processed_transient_ids = {}
try:
if os.path.isfile(self.storagepath+"/outbound_stamp_costs"):
with self.cost_file_lock:
with open(self.storagepath+"/outbound_stamp_costs", "rb") as outbound_stamp_cost_file:
data = outbound_stamp_cost_file.read()
self.outbound_stamp_costs = msgpack.unpackb(data)
if not type(self.outbound_stamp_costs) == dict:
RNS.log("Invalid data format for loaded outbound stamp costs, recreating...", RNS.LOG_ERROR)
self.outbound_stamp_costs = {}
self.clean_outbound_stamp_costs()
self.save_outbound_stamp_costs()
except Exception as e:
RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
try:
if os.path.isfile(self.storagepath+"/available_tickets"):
with self.ticket_file_lock:
with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file:
data = available_tickets_file.read()
self.available_tickets = msgpack.unpackb(data)
if not type(self.available_tickets) == dict:
RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
if not "outbound" in self.available_tickets:
RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["outbound"] = {}
if not "inbound" in self.available_tickets:
RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["inbound"] = {}
if not "last_deliveries" in self.available_tickets:
RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["last_deliveries"] = {}
self.clean_available_tickets()
self.save_available_tickets()
except Exception as e:
RNS.log("Could not load outbound stamp costs from storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
atexit.register(self.exit_handler)
signal.signal(signal.SIGINT, self.sigint_handler)
signal.signal(signal.SIGTERM, self.sigterm_handler)
job_thread = threading.Thread(target=self.jobloop)
job_thread.setDaemon(True)
job_thread.start()
def announce(self, destination_hash, attached_interface=None):
if destination_hash in self.delivery_destinations:
self.delivery_destinations[destination_hash].announce(app_data=self.get_announce_app_data(destination_hash), attached_interface=attached_interface)
def announce_propagation_node(self):
def delayed_announce():
time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
node_state = self.propagation_node and not self.from_static_only
stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility]
announce_data = [
node_state, # Boolean flag signalling propagation node state
int(time.time()), # Current node timebase
self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
self.propagation_per_sync_limit, # Limit for incoming propagation node syncs
stamp_cost, # Propagation stamp cost for this node
]
data = msgpack.packb(announce_data)
self.propagation_destination.announce(app_data=data)
da_thread = threading.Thread(target=delayed_announce)
da_thread.setDaemon(True)
da_thread.start()
def register_delivery_identity(self, identity, display_name = None, stamp_cost = None):
if len(self.delivery_destinations) != 0:
RNS.log("Currently only one delivery identity is supported per LXMF router instance", RNS.LOG_ERROR)
return None
if not os.path.isdir(self.ratchetpath):
os.makedirs(self.ratchetpath)
delivery_destination = RNS.Destination(identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "delivery")
delivery_destination.enable_ratchets(f"{self.ratchetpath}/{RNS.hexrep(delivery_destination.hash, delimit=False)}.ratchets")
delivery_destination.set_packet_callback(self.delivery_packet)
delivery_destination.set_link_established_callback(self.delivery_link_established)
delivery_destination.display_name = display_name
if self.enforce_ratchets:
delivery_destination.enforce_ratchets()
if display_name != None:
def get_app_data():
return self.get_announce_app_data(delivery_destination.hash)
delivery_destination.set_default_app_data(get_app_data)
self.delivery_destinations[delivery_destination.hash] = delivery_destination
self.set_inbound_stamp_cost(delivery_destination.hash, stamp_cost)
return delivery_destination
def register_delivery_callback(self, callback):
self.__delivery_callback = callback
def set_inbound_stamp_cost(self, destination_hash, stamp_cost):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
if stamp_cost == None:
delivery_destination.stamp_cost = None
return True
elif type(stamp_cost) == int:
if stamp_cost < 1:
delivery_destination.stamp_cost = None
elif stamp_cost < 255:
delivery_destination.stamp_cost = stamp_cost
else:
return False
return True
return False
def get_outbound_stamp_cost(self, destination_hash):
if destination_hash in self.outbound_stamp_costs:
stamp_cost = self.outbound_stamp_costs[destination_hash][1]
return stamp_cost
else:
return None
def set_active_propagation_node(self, destination_hash):
self.set_outbound_propagation_node(destination_hash)
# self.set_inbound_propagation_node(destination_hash)
def set_outbound_propagation_node(self, destination_hash):
if len(destination_hash) != RNS.Identity.TRUNCATED_HASHLENGTH//8 or type(destination_hash) != bytes:
raise ValueError("Invalid destination hash for outbound propagation node")
else:
if self.outbound_propagation_node != destination_hash:
self.outbound_propagation_node = destination_hash
if self.outbound_propagation_link != None:
if self.outbound_propagation_link.destination.hash != destination_hash:
self.outbound_propagation_link.teardown()
self.outbound_propagation_link = None
def get_outbound_propagation_node(self):
return self.outbound_propagation_node
def set_inbound_propagation_node(self, destination_hash):
# TODO: Implement
raise NotImplementedError("Inbound/outbound propagation node differentiation is currently not implemented")
def get_inbound_propagation_node(self):
return self.get_outbound_propagation_node()
def set_retain_node_lxms(self, retain):
if retain == True:
self.retain_synced_on_node = True
else:
self.retain_synced_on_node = False
def set_authentication(self, required=None):
if required != None:
self.auth_required = required
def requires_authentication(self):
return self.auth_required
def allow(self, identity_hash=None):
if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
if not identity_hash in self.allowed_list:
self.allowed_list.append(identity_hash)
else:
raise ValueError("Allowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes")
def disallow(self, identity_hash=None):
if isinstance(identity_hash, bytes) and len(identity_hash) == RNS.Identity.TRUNCATED_HASHLENGTH//8:
if identity_hash in self.allowed_list:
self.allowed_list.pop(identity_hash)
else:
raise ValueError("Disallowed identity hash must be "+str(RNS.Identity.TRUNCATED_HASHLENGTH//8)+" bytes")
def prioritise(self, destination_hash=None):
if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
if not destination_hash in self.prioritised_list:
self.prioritised_list.append(destination_hash)
else:
raise ValueError("Prioritised destination hash must be "+str(RNS.Reticulum.TRUNCATED_HASHLENGTH//8)+" bytes")
def unprioritise(self, identity_hash=None):
if isinstance(destination_hash, bytes) and len(destination_hash) == RNS.Reticulum.TRUNCATED_HASHLENGTH//8:
if destination_hash in self.prioritised_list:
self.prioritised_list.pop(destination_hash)
else:
raise ValueError("Prioritised destination hash must be "+str(RNS.Reticulum.TRUNCATED_HASHLENGTH//8)+" bytes")
def request_messages_from_propagation_node(self, identity, max_messages = PR_ALL_MESSAGES):
if max_messages == None:
max_messages = LXMRouter.PR_ALL_MESSAGES
self.propagation_transfer_progress = 0.0
self.propagation_transfer_max_messages = max_messages
if self.outbound_propagation_node != None:
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.ACTIVE:
self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHED
RNS.log("Requesting message list from propagation node", RNS.LOG_DEBUG)
self.outbound_propagation_link.identify(identity)
self.outbound_propagation_link.request(
LXMPeer.MESSAGE_GET_PATH,
[None, None], # Set both want and have fields to None to get message list
response_callback=self.message_list_response,
failed_callback=self.message_get_failed
)
self.propagation_transfer_state = LXMRouter.PR_REQUEST_SENT
else:
if self.outbound_propagation_link == None:
if RNS.Transport.has_path(self.outbound_propagation_node):
self.wants_download_on_path_available_from = None
self.propagation_transfer_state = LXMRouter.PR_LINK_ESTABLISHING
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for message download", RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
def msg_request_established_callback(link):
self.request_messages_from_propagation_node(identity, self.propagation_transfer_max_messages)
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=msg_request_established_callback)
else:
RNS.log("No path known for message download from propagation node "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
self.wants_download_on_path_available_from = self.outbound_propagation_node
self.wants_download_on_path_available_to = identity
self.wants_download_on_path_available_timeout = time.time() + LXMRouter.PR_PATH_TIMEOUT
self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED
self.request_messages_path_job()
else:
RNS.log("Waiting for propagation node link to become active", RNS.LOG_EXTREME)
else:
RNS.log("Cannot request LXMF propagation node sync, no default propagation node configured", RNS.LOG_WARNING)
def cancel_propagation_node_requests(self):
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.outbound_propagation_link = None
self.acknowledge_sync_completion(reset_state=True)
def enable_propagation(self):
try:
self.messagepath = self.storagepath+"/messagestore"
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
if not os.path.isdir(self.messagepath):
os.makedirs(self.messagepath)
self.propagation_entries = {}
st = time.time(); RNS.log("Indexing messagestore...", RNS.LOG_NOTICE)
for filename in os.listdir(self.messagepath):
components = filename.split("_")
if len(components) == 2:
if float(components[1]) > 0:
if len(components[0]) == RNS.Identity.HASHLENGTH//8*2:
try:
transient_id = bytes.fromhex(components[0])
received = float(components[1])
filepath = self.messagepath+"/"+filename
msg_size = os.path.getsize(filepath)
file = open(filepath, "rb")
destination_hash = file.read(LXMessage.DESTINATION_LENGTH)
file.close()
self.propagation_entries[transient_id] = [
destination_hash, # 0: Destination hash
filepath, # 1: Storage location
received, # 2: Receive timestamp
msg_size, # 3: Message size
[], # 4: Handled peers
[], # 5: Unhandled peers
]
except Exception as e:
RNS.log("Could not read LXM from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
et = time.time(); mps = 0 if et-st == 0 else math.floor(len(self.propagation_entries)/(et-st))
RNS.log(f"Indexed {len(self.propagation_entries)} messages in {RNS.prettytime(et-st)}, {mps} msgs/s", RNS.LOG_NOTICE)
RNS.log("Rebuilding peer synchronisation states...", RNS.LOG_NOTICE)
st = time.time()
if os.path.isfile(self.storagepath+"/peers"):
peers_file = open(self.storagepath+"/peers", "rb")
peers_data = peers_file.read()
peers_file.close()
if len(peers_data) > 0:
serialised_peers = msgpack.unpackb(peers_data)
del peers_data
while len(serialised_peers) > 0:
serialised_peer = serialised_peers.pop()
peer = LXMPeer.from_bytes(serialised_peer, self)
del serialised_peer
if peer.destination_hash in self.static_peers and peer.last_heard == 0:
# TODO: Allow path request responses through announce handler
# momentarily here, so peering config can be updated even if
# the static peer is not available to directly send an announce.
RNS.Transport.request_path(peer.destination_hash)
if peer.identity != None:
self.peers[peer.destination_hash] = peer
lim_str = ", no transfer limit"
if peer.propagation_transfer_limit != None:
lim_str = ", "+RNS.prettysize(peer.propagation_transfer_limit*1000)+" transfer limit"
RNS.log("Rebuilt peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(peer.unhandled_message_count)+" unhandled messages"+lim_str, RNS.LOG_DEBUG)
else:
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
del peer
del serialised_peers
if len(self.static_peers) > 0:
for static_peer in self.static_peers:
if not static_peer in self.peers:
RNS.log(f"Activating static peering with {RNS.prettyhexrep(static_peer)}", RNS.LOG_NOTICE)
self.peers[static_peer] = LXMPeer(self, static_peer, sync_strategy=self.default_sync_strategy)
if self.peers[static_peer].last_heard == 0:
# TODO: Allow path request responses through announce handler
# momentarily here, so peering config can be updated even if
# the static peer is not available to directly send an announce.
RNS.Transport.request_path(static_peer)
RNS.log(f"Rebuilt synchronisation state for {len(self.peers)} peers in {RNS.prettytime(time.time()-st)}", RNS.LOG_NOTICE)
try:
if os.path.isfile(self.storagepath+"/node_stats"):
node_stats_file = open(self.storagepath+"/node_stats", "rb")
data = node_stats_file.read()
node_stats_file.close()
node_stats = msgpack.unpackb(data)
if not type(node_stats) == dict:
RNS.log("Invalid data format for loaded local node stats, node stats will be reset", RNS.LOG_ERROR)
else:
self.client_propagation_messages_received = node_stats["client_propagation_messages_received"]
self.client_propagation_messages_served = node_stats["client_propagation_messages_served"]
self.unpeered_propagation_incoming = node_stats["unpeered_propagation_incoming"]
self.unpeered_propagation_rx_bytes = node_stats["unpeered_propagation_rx_bytes"]
except Exception as e:
RNS.log("Could not load local node stats. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.propagation_node = True
self.propagation_node_start_time = time.time()
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
self.propagation_destination.set_packet_callback(self.propagation_packet)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.propagation_destination.register_request_handler(LXMPeer.MESSAGE_GET_PATH, self.message_get_request, allow = RNS.Destination.ALLOW_ALL)
self.control_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation", "control")
self.control_destination.register_request_handler(LXMRouter.STATS_GET_PATH, self.stats_get_request, allow = RNS.Destination.ALLOW_LIST, allowed_list=[self.identity.hash])
if self.message_storage_limit != None:
limit_str = ", limit is "+RNS.prettysize(self.message_storage_limit)
else:
limit_str = ""
RNS.log("LXMF Propagation Node message store size is "+RNS.prettysize(self.message_storage_size())+limit_str, RNS.LOG_DEBUG)
self.announce_propagation_node()
except Exception as e:
RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR)
raise e
RNS.panic()
def disable_propagation(self):
self.propagation_node = False
self.announce_propagation_node()
def enforce_stamps(self):
self._enforce_stamps = True
def ignore_stamps(self):
self._enforce_stamps = False
def ignore_destination(self, destination_hash):
if not destination_hash in self.ignored_list:
self.ignored_list.append(destination_hash)
def unignore_destination(self, destination_hash):
if destination_hash in self.ignored_list:
self.ignored_list.remove(destination_hash)
def set_message_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None):
limit_bytes = 0
if kilobytes != None:
limit_bytes += kilobytes*1000
if megabytes != None:
limit_bytes += megabytes*1000*1000
if gigabytes != None:
limit_bytes += gigabytes*1000*1000*1000
if limit_bytes == 0:
limit_bytes = None
try:
if limit_bytes == None or int(limit_bytes) > 0:
self.message_storage_limit = int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
def message_storage_limit(self):
return self.message_storage_limit
def message_storage_size(self):
if self.propagation_node:
return sum(self.propagation_entries[f][3] for f in self.propagation_entries)
else:
return None
def set_information_storage_limit(self, kilobytes = None, megabytes = None, gigabytes = None):
limit_bytes = 0
if kilobytes != None:
limit_bytes += kilobytes*1000
if megabytes != None:
limit_bytes += megabytes*1000*1000
if gigabytes != None:
limit_bytes += gigabytes*1000*1000*1000
if limit_bytes == 0:
limit_bytes = None
try:
if limit_bytes == None or int(limit_bytes) > 0:
self.information_storage_limit = int(limit_bytes)
else:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
except Exception as e:
raise ValueError("Cannot set LXMF information storage limit to "+str(limit_bytes))
def information_storage_limit(self):
return self.information_storage_limit
def information_storage_size(self):
pass
def delivery_link_available(self, destination_hash):
if destination_hash in self.direct_links or destination_hash in self.backchannel_links:
return True
else:
return False
### Propagation Node Control ##########################
#######################################################
def compile_stats(self):
if not self.propagation_node:
return None
else:
peer_stats = {}
for peer_id in self.peers.copy():
peer = self.peers[peer_id]
peer_stats[peer_id] = {
"type": "static" if peer_id in self.static_peers else "discovered",
"state": peer.state,
"alive": peer.alive,
"last_heard": int(peer.last_heard),
"next_sync_attempt": peer.next_sync_attempt,
"last_sync_attempt": peer.last_sync_attempt,
"sync_backoff": peer.sync_backoff,
"peering_timebase": peer.peering_timebase,
"ler": int(peer.link_establishment_rate),
"str": int(peer.sync_transfer_rate),
"transfer_limit": peer.propagation_transfer_limit,
"sync_limit": peer.propagation_sync_limit,
"target_stamp_cost": peer.propagation_stamp_cost,
"stamp_cost_flexibility": peer.propagation_stamp_cost_flexibility,
"network_distance": RNS.Transport.hops_to(peer_id),
"rx_bytes": peer.rx_bytes,
"tx_bytes": peer.tx_bytes,
"messages": {
"offered": peer.offered,
"outgoing": peer.outgoing,
"incoming": peer.incoming,
"unhandled": peer.unhandled_message_count
},
}
node_stats = {
"identity_hash": self.identity.hash,
"destination_hash": self.propagation_destination.hash,
"uptime": time.time()-self.propagation_node_start_time,
"delivery_limit": self.delivery_per_transfer_limit,
"propagation_limit": self.propagation_per_transfer_limit,
"sync_limit": self.propagation_per_sync_limit,
"target_stamp_cost": self.propagation_stamp_cost,
"stamp_cost_flexibility": self.propagation_stamp_cost_flexibility,
"autopeer_maxdepth": self.autopeer_maxdepth,
"from_static_only": self.from_static_only,
"messagestore": {
"count": len(self.propagation_entries),
"bytes": self.message_storage_size(),
"limit": self.message_storage_limit,
},
"clients" : {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
},
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
"static_peers": len(self.static_peers),
"discovered_peers": len(self.peers)-len(self.static_peers),
"total_peers": len(self.peers),
"max_peers": self.max_peers,
"peers": peer_stats,
}
return node_stats
def stats_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None: return LXMPeer.ERROR_NO_IDENTITY
elif remote_identity.hash != self.identity.hash: return LXMPeer.ERROR_NO_ACCESS
else: return self.compile_stats()
### Utility & Maintenance #############################
#######################################################
JOB_OUTBOUND_INTERVAL = 1
JOB_STAMPS_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
JOB_TRANSIENT_INTERVAL = 60
JOB_STORE_INTERVAL = 120
JOB_PEERSYNC_INTERVAL = 6
JOB_PEERINGEST_INTERVAL= JOB_PEERSYNC_INTERVAL
JOB_ROTATE_INTERVAL = 56*JOB_PEERINGEST_INTERVAL
def jobs(self):
if not self.exit_handler_running:
self.processing_count += 1
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound()
if self.processing_count % LXMRouter.JOB_STAMPS_INTERVAL == 0:
threading.Thread(target=self.process_deferred_stamps, daemon=True).start()
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links()
if self.processing_count % LXMRouter.JOB_TRANSIENT_INTERVAL == 0:
self.clean_transient_id_caches()
if self.processing_count % LXMRouter.JOB_STORE_INTERVAL == 0:
if self.propagation_node == True:
self.clean_message_store()
if self.processing_count % LXMRouter.JOB_PEERINGEST_INTERVAL == 0:
if self.propagation_node == True:
self.flush_queues()
if self.processing_count % LXMRouter.JOB_ROTATE_INTERVAL == 0:
if self.propagation_node == True:
self.rotate_peers()
if self.processing_count % LXMRouter.JOB_PEERSYNC_INTERVAL == 0:
if self.propagation_node == True:
self.sync_peers()
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
# triggers can delay next run
try:
self.jobs()
except Exception as e:
RNS.log("An error ocurred while running LXMF Router jobs.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.trace_exception(e)
time.sleep(LXMRouter.PROCESSING_INTERVAL)
def flush_queues(self):
if len(self.peers) > 0:
self.flush_peer_distribution_queue()
RNS.log("Calculating peer distribution queue mappings...", RNS.LOG_DEBUG); st = time.time()
for peer_id in self.peers.copy():
if peer_id in self.peers:
peer = self.peers[peer_id]
if peer.queued_items():
peer.process_queues()
RNS.log(f"Distribution queue mapping completed in {RNS.prettytime(time.time()-st)}", RNS.LOG_DEBUG)
def clean_links(self):
closed_links = []
for link_hash in self.direct_links:
link = self.direct_links[link_hash]
inactive_time = link.no_data_for()
if inactive_time > LXMRouter.LINK_MAX_INACTIVITY:
link.teardown()
closed_links.append(link_hash)
for link_hash in closed_links:
cleaned_link = self.direct_links.pop(link_hash)
RNS.log("Cleaned link "+str(cleaned_link), RNS.LOG_DEBUG)
try:
inactive_links = []
for link in self.active_propagation_links:
if link.no_data_for() > LXMRouter.P_LINK_MAX_INACTIVITY:
inactive_links.append(link)
for link in inactive_links:
self.active_propagation_links.remove(link)
link.teardown()
except Exception as e:
RNS.log("An error occurred while cleaning inbound propagation links. The contained exception was: "+str(e), RNS.LOG_ERROR)
if self.outbound_propagation_link != None and self.outbound_propagation_link.status == RNS.Link.CLOSED:
self.outbound_propagation_link = None
if self.propagation_transfer_state == LXMRouter.PR_COMPLETE:
self.acknowledge_sync_completion()
elif self.propagation_transfer_state < LXMRouter.PR_LINK_ESTABLISHED:
self.acknowledge_sync_completion(failure_state=LXMRouter.PR_LINK_FAILED)
elif self.propagation_transfer_state >= LXMRouter.PR_LINK_ESTABLISHED and self.propagation_transfer_state < LXMRouter.PR_COMPLETE:
self.acknowledge_sync_completion(failure_state=LXMRouter.PR_TRANSFER_FAILED)
else:
RNS.log(f"Unknown propagation transfer state on link cleaning: {self.propagation_transfer_state}", RNS.LOG_DEBUG)
self.acknowledge_sync_completion()
RNS.log("Cleaned outbound propagation link", RNS.LOG_DEBUG)
def clean_transient_id_caches(self):
now = time.time()
removed_entries = []
for transient_id in self.locally_delivered_transient_ids:
timestamp = self.locally_delivered_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0:
removed_entries.append(transient_id)
for transient_id in removed_entries:
self.locally_delivered_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from local delivery cache", RNS.LOG_DEBUG)
removed_entries = []
for transient_id in self.locally_processed_transient_ids:
timestamp = self.locally_processed_transient_ids[transient_id]
if now > timestamp+LXMRouter.MESSAGE_EXPIRY*6.0:
removed_entries.append(transient_id)
for transient_id in removed_entries:
self.locally_processed_transient_ids.pop(transient_id)
RNS.log("Cleaned "+RNS.prettyhexrep(transient_id)+" from locally processed cache", RNS.LOG_DEBUG)
def update_stamp_cost(self, destination_hash, stamp_cost):
RNS.log(f"Updating outbound stamp cost for {RNS.prettyhexrep(destination_hash)} to {stamp_cost}", RNS.LOG_DEBUG)
self.outbound_stamp_costs[destination_hash] = [time.time(), stamp_cost]
def job():
self.save_outbound_stamp_costs()
threading.Thread(target=self.save_outbound_stamp_costs, daemon=True).start()
def get_wanted_inbound_peers(self):
# TODO: Implement/rethink.
# Probably not necessary anymore.
return None
def get_announce_app_data(self, destination_hash):
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
display_name = None
if delivery_destination.display_name != None:
display_name = delivery_destination.display_name.encode("utf-8")
stamp_cost = None
if delivery_destination.stamp_cost != None and type(delivery_destination.stamp_cost) == int:
if delivery_destination.stamp_cost > 0 and delivery_destination.stamp_cost < 255:
stamp_cost = delivery_destination.stamp_cost
peer_data = [display_name, stamp_cost]
return msgpack.packb(peer_data)
def get_weight(self, transient_id):
dst_hash = self.propagation_entries[transient_id][0]
lxm_rcvd = self.propagation_entries[transient_id][2]
lxm_size = self.propagation_entries[transient_id][3]
now = time.time()
age_weight = max(1, (now - lxm_rcvd)/60/60/24/4)
if dst_hash in self.prioritised_list:
priority_weight = 0.1
else:
priority_weight = 1.0
weight = priority_weight * age_weight * lxm_size
return weight
def generate_ticket(self, destination_hash, expiry=LXMessage.TICKET_EXPIRY):
now = time.time()
ticket = None
if destination_hash in self.available_tickets["last_deliveries"]:
last_delivery = self.available_tickets["last_deliveries"][destination_hash]
elapsed = now - last_delivery
if elapsed < LXMessage.TICKET_INTERVAL:
RNS.log(f"A ticket for {RNS.prettyhexrep(destination_hash)} was already delivered {RNS.prettytime(elapsed)} ago, not including another ticket yet", RNS.LOG_DEBUG)
return None
if destination_hash in self.available_tickets["inbound"]:
for ticket in self.available_tickets["inbound"][destination_hash]:
ticket_entry = self.available_tickets["inbound"][destination_hash][ticket]
expires = ticket_entry[0]; validity_left = expires - now
if validity_left > LXMessage.TICKET_RENEW:
RNS.log(f"Found generated ticket for {RNS.prettyhexrep(destination_hash)} with {RNS.prettytime(validity_left)} of validity left, re-using this one", RNS.LOG_DEBUG)
return [expires, ticket]
else:
self.available_tickets["inbound"][destination_hash] = {}
RNS.log(f"No generated tickets for {RNS.prettyhexrep(destination_hash)} with enough validity found, generating a new one", RNS.LOG_DEBUG)
expires = now+expiry
ticket = os.urandom(LXMessage.TICKET_LENGTH)
self.available_tickets["inbound"][destination_hash][ticket] = [expires]
self.save_available_tickets()
return [expires, ticket]
def remember_ticket(self, destination_hash, ticket_entry):
expires = ticket_entry[0]-time.time()
RNS.log(f"Remembering ticket for {RNS.prettyhexrep(destination_hash)}, expires in {RNS.prettytime(expires)}", RNS.LOG_DEBUG)
self.available_tickets["outbound"][destination_hash] = [ticket_entry[0], ticket_entry[1]]
def get_outbound_ticket(self, destination_hash):
if destination_hash in self.available_tickets["outbound"]:
entry = self.available_tickets["outbound"][destination_hash]
if entry[0] > time.time():
return entry[1]
return None
def get_outbound_ticket_expiry(self, destination_hash):
if destination_hash in self.available_tickets["outbound"]:
entry = self.available_tickets["outbound"][destination_hash]
if entry[0] > time.time():
return entry[0]
return None
def get_inbound_tickets(self, destination_hash):
now = time.time()
available_tickets = []
if destination_hash in self.available_tickets["inbound"]:
for inbound_ticket in self.available_tickets["inbound"][destination_hash]:
if now < self.available_tickets["inbound"][destination_hash][inbound_ticket][0]:
available_tickets.append(inbound_ticket)
if len(available_tickets) == 0:
return None
else:
return available_tickets
def get_size(self, transient_id):
lxm_size = self.propagation_entries[transient_id][3]
return lxm_size
def clean_message_store(self):
RNS.log("Cleaning message store", RNS.LOG_VERBOSE)
# Check and remove expired messages
now = time.time()
removed_entries = {}
for transient_id in self.propagation_entries.copy():
entry = self.propagation_entries[transient_id]
filepath = entry[1]
components = filepath.split("_")
if len(components) == 2 and float(components[1]) > 0 and len(os.path.split(components[0])[1]) == (RNS.Identity.HASHLENGTH//8)*2:
timestamp = float(components[1])
if now > timestamp+LXMRouter.MESSAGE_EXPIRY:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to expiry", RNS.LOG_EXTREME)
removed_entries[transient_id] = filepath
else:
RNS.log("Purging message "+RNS.prettyhexrep(transient_id)+" due to invalid file path", RNS.LOG_WARNING)
removed_entries[transient_id] = filepath
removed_count = 0
for transient_id in removed_entries:
try:
filepath = removed_entries[transient_id]
self.propagation_entries.pop(transient_id)
if os.path.isfile(filepath):
os.unlink(filepath)
removed_count += 1
except Exception as e:
RNS.log("Could not remove "+RNS.prettyhexrep(transient_id)+" from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
if removed_count > 0:
RNS.log("Cleaned "+str(removed_count)+" entries from the message store", RNS.LOG_VERBOSE)
# Check size of message store and cull if needed
try:
message_storage_size = self.message_storage_size()
if message_storage_size != None:
if self.message_storage_limit != None and message_storage_size > self.message_storage_limit:
# Clean the message storage according to priorities
bytes_needed = message_storage_size - self.message_storage_limit
bytes_cleaned = 0
weighted_entries = []
for transient_id in self.propagation_entries.copy():
weighted_entries.append([
self.propagation_entries[transient_id],
self.get_weight(transient_id),
transient_id
])
weighted_entries.sort(key=lambda we: we[1], reverse=True)
i = 0
while i < len(weighted_entries) and bytes_cleaned < bytes_needed:
try:
w = weighted_entries[i]
entry = w[0]
transient_id = w[2]
filepath = entry[1]
if os.path.isfile(filepath):
os.unlink(filepath)
self.propagation_entries.pop(transient_id)
bytes_cleaned += entry[3]
RNS.log("Removed "+RNS.prettyhexrep(transient_id)+" with weight "+str(w[1])+" to clear up "+RNS.prettysize(entry[3])+", now cleaned "+RNS.prettysize(bytes_cleaned)+" out of "+RNS.prettysize(bytes_needed)+" needed", RNS.LOG_EXTREME)
except Exception as e:
RNS.log("Error while cleaning LXMF message from message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
finally:
i += 1
RNS.log("LXMF message store size is now "+RNS.prettysize(self.message_storage_size())+" for "+str(len(self.propagation_entries))+" items", RNS.LOG_EXTREME)
except Exception as e:
RNS.log("Could not clean the LXMF message store. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_locally_delivered_transient_ids(self):
try:
if len(self.locally_delivered_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
with open(self.storagepath+"/local_deliveries", "wb") as locally_delivered_file:
locally_delivered_file.write(msgpack.packb(self.locally_delivered_transient_ids))
except Exception as e:
RNS.log("Could not save locally delivered message ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_locally_processed_transient_ids(self):
try:
if len(self.locally_processed_transient_ids) > 0:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
with open(self.storagepath+"/locally_processed", "wb") as locally_processed_file:
locally_processed_file.write(msgpack.packb(self.locally_processed_transient_ids))
except Exception as e:
RNS.log("Could not save locally processed transient ID cache to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def save_node_stats(self):
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
with open(self.storagepath+"/node_stats", "wb") as stats_file:
node_stats = {
"client_propagation_messages_received": self.client_propagation_messages_received,
"client_propagation_messages_served": self.client_propagation_messages_served,
"unpeered_propagation_incoming": self.unpeered_propagation_incoming,
"unpeered_propagation_rx_bytes": self.unpeered_propagation_rx_bytes,
}
stats_file.write(msgpack.packb(node_stats))
except Exception as e:
RNS.log("Could not save local node stats to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_outbound_stamp_costs(self):
try:
expired = []
for destination_hash in self.outbound_stamp_costs:
entry = self.outbound_stamp_costs[destination_hash]
if time.time() > entry[0] + LXMRouter.STAMP_COST_EXPIRY:
expired.append(destination_hash)
for destination_hash in expired:
self.outbound_stamp_costs.pop(destination_hash)
except Exception as e:
RNS.log(f"Error while cleaning outbound stamp costs. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def save_outbound_stamp_costs(self):
with self.cost_file_lock:
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
outbound_stamp_costs_file = open(self.storagepath+"/outbound_stamp_costs", "wb")
outbound_stamp_costs_file.write(msgpack.packb(self.outbound_stamp_costs))
outbound_stamp_costs_file.close()
except Exception as e:
RNS.log("Could not save outbound stamp costs to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def clean_available_tickets(self):
try:
# Clean outbound tickets
expired_outbound = []
for destination_hash in self.available_tickets["outbound"]:
entry = self.available_tickets["outbound"][destination_hash]
if time.time() > entry[0]:
expired_outbound.append(destination_hash)
for destination_hash in expired_outbound:
self.available_tickets["outbound"].pop(destination_hash)
# Clean inbound tickets
for destination_hash in self.available_tickets["inbound"]:
expired_inbound = []
for inbound_ticket in self.available_tickets["inbound"][destination_hash]:
entry = self.available_tickets["inbound"][destination_hash][inbound_ticket]
ticket_expiry = entry[0]
if time.time() > ticket_expiry+LXMessage.TICKET_GRACE:
expired_inbound.append(inbound_ticket)
for inbound_ticket in expired_inbound:
self.available_tickets["inbound"][destination_hash].pop(inbound_ticket)
except Exception as e:
RNS.log(f"Error while cleaning available tickets. The contained exception was: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def save_available_tickets(self):
with self.ticket_file_lock:
try:
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
available_tickets_file = open(self.storagepath+"/available_tickets", "wb")
available_tickets_file.write(msgpack.packb(self.available_tickets))
available_tickets_file.close()
except Exception as e:
RNS.log("Could not save available tickets to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
def reload_available_tickets(self):
RNS.log("Reloading available tickets from storage", RNS.LOG_DEBUG)
try:
with self.ticket_file_lock:
with open(self.storagepath+"/available_tickets", "rb") as available_tickets_file:
data = available_tickets_file.read()
self.available_tickets = msgpack.unpackb(data)
if not type(self.available_tickets) == dict:
RNS.log("Invalid data format for loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets = {"outbound": {}, "inbound": {}, "last_deliveries": {}}
if not "outbound" in self.available_tickets:
RNS.log("Missing outbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["outbound"] = {}
if not "inbound" in self.available_tickets:
RNS.log("Missing inbound entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["inbound"] = {}
if not "last_deliveries" in self.available_tickets:
RNS.log("Missing local_deliveries entry in loaded available tickets, recreating...", RNS.LOG_ERROR)
self.available_tickets["last_deliveries"] = {}
except Exception as e:
RNS.log(f"An error occurred while reloading available tickets from storage: {e}", RNS.LOG_ERROR)
def exit_handler(self):
if self.exit_handler_running:
return
self.exit_handler_running = True
RNS.log("Tearing down delivery destinations...", RNS.LOG_NOTICE)
for destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
delivery_destination.set_packet_callback(None)
delivery_destination.set_link_established_callback(None)
for link in delivery_destination.links:
try:
if link.status == RNS.Link.ACTIVE:
link.teardown()
except Exception as e:
RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
if self.propagation_node:
RNS.log("Tearing down propagation node destination...", RNS.LOG_NOTICE)
self.propagation_destination.set_link_established_callback(None)
self.propagation_destination.set_packet_callback(None)
self.propagation_destination.deregister_request_handler(LXMPeer.OFFER_REQUEST_PATH)
self.propagation_destination.deregister_request_handler(LXMPeer.MESSAGE_GET_PATH)
self.propagation_destination.deregister_request_handler(LXMRouter.STATS_GET_PATH)
for link in self.active_propagation_links:
try:
if link.status == RNS.Link.ACTIVE:
link.teardown()
except Exception as e:
RNS.log("Error while tearing down propagation link: {e}", RNS.LOG_ERROR)
RNS.log("Persisting LXMF state data to storage...", RNS.LOG_NOTICE)
self.flush_queues()
if self.propagation_node:
try:
st = time.time(); RNS.log(f"Saving {len(self.peers)} peer synchronisation states to storage...", RNS.LOG_NOTICE)
serialised_peers = []
peer_dict = self.peers.copy()
for peer_id in peer_dict:
peer = self.peers[peer_id]
serialised_peers.append(peer.to_bytes())
peers_file = open(self.storagepath+"/peers", "wb")
peers_file.write(msgpack.packb(serialised_peers))
peers_file.close()
RNS.log(f"Saved {len(serialised_peers)} peers to storage in {RNS.prettyshorttime(time.time()-st)}", RNS.LOG_NOTICE)
except Exception as e:
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
self.save_locally_delivered_transient_ids()
self.save_locally_processed_transient_ids()
self.save_node_stats()
def sigint_handler(self, signal, frame):
if not self.exit_handler_running:
RNS.log("Received SIGINT, shutting down now!", RNS.LOG_WARNING)
sys.exit(0)
else:
RNS.log("Received SIGINT, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def sigterm_handler(self, signal, frame):
if not self.exit_handler_running:
RNS.log("Received SIGTERM, shutting down now!", RNS.LOG_WARNING)
sys.exit(0)
else:
RNS.log("Received SIGTERM, but exit handler is running, keeping process alive until storage persist is complete", RNS.LOG_WARNING)
def __str__(self):
return "<LXMRouter "+RNS.hexrep(self.identity.hash, delimit=False)+">"
### Message Download ##################################
#######################################################
def request_messages_path_job(self):
job_thread = threading.Thread(target=self.__request_messages_path_job)
job_thread.setDaemon(True)
job_thread.start()
def __request_messages_path_job(self):
path_timeout = self.wants_download_on_path_available_timeout
while not RNS.Transport.has_path(self.wants_download_on_path_available_from) and time.time() < path_timeout:
time.sleep(0.1)
if RNS.Transport.has_path(self.wants_download_on_path_available_from):
self.request_messages_from_propagation_node(self.wants_download_on_path_available_to, self.propagation_transfer_max_messages)
else:
RNS.log("Propagation node path request timed out", RNS.LOG_DEBUG)
self.acknowledge_sync_completion(failure_state=LXMRouter.PR_NO_PATH)
def identity_allowed(self, identity):
if self.auth_required:
if identity.hash in self.allowed_list:
return True
else:
return False
else:
return True
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
elif not self.identity_allowed(remote_identity):
return LXMPeer.ERROR_NO_ACCESS
else:
try:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery")
# If both want and have fields are empty, send a list of
# available messages.
if data[0] == None and data[1] == None:
available_messages = []
for transient_id in self.propagation_entries:
message_entry = self.propagation_entries[transient_id]
if message_entry[0] == remote_destination.hash:
message_size = os.path.getsize(message_entry[1])
available_entry = [transient_id, message_size]
available_messages.append(available_entry)
available_messages.sort(key=lambda e: e[1], reverse=False)
transient_ids = []
for available_entry in available_messages:
transient_ids.append(available_entry[0])
return transient_ids
else:
# Process messages the client already have
if data[1] != None and len(data[1]) > 0:
for transient_id in data[1]:
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
try:
filepath = self.propagation_entries[transient_id][1]
self.propagation_entries.pop(transient_id)
os.unlink(filepath)
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while processing message purge request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
# Process wanted messages
response_messages = []
if data[0] != None and len(data[0]) > 0:
client_transfer_limit = None
if len(data) >= 3:
try:
client_transfer_limit = float(data[2])*1000
RNS.log("Client indicates transfer limit of "+RNS.prettysize(client_transfer_limit), RNS.LOG_DEBUG)
except:
pass
per_message_overhead = 16 # Really only 2 bytes, but set a bit higher for now
cumulative_size = 24 # Initialised to highest reasonable binary structure overhead
for transient_id in data[0]:
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
try:
filepath = self.propagation_entries[transient_id][1]
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" requested message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
message_file = open(filepath, "rb")
lxmf_data = message_file.read()
message_file.close()
lxm_size = len(lxmf_data)
next_size = cumulative_size + (lxm_size+per_message_overhead)
if client_transfer_limit != None and next_size > client_transfer_limit:
pass
else:
response_messages.append(lxmf_data)
cumulative_size += (lxm_size+per_message_overhead)
except Exception as e:
RNS.log("Error while processing message download request from "+RNS.prettyhexrep(remote_destination.hash)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
self.client_propagation_messages_served += len(response_messages)
return response_messages
except Exception as e:
RNS.log("Error occurred while generating response for download request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
def message_list_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.propagation_transfer_state = LXMRouter.PR_NO_IDENTITY_RCVD
elif request_receipt.response == LXMPeer.ERROR_NO_ACCESS:
RNS.log("Propagation node did not allow list request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS
else:
if request_receipt.response != None and isinstance(request_receipt.response, list):
haves = []
wants = []
if len(request_receipt.response) > 0:
for transient_id in request_receipt.response:
if self.has_message(transient_id):
if not self.retain_synced_on_node:
haves.append(transient_id)
else:
if self.propagation_transfer_max_messages == LXMRouter.PR_ALL_MESSAGES or len(wants) < self.propagation_transfer_max_messages:
wants.append(transient_id)
ms = "" if len(wants) == 1 else "s"
RNS.log(f"Requesting {len(wants)} message{ms} from propagation node", RNS.LOG_DEBUG)
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[wants, haves, self.delivery_per_transfer_limit],
response_callback=self.message_get_response,
failed_callback=self.message_get_failed,
progress_callback=self.message_get_progress)
else:
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = 0
else:
RNS.log("Invalid message list data received from propagation node", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
def message_get_response(self, request_receipt):
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.propagation_transfer_state = LXMRouter.PR_NO_IDENTITY_RCVD
elif request_receipt.response == LXMPeer.ERROR_NO_ACCESS:
RNS.log("Propagation node did not allow get request, tearing down link.", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
self.propagation_transfer_state = LXMRouter.PR_NO_ACCESS
else:
if request_receipt.response != None and len(request_receipt.response) > 0:
haves = []
for lxmf_data in request_receipt.response:
self.lxmf_propagation(lxmf_data)
haves.append(RNS.Identity.full_hash(lxmf_data))
# Return a list of successfully received messages to the node.
# This deletes the messages on the propagation node.
# TODO: Add option to keep messages on node.
request_receipt.link.request(
LXMPeer.MESSAGE_GET_PATH,
[None, haves],
# response_callback=self.message_syncfinal_response,
failed_callback=self.message_get_failed,
# progress_callback=self.message_get_progress
)
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
self.propagation_transfer_progress = 1.0
self.propagation_transfer_last_result = len(request_receipt.response)
self.save_locally_delivered_transient_ids()
def message_get_progress(self, request_receipt):
self.propagation_transfer_state = LXMRouter.PR_RECEIVING
self.propagation_transfer_progress = request_receipt.get_progress()
def message_get_failed(self, request_receipt):
RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
if self.outbound_propagation_link != None:
self.outbound_propagation_link.teardown()
def acknowledge_sync_completion(self, reset_state=False, failure_state=None):
self.propagation_transfer_last_result = None
if reset_state or self.propagation_transfer_state <= LXMRouter.PR_COMPLETE:
if failure_state == None:
self.propagation_transfer_state = LXMRouter.PR_IDLE
else:
self.propagation_transfer_state = failure_state
self.propagation_transfer_progress = 0.0
self.wants_download_on_path_available_from = None
self.wants_download_on_path_available_to = None
def has_message(self, transient_id):
if transient_id in self.locally_delivered_transient_ids:
return True
else:
return False
def cancel_outbound(self, message_id):
try:
if message_id in self.pending_deferred_stamps:
lxm = self.pending_deferred_stamps[message_id]
RNS.log(f"Cancelling deferred stamp generation for {lxm}", RNS.LOG_DEBUG)
lxm.state = LXMessage.CANCELLED
LXStamper.cancel_work(message_id)
lxmessage = None
for lxm in self.pending_outbound:
if lxm.message_id == message_id:
lxmessage = lxm
if lxmessage != None:
lxmessage.state = LXMessage.CANCELLED
if lxmessage in self.pending_outbound:
RNS.log(f"Cancelling {lxmessage} in outbound queue", RNS.LOG_DEBUG)
if lxmessage.representation == LXMessage.RESOURCE:
if lxmessage.resource_representation != None:
lxmessage.resource_representation.cancel()
self.process_outbound()
except Exception as e:
RNS.log(f"An error occurred while cancelling {lxmessage}: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def handle_outbound(self, lxmessage):
destination_hash = lxmessage.get_destination().hash
if lxmessage.stamp_cost == None:
if destination_hash in self.outbound_stamp_costs:
stamp_cost = self.outbound_stamp_costs[destination_hash][1]
lxmessage.stamp_cost = stamp_cost
RNS.log(f"No stamp cost set on LXM to {RNS.prettyhexrep(destination_hash)}, autoconfigured to {stamp_cost}, as required by latest announce", RNS.LOG_DEBUG)
lxmessage.state = LXMessage.OUTBOUND
# If an outbound ticket is available for this
# destination, attach it to the message.
lxmessage.outbound_ticket = self.get_outbound_ticket(destination_hash)
if lxmessage.outbound_ticket != None and lxmessage.defer_stamp:
RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but outbound ticket was applied, processing immediately", RNS.LOG_DEBUG)
lxmessage.defer_stamp = False
# If requested, include a ticket to allow the
# destination to reply without generating a stamp.
if lxmessage.include_ticket:
ticket = self.generate_ticket(lxmessage.destination_hash)
if ticket:
lxmessage.fields[FIELD_TICKET] = ticket
if not lxmessage.packed:
lxmessage.pack()
unknown_path_requested = False
if not RNS.Transport.has_path(destination_hash) and lxmessage.method == LXMessage.OPPORTUNISTIC:
RNS.log(f"Pre-emptively requesting unknown path for opportunistic {lxmessage}", RNS.LOG_DEBUG)
RNS.Transport.request_path(destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
unknown_path_requested = True
lxmessage.determine_transport_encryption()
if lxmessage.defer_stamp and lxmessage.stamp_cost == None:
RNS.log(f"Deferred stamp generation was requested for {lxmessage}, but no stamp is required, processing immediately", RNS.LOG_DEBUG)
lxmessage.defer_stamp = False
if not lxmessage.defer_stamp:
while not unknown_path_requested and self.processing_outbound:
time.sleep(0.05)
self.pending_outbound.append(lxmessage)
if not unknown_path_requested:
self.process_outbound()
else:
self.pending_deferred_stamps[lxmessage.message_id] = lxmessage
def get_outbound_progress(self, lxm_hash):
for lxm in self.pending_outbound:
if lxm.hash == lxm_hash:
return lxm.progress
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].progress
return None
def get_outbound_lxm_stamp_cost(self, lxm_hash):
for lxm in self.pending_outbound:
if lxm.hash == lxm_hash:
return lxm.stamp_cost
for lxm_id in self.pending_deferred_stamps:
if self.pending_deferred_stamps[lxm_id].hash == lxm_hash:
return self.pending_deferred_stamps[lxm_id].stamp_cost
return None
### Message Routing & Delivery ########################
#######################################################
def lxmf_delivery(self, lxmf_data, destination_type = None, phy_stats = None, ratchet_id = None, method = None, no_stamp_enforcement=False, allow_duplicate=False):
try:
message = LXMessage.unpack_from_bytes(lxmf_data)
if ratchet_id and not message.ratchet_id:
message.ratchet_id = ratchet_id
if method:
message.method = method
if message.signature_validated and FIELD_TICKET in message.fields:
ticket_entry = message.fields[FIELD_TICKET]
if type(ticket_entry) == list and len(ticket_entry) > 1:
expires = ticket_entry[0]
ticket = ticket_entry[1]
if time.time() < expires:
if type(ticket) == bytes and len(ticket) == LXMessage.TICKET_LENGTH:
self.remember_ticket(message.source_hash, ticket_entry)
def save_job():
self.save_available_tickets()
threading.Thread(target=save_job, daemon=True).start()
required_stamp_cost = self.delivery_destinations[message.destination_hash].stamp_cost
if required_stamp_cost != None:
destination_tickets = self.get_inbound_tickets(message.source_hash)
if message.validate_stamp(required_stamp_cost, tickets=destination_tickets):
message.stamp_valid = True
message.stamp_checked = True
else:
message.stamp_valid = False
message.stamp_checked = True
if not message.stamp_valid:
if no_stamp_enforcement:
RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement was temporarily disabled", RNS.LOG_NOTICE)
else:
if self._enforce_stamps:
RNS.log(f"Dropping {message} with invalid stamp", RNS.LOG_NOTICE)
return False
else:
RNS.log(f"Received {message} with invalid stamp, but allowing anyway, since stamp enforcement is disabled", RNS.LOG_NOTICE)
else:
RNS.log(f"Received {message} with valid stamp", RNS.LOG_DEBUG)
if phy_stats != None:
if "rssi" in phy_stats: message.rssi = phy_stats["rssi"]
if "snr" in phy_stats: message.snr = phy_stats["snr"]
if "q" in phy_stats: message.q = phy_stats["q"]
# TODO: Update these descriptions to account for ratchets
if destination_type == RNS.Destination.SINGLE:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif destination_type == RNS.Destination.GROUP:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
elif destination_type == RNS.Destination.LINK:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
else:
message.transport_encrypted = False
message.transport_encryption = None
if message.source_hash in self.ignored_list:
RNS.log(str(self)+" ignored message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
if not allow_duplicate and self.has_message(message.hash):
RNS.log(str(self)+" ignored already received message from "+RNS.prettyhexrep(message.source_hash), RNS.LOG_DEBUG)
return False
else:
self.locally_delivered_transient_ids[message.hash] = time.time()
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(message)
except Exception as e:
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
RNS.trace_exception(e)
return True
except Exception as e:
RNS.log("Could not assemble LXMF message from received data", RNS.LOG_NOTICE)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False
def delivery_packet(self, data, packet):
packet.prove()
try:
method = None
if packet.destination_type != RNS.Destination.LINK:
method = LXMessage.OPPORTUNISTIC
lxmf_data = b""
lxmf_data += packet.destination.hash
lxmf_data += data
else:
method = LXMessage.DIRECT
lxmf_data = data
try:
reticulum = RNS.Reticulum.get_instance()
if packet.rssi == None: packet.rssi = reticulum.get_packet_rssi(packet.packet_hash)
if packet.snr == None: packet.snr = reticulum.get_packet_snr(packet.packet_hash)
if packet.q == None: packet.q = reticulum.get_packet_q(packet.packet_hash)
except Exception as e:
RNS.log("Error while retrieving physical link stats for LXMF delivery packet: "+str(e), RNS.LOG_ERROR)
phy_stats = {"rssi": packet.rssi, "snr": packet.snr, "q": packet.q}
self.lxmf_delivery(lxmf_data, packet.destination_type, phy_stats=phy_stats, ratchet_id=packet.ratchet_id, method=method)
except Exception as e:
RNS.log("Exception occurred while parsing incoming LXMF data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def delivery_link_established(self, link):
link.track_phy_stats(True)
link.set_packet_callback(self.delivery_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_APP)
link.set_resource_callback(self.delivery_resource_advertised)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.delivery_resource_concluded)
link.set_remote_identified_callback(self.delivery_remote_identified)
def delivery_link_closed(self, link):
pass
def resource_transfer_began(self, resource):
RNS.log("Transfer began for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG)
def delivery_resource_advertised(self, resource):
size = resource.get_data_size()
limit = self.delivery_per_transfer_limit*1000
if limit != None and size > limit:
RNS.log("Rejecting "+RNS.prettysize(size)+" incoming LXMF delivery resource, since it exceeds the limit of "+RNS.prettysize(limit), RNS.LOG_DEBUG)
return False
else:
return True
def delivery_resource_concluded(self, resource):
RNS.log("Transfer concluded for LXMF delivery resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE:
ratchet_id = None
# Set ratchet ID to link ID if available
if resource.link and hasattr(resource.link, "link_id"):
ratchet_id = resource.link.link_id
phy_stats = {"rssi": resource.link.rssi, "snr": resource.link.snr, "q": resource.link.q}
self.lxmf_delivery(resource.data.read(), resource.link.type, phy_stats=phy_stats, ratchet_id=ratchet_id, method=LXMessage.DIRECT)
def delivery_remote_identified(self, link, identity):
destination_hash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", identity)
self.backchannel_links[destination_hash] = link
RNS.log(f"Backchannel became available for {RNS.prettyhexrep(destination_hash)} on delivery link {link}", RNS.LOG_DEBUG)
### Peer Sync & Propagation ###########################
#######################################################
def peer(self, destination_hash, timestamp, propagation_transfer_limit, propagation_sync_limit, propagation_stamp_cost, propagation_stamp_cost_flexibility):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp > peer.peering_timebase:
peer.alive = True
peer.sync_backoff = 0
peer.next_sync_attempt = 0
peer.peering_timebase = timestamp
peer.last_heard = time.time()
peer.propagation_stamp_cost = propagation_stamp_cost
peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
peer.propagation_transfer_limit = propagation_transfer_limit
if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
else: peer.propagation_sync_limit = propagation_transfer_limit
RNS.log(f"Peering config updated for {RNS.prettyhexrep(destination_hash)}", RNS.LOG_VERBOSE)
else:
if len(self.peers) < self.max_peers:
peer = LXMPeer(self, destination_hash, sync_strategy=self.default_sync_strategy)
peer.alive = True
peer.last_heard = time.time()
peer.propagation_stamp_cost = propagation_stamp_cost
peer.propagation_stamp_cost_flexibility = propagation_stamp_cost_flexibility
peer.propagation_transfer_limit = propagation_transfer_limit
if propagation_sync_limit != None: peer.propagation_sync_limit = propagation_sync_limit
else: peer.propagation_sync_limit = propagation_transfer_limit
self.peers[destination_hash] = peer
RNS.log(f"Peered with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_NOTICE)
else:
RNS.log(f"Max peers reached, not peering with {RNS.prettyhexrep(destination_hash)}", RNS.LOG_DEBUG)
def unpeer(self, destination_hash, timestamp = None):
if timestamp == None:
timestamp = int(time.time())
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp >= peer.peering_timebase:
self.peers.pop(destination_hash)
RNS.log("Broke peering with "+str(peer.destination))
def rotate_peers(self):
try:
rotation_headroom = max(1, math.floor(self.max_peers*(LXMRouter.ROTATION_HEADROOM_PCT/100.0)))
required_drops = len(self.peers) - (self.max_peers - rotation_headroom)
if required_drops > 0 and len(self.peers) - required_drops > 1:
peers = self.peers.copy()
untested_peers = []
for peer_id in self.peers:
peer = self.peers[peer_id]
if peer.last_sync_attempt == 0:
untested_peers.append(peer)
if len(untested_peers) >= rotation_headroom:
RNS.log("Newly added peer threshold reached, postponing peer rotation", RNS.LOG_DEBUG)
return
fully_synced_peers = {}
for peer_id in peers:
peer = peers[peer_id]
if peer.unhandled_message_count == 0:
fully_synced_peers[peer_id] = peer
if len(fully_synced_peers) > 0:
peers = fully_synced_peers
ms = "" if len(fully_synced_peers) == 1 else "s"
RNS.log(f"Found {len(fully_synced_peers)} fully synced peer{ms}, using as peer rotation pool basis", RNS.LOG_DEBUG)
culled_peers = []
waiting_peers = []
unresponsive_peers = []
for peer_id in peers:
peer = peers[peer_id]
if not peer_id in self.static_peers and peer.state == LXMPeer.IDLE:
if peer.alive:
if peer.offered == 0:
# Don't consider for unpeering until at
# least one message has been offered
pass
else:
waiting_peers.append(peer)
else:
unresponsive_peers.append(peer)
drop_pool = []
if len(unresponsive_peers) > 0:
drop_pool.extend(unresponsive_peers)
if not self.prioritise_rotating_unreachable_peers:
drop_pool.extend(waiting_peers)
else:
drop_pool.extend(waiting_peers)
if len(drop_pool) > 0:
drop_count = min(required_drops, len(drop_pool))
low_acceptance_rate_peers = sorted(
drop_pool,
key=lambda p: ( 0 if p.offered == 0 else (p.outgoing/p.offered) ),
reverse=False
)[0:drop_count]
dropped_peers = 0
for peer in low_acceptance_rate_peers:
ar = 0 if peer.offered == 0 else round((peer.outgoing/peer.offered)*100, 2)
if ar < LXMRouter.ROTATION_AR_MAX*100:
reachable_str = "reachable" if peer.alive else "unreachable"
RNS.log(f"Acceptance rate for {reachable_str} peer {RNS.prettyhexrep(peer.destination_hash)} was: {ar}% ({peer.outgoing}/{peer.offered}, {peer.unhandled_message_count} unhandled messages)", RNS.LOG_DEBUG)
self.unpeer(peer.destination_hash)
dropped_peers += 1
ms = "" if dropped_peers == 1 else "s"
RNS.log(f"Dropped {dropped_peers} low acceptance rate peer{ms} to increase peering headroom", RNS.LOG_DEBUG)
except Exception as e:
RNS.log(f"An error occurred during peer rotation: {e}", RNS.LOG_ERROR)
RNS.trace_exception(e)
def sync_peers(self):
culled_peers = []
waiting_peers = []
unresponsive_peers = []
peers = self.peers.copy()
for peer_id in peers:
peer = peers[peer_id]
if time.time() > peer.last_heard + LXMPeer.MAX_UNREACHABLE:
if not peer_id in self.static_peers: culled_peers.append(peer_id)
else:
if peer.state == LXMPeer.IDLE and len(peer.unhandled_messages) > 0:
if peer.alive: waiting_peers.append(peer)
else:
if hasattr(peer, "next_sync_attempt") and time.time() > peer.next_sync_attempt: unresponsive_peers.append(peer)
else: pass # RNS.log("Not adding peer "+str(peer)+" since it is in sync backoff", RNS.LOG_DEBUG)
peer_pool = []
if len(waiting_peers) > 0:
fastest_peers = sorted(
waiting_peers,
key=lambda p: p.sync_transfer_rate,
reverse=True
)[0:min(LXMRouter.FASTEST_N_RANDOM_POOL, len(waiting_peers))]
peer_pool.extend(fastest_peers)
unknown_speed_peers = [p for p in waiting_peers if p.sync_transfer_rate == 0]
if len(unknown_speed_peers) > 0:
peer_pool.extend(
unknown_speed_peers[
0:min(
len(unknown_speed_peers),
len(fastest_peers)
)]
)
RNS.log("Selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
elif len(unresponsive_peers) > 0:
RNS.log("No active peers available, randomly selecting peer to sync from "+str(len(unresponsive_peers))+" unresponsive peers.", RNS.LOG_DEBUG)
peer_pool = unresponsive_peers
if len(peer_pool) > 0:
selected_index = random.randint(0,len(peer_pool)-1)
selected_peer = peer_pool[selected_index]
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
selected_peer.sync()
for peer_id in culled_peers:
RNS.log("Removing peer "+RNS.prettyhexrep(peer_id)+" due to excessive unreachability", RNS.LOG_WARNING)
try:
if peer_id in self.peers:
self.peers.pop(peer_id)
except Exception as e:
RNS.log("Error while removing peer "+RNS.prettyhexrep(peer_id)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def propagation_link_established(self, link):
link.set_packet_callback(self.propagation_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_APP)
link.set_resource_callback(self.propagation_resource_advertised)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.propagation_resource_concluded)
self.active_propagation_links.append(link)
def propagation_resource_advertised(self, resource):
if self.from_static_only:
remote_identity = resource.link.get_remote_identity()
if remote_identity == None:
RNS.log(f"Rejecting propagation resource from unidentified peer", RNS.LOG_DEBUG)
return False
else:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.static_peers:
RNS.log(f"Rejecting propagation resource from {remote_str} not in static peers list", RNS.LOG_DEBUG)
return False
size = resource.get_data_size()
limit = self.propagation_per_sync_limit*1000
if limit != None and size > limit:
RNS.log(f"Rejecting {RNS.prettysize(size)} incoming propagation resource, since it exceeds the limit of {RNS.prettysize(limit)}", RNS.LOG_DEBUG)
return False
else:
return True
def propagation_packet(self, data, packet):
try:
if packet.destination_type != RNS.Destination.LINK:
pass
else:
data = msgpack.unpackb(data)
remote_timebase = data[0]
messages = data[1]
for lxmf_data in messages:
self.lxmf_propagation(lxmf_data)
self.client_propagation_messages_received += 1
packet.prove()
except Exception as e:
RNS.log("Exception occurred while parsing incoming LXMF propagation data.", RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def offer_request(self, path, data, request_id, link_id, remote_identity, requested_at):
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
if self.from_static_only:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.static_peers:
RNS.log(f"Rejecting propagation request from {remote_str} not in static peers list", RNS.LOG_DEBUG)
return LXMPeer.ERROR_NO_ACCESS
try:
transient_ids = data
wanted_ids = []
for transient_id in transient_ids:
if not transient_id in self.propagation_entries:
wanted_ids.append(transient_id)
if len(wanted_ids) == 0:
return False
elif len(wanted_ids) == len(transient_ids):
return True
else:
return wanted_ids
except Exception as e:
RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
def propagation_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
# TODO: The peer this was received from should
# have the transient id added to its list of
# already handled messages.
try:
data = msgpack.unpackb(resource.data.read())
if type(data) == list and len(data) == 2 and type(data[0] == float) and type(data[1]) == list:
# This is a series of propagation messages from a peer or originator
remote_timebase = data[0]
remote_hash = None
remote_str = "unknown peer"
remote_identity = resource.link.get_remote_identity()
if remote_identity != None:
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
remote_hash = remote_destination.hash
remote_str = RNS.prettyhexrep(remote_hash)
if not remote_hash in self.peers:
if self.autopeer and RNS.Transport.hops_to(remote_hash) <= self.autopeer_maxdepth:
# TODO: Query cache for an announce and get propagation
# transfer limit from that. For now, initialise it to a
# sane default value, and wait for an announce to arrive
# that will update the peering config to the actual limit.
propagation_transfer_limit = LXMRouter.PROPAGATION_LIMIT//4
wanted_inbound_peers = None
self.peer(remote_hash, remote_timebase, propagation_transfer_limit, wanted_inbound_peers)
else:
remote_str = f"peer {remote_str}"
messages = data[1]
ms = "" if len(messages) == 1 else "s"
RNS.log(f"Received {len(messages)} message{ms} from {remote_str}", RNS.LOG_VERBOSE)
for lxmf_data in messages:
peer = None
transient_id = RNS.Identity.full_hash(lxmf_data)
if remote_hash != None and remote_hash in self.peers:
peer = self.peers[remote_hash]
peer.incoming += 1
peer.rx_bytes += len(lxmf_data)
else:
if remote_identity != None:
self.unpeered_propagation_incoming += 1
self.unpeered_propagation_rx_bytes += len(lxmf_data)
else:
self.client_propagation_messages_received += 1
self.lxmf_propagation(lxmf_data, from_peer=peer)
if peer != None:
peer.queue_handled_message(transient_id)
else:
RNS.log("Invalid data structure received at propagation destination, ignoring", RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while unpacking received propagation resource", RNS.LOG_DEBUG)
RNS.trace_exception(e)
def enqueue_peer_distribution(self, transient_id, from_peer):
self.peer_distribution_queue.append([transient_id, from_peer])
def flush_peer_distribution_queue(self):
if len(self.peer_distribution_queue) > 0:
entries = []
while len(self.peer_distribution_queue) > 0:
entries.append(self.peer_distribution_queue.pop())
for peer_id in self.peers.copy():
if peer_id in self.peers:
peer = self.peers[peer_id]
for entry in entries:
transient_id = entry[0]
from_peer = entry[1]
if peer != from_peer:
peer.queue_unhandled_message(transient_id)
def lxmf_propagation(self, lxmf_data, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False, is_paper_message=False, from_peer=None):
no_stamp_enforcement = False
if is_paper_message:
no_stamp_enforcement = True
try:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
transient_id = RNS.Identity.full_hash(lxmf_data)
if (not transient_id in self.propagation_entries and not transient_id in self.locally_processed_transient_ids) or allow_duplicate == True:
received = time.time()
destination_hash = lxmf_data[:LXMessage.DESTINATION_LENGTH]
self.locally_processed_transient_ids[transient_id] = received
if destination_hash in self.delivery_destinations:
delivery_destination = self.delivery_destinations[destination_hash]
encrypted_lxmf_data = lxmf_data[LXMessage.DESTINATION_LENGTH:]
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
if decrypted_lxmf_data != None:
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
self.lxmf_delivery(delivery_data, delivery_destination.type, ratchet_id=delivery_destination.latest_ratchet_id, method=LXMessage.PROPAGATED, no_stamp_enforcement=no_stamp_enforcement, allow_duplicate=allow_duplicate)
self.locally_delivered_transient_ids[transient_id] = time.time()
if signal_local_delivery != None:
return signal_local_delivery
else:
if self.propagation_node:
file_path = self.messagepath+"/"+RNS.hexrep(transient_id, delimit=False)+"_"+str(received)
msg_file = open(file_path, "wb")
msg_file.write(lxmf_data)
msg_file.close()
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_EXTREME)
self.propagation_entries[transient_id] = [destination_hash, file_path, time.time(), len(lxmf_data), [], []]
self.enqueue_peer_distribution(transient_id, from_peer)
else:
# TODO: Add message to sneakernet queues when implemented
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", but this instance is not hosting a propagation node, discarding message.", RNS.LOG_DEBUG)
return True
else:
if signal_duplicate != None:
return signal_duplicate
else:
return False
return False
except Exception as e:
RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
RNS.trace_exception(e)
return False
def ingest_lxm_uri(self, uri, signal_local_delivery=None, signal_duplicate=None, allow_duplicate=False):
try:
if not uri.lower().startswith(LXMessage.URI_SCHEMA+"://"):
RNS.log("Cannot ingest LXM, invalid URI provided.", RNS.LOG_ERROR)
return False
else:
lxmf_data = base64.urlsafe_b64decode(uri.replace(LXMessage.URI_SCHEMA+"://", "").replace("/", "")+"==")
transient_id = RNS.Identity.full_hash(lxmf_data)
router_propagation_result = self.lxmf_propagation(lxmf_data, signal_local_delivery=signal_local_delivery, signal_duplicate=signal_duplicate, allow_duplicate=allow_duplicate, is_paper_message=True)
if router_propagation_result != False:
RNS.log("LXM with transient ID "+RNS.prettyhexrep(transient_id)+" was ingested.", RNS.LOG_DEBUG)
return router_propagation_result
else:
RNS.log("No valid LXM could be ingested from the provided URI", RNS.LOG_DEBUG)
return False
except Exception as e:
RNS.log("Error while decoding URI-encoded LXMF message. The contained exception was: "+str(e), RNS.LOG_ERROR)
return False
def fail_message(self, lxmessage):
RNS.log(str(lxmessage)+" failed to send", RNS.LOG_DEBUG)
if lxmessage in self.pending_outbound:
self.pending_outbound.remove(lxmessage)
self.failed_outbound.append(lxmessage)
if lxmessage.state != LXMessage.REJECTED:
lxmessage.state = LXMessage.FAILED
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
def process_deferred_stamps(self):
if len(self.pending_deferred_stamps) > 0:
if self.stamp_gen_lock.locked():
return
else:
with self.stamp_gen_lock:
selected_lxm = None
selected_message_id = None
for message_id in self.pending_deferred_stamps:
lxmessage = self.pending_deferred_stamps[message_id]
if selected_lxm == None:
selected_lxm = lxmessage
selected_message_id = message_id
if selected_lxm != None:
if selected_lxm.state == LXMessage.CANCELLED:
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
selected_lxm.failed_callback(lxmessage)
return
RNS.log(f"Starting stamp generation for {selected_lxm}...", RNS.LOG_DEBUG)
generated_stamp = selected_lxm.get_stamp()
if generated_stamp:
selected_lxm.stamp = generated_stamp
selected_lxm.defer_stamp = False
selected_lxm.packed = None
selected_lxm.pack()
self.pending_deferred_stamps.pop(selected_message_id)
self.pending_outbound.append(selected_lxm)
RNS.log(f"Stamp generation completed for {selected_lxm}", RNS.LOG_DEBUG)
else:
if selected_lxm.state == LXMessage.CANCELLED:
RNS.log(f"Message cancelled during deferred stamp generation for {selected_lxm}.", RNS.LOG_DEBUG)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
if selected_lxm.failed_callback != None and callable(selected_lxm.failed_callback):
selected_lxm.failed_callback(lxmessage)
else:
RNS.log(f"Deferred stamp generation did not succeed. Failing {selected_lxm}.", RNS.LOG_ERROR)
selected_lxm.stamp_generation_failed = True
self.pending_deferred_stamps.pop(selected_message_id)
self.fail_message(selected_lxm)
def process_outbound(self, sender = None):
if self.processing_outbound:
return
for lxmessage in self.pending_outbound:
if lxmessage.state == LXMessage.DELIVERED:
RNS.log("Delivery has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
# Udate ticket delivery stats
if lxmessage.include_ticket and FIELD_TICKET in lxmessage.fields:
RNS.log(f"Updating latest ticket delivery for {RNS.prettyhexrep(lxmessage.destination_hash)}", RNS.LOG_DEBUG)
self.available_tickets["last_deliveries"][lxmessage.destination_hash] = time.time()
self.save_available_tickets()
# Prepare link for backchannel communications
delivery_destination_hash = lxmessage.get_destination().hash
if lxmessage.method == LXMessage.DIRECT and delivery_destination_hash in self.direct_links:
direct_link = self.direct_links[delivery_destination_hash]
if not hasattr(direct_link, "backchannel_identified") or direct_link.backchannel_identified == False:
if direct_link.initiator == True:
source_destination_hash = lxmessage.get_source().hash
if source_destination_hash in self.delivery_destinations:
backchannel_identity = self.delivery_destinations[source_destination_hash].identity
backchannel_desthash = RNS.Destination.hash_from_name_and_identity("lxmf.delivery", backchannel_identity)
direct_link.identify(backchannel_identity)
direct_link.backchannel_identified = True
self.delivery_link_established(direct_link)
RNS.log(f"Performed backchannel identification as {RNS.prettyhexrep(backchannel_desthash)} on {direct_link}", RNS.LOG_DEBUG)
elif lxmessage.method == LXMessage.PROPAGATED and lxmessage.state == LXMessage.SENT:
RNS.log("Propagation has occurred for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
elif lxmessage.state == LXMessage.CANCELLED:
RNS.log("Cancellation requested for "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
elif lxmessage.state == LXMessage.REJECTED:
RNS.log("Receiver rejected "+str(lxmessage)+", removing from outbound queue", RNS.LOG_DEBUG)
self.pending_outbound.remove(lxmessage)
if lxmessage.failed_callback != None and callable(lxmessage.failed_callback):
lxmessage.failed_callback(lxmessage)
else:
RNS.log("Outbound processing for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if lxmessage.progress == None or lxmessage.progress < 0.01: lxmessage.progress = 0.01
# Outbound handling for opportunistic messages
if lxmessage.method == LXMessage.OPPORTUNISTIC:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if lxmessage.delivery_attempts >= LXMRouter.MAX_PATHLESS_TRIES and not RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Requesting path to {RNS.prettyhexrep(lxmessage.get_destination().hash)} after {lxmessage.delivery_attempts} pathless tries for {lxmessage}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
elif lxmessage.delivery_attempts == LXMRouter.MAX_PATHLESS_TRIES+1 and RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log(f"Opportunistic delivery for {lxmessage} still unsuccessful after {lxmessage.delivery_attempts} attempts, trying to rediscover path to {RNS.prettyhexrep(lxmessage.get_destination().hash)}", RNS.LOG_DEBUG)
lxmessage.delivery_attempts += 1
RNS.Reticulum.get_instance().drop_path(lxmessage.get_destination().hash)
def rediscover_job():
time.sleep(0.5)
RNS.Transport.request_path(lxmessage.get_destination().hash)
threading.Thread(target=rediscover_job, daemon=True).start()
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
RNS.log("Opportunistic delivery attempt "+str(lxmessage.delivery_attempts)+" for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
lxmessage.send()
else:
RNS.log("Max delivery attempts reached for oppertunistic "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transferred
# over a direct link to the final recipient
elif lxmessage.method == LXMessage.DIRECT:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
delivery_destination_hash = lxmessage.get_destination().hash
direct_link = None
if delivery_destination_hash in self.direct_links:
# An established direct link already exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.direct_links[delivery_destination_hash]
RNS.log(f"Using available direct link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
elif delivery_destination_hash in self.backchannel_links:
# An established backchannel link exists to
# the destination, so we'll try to use it for
# delivering the message
direct_link = self.backchannel_links[delivery_destination_hash]
RNS.log(f"Using available backchannel link {direct_link} to {RNS.prettyhexrep(delivery_destination_hash)}", RNS.LOG_DEBUG)
if direct_link != None:
if direct_link.status == RNS.Link.ACTIVE:
if lxmessage.progress == None or lxmessage.progress < 0.05:
lxmessage.progress = 0.05
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" on link "+str(direct_link), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(direct_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif direct_link.status == RNS.Link.CLOSED:
if direct_link.activated_at != None:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was closed unexpectedly, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
else:
if not hasattr(lxmessage, "path_request_retried"):
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated, retrying path request...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.path_request_retried = True
else:
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" was never activated", RNS.LOG_DEBUG)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.set_delivery_destination(None)
if delivery_destination_hash in self.direct_links:
self.direct_links.pop(delivery_destination_hash)
if delivery_destination_hash in self.backchannel_links:
self.backchannel_links.pop(delivery_destination_hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become active or close
RNS.log("The link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(lxmessage.get_destination().hash):
RNS.log("Establishing link to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
delivery_link = RNS.Link(lxmessage.get_destination())
delivery_link.set_link_established_callback(self.process_outbound)
self.direct_links[delivery_destination_hash] = delivery_link
lxmessage.progress = 0.03
else:
RNS.log("No path known for delivery attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(lxmessage.get_destination().hash)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
lxmessage.progress = 0.01
else:
RNS.log("Max delivery attempts reached for direct "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)
# Outbound handling for messages transported via
# propagation to a LXMF router network.
elif lxmessage.method == LXMessage.PROPAGATED:
RNS.log("Attempting propagated delivery for "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
if self.outbound_propagation_node == None:
RNS.log("No outbound propagation node specified for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_ERROR)
self.fail_message(lxmessage)
else:
if lxmessage.delivery_attempts <= LXMRouter.MAX_DELIVERY_ATTEMPTS:
if self.outbound_propagation_link != None:
# A link already exists, so we'll try to use it
# to deliver the message
if self.outbound_propagation_link.status == RNS.Link.ACTIVE:
if lxmessage.state != LXMessage.SENDING:
RNS.log("Starting propagation transfer of "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash)+" via "+RNS.prettyhexrep(self.outbound_propagation_node), RNS.LOG_DEBUG)
lxmessage.set_delivery_destination(self.outbound_propagation_link)
lxmessage.send()
else:
if lxmessage.representation == LXMessage.RESOURCE:
RNS.log("The transfer of "+str(lxmessage)+" is in progress ("+str(round(lxmessage.progress*100, 1))+"%)", RNS.LOG_DEBUG)
else:
RNS.log("Waiting for proof for "+str(lxmessage)+" sent as link packet", RNS.LOG_DEBUG)
elif self.outbound_propagation_link.status == RNS.Link.CLOSED:
RNS.log("The link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" was closed", RNS.LOG_DEBUG)
self.outbound_propagation_link = None
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
else:
# Simply wait for the link to become
# active or close
RNS.log("The propagation link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" is pending, waiting for link to become active", RNS.LOG_DEBUG)
else:
# No link exists, so we'll try to establish one, but
# only if we've never tried before, or the retry wait
# period has elapsed.
if not hasattr(lxmessage, "next_delivery_attempt") or time.time() > lxmessage.next_delivery_attempt:
lxmessage.delivery_attempts += 1
lxmessage.next_delivery_attempt = time.time() + LXMRouter.DELIVERY_RETRY_WAIT
if lxmessage.delivery_attempts < LXMRouter.MAX_DELIVERY_ATTEMPTS:
if RNS.Transport.has_path(self.outbound_propagation_node):
RNS.log("Establishing link to "+RNS.prettyhexrep(self.outbound_propagation_node)+" for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
propagation_node_identity = RNS.Identity.recall(self.outbound_propagation_node)
propagation_node_destination = RNS.Destination(propagation_node_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.outbound_propagation_link = RNS.Link(propagation_node_destination, established_callback=self.process_outbound)
else:
RNS.log("No path known for propagation attempt "+str(lxmessage.delivery_attempts)+" to "+RNS.prettyhexrep(self.outbound_propagation_node)+". Requesting path...", RNS.LOG_DEBUG)
RNS.Transport.request_path(self.outbound_propagation_node)
lxmessage.next_delivery_attempt = time.time() + LXMRouter.PATH_REQUEST_WAIT
else:
RNS.log("Max delivery attempts reached for propagated "+str(lxmessage)+" to "+RNS.prettyhexrep(lxmessage.get_destination().hash), RNS.LOG_DEBUG)
self.fail_message(lxmessage)