Implemented advanced telemetry configuration

This commit is contained in:
Mark Qvist 2023-10-30 13:45:58 +01:00
parent a78a773886
commit ab15093ec8
7 changed files with 331 additions and 168 deletions

View file

@ -527,6 +527,7 @@ class SidebandCore():
self.config["telemetry_display_trusted_only"] = False
if not "telemetry_receive_trusted_only" in self.config:
self.config["telemetry_receive_trusted_only"] = False
if not "telemetry_send_all_to_collector" in self.config:
self.config["telemetry_send_all_to_collector"] = False
if not "telemetry_use_propagation_only" in self.config:
@ -535,6 +536,10 @@ class SidebandCore():
self.config["telemetry_try_propagation_on_fail"] = True
if not "telemetry_requests_only_send_latest" in self.config:
self.config["telemetry_requests_only_send_latest"] = True
if not "telemetry_allow_requests_from_trusted" in self.config:
self.config["telemetry_allow_requests_from_trusted"] = False
if not "telemetry_allow_requests_from_anyone" in self.config:
self.config["telemetry_allow_requests_from_anyone"] = False
if not "telemetry_s_location" in self.config:
self.config["telemetry_s_location"] = False
@ -740,8 +745,19 @@ class SidebandCore():
def requests_allowed_from(self, context_dest):
try:
if self.config["telemetry_allow_requests_from_anyone"] == True:
return True
existing_conv = self._db_conversation(context_dest)
if existing_conv != None:
if existing_conv["trust"] == 1:
trusted = True
else:
trusted = False
if self.config["telemetry_allow_requests_from_trusted"] == True:
return trusted
cd = existing_conv["data"]
if cd != None and "allow_requests" in cd and cd["allow_requests"] == True:
return True
@ -908,114 +924,120 @@ class SidebandCore():
def request_latest_telemetry(self, from_addr=None):
if self.getstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending") == True:
RNS.log("Not sending new telemetry request, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if from_addr == None:
return "no_address"
else:
if self.getstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending") == True:
RNS.log("Not sending new telemetry request, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if from_addr != None:
dest_identity = RNS.Identity.recall(from_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(from_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(from_addr)
return "destination_unknown"
if from_addr != None:
dest_identity = RNS.Identity.recall(from_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(from_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(from_addr)
return "destination_unknown"
else:
now = time.time()
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history
lxm_fields = { LXMF.FIELD_COMMANDS: [
{Commands.TELEMETRY_REQUEST: request_timebase},
]}
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.request_timebase = request_timebase
lxm.register_delivery_callback(self.telemetry_request_finished)
lxm.register_failed_callback(self.telemetry_request_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry request with timebase {request_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.last_request_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
now = time.time()
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
request_timebase = self.getpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.timebase") or now - self.telemetry_request_max_history
lxm_fields = { LXMF.FIELD_COMMANDS: [
{Commands.TELEMETRY_REQUEST: request_timebase},
]}
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.request_timebase = request_timebase
lxm.register_delivery_callback(self.telemetry_request_finished)
lxm.register_failed_callback(self.telemetry_request_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry request with timebase {request_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.last_request_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(from_addr, delimit=False)}.request_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
return "not_sent"
return "not_sent"
def send_latest_telemetry(self, to_addr=None, stream=None):
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if to_addr == None:
return "no_address"
else:
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
return "in_progress"
if to_addr != None and self.latest_packed_telemetry != None and self.latest_telemetry != None:
dest_identity = RNS.Identity.recall(to_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(to_addr)
return "destination_unknown"
if self.latest_packed_telemetry != None and self.latest_telemetry != None:
dest_identity = RNS.Identity.recall(to_addr)
if dest_identity == None:
RNS.log("The identity for "+RNS.prettyhexrep(to_addr)+" could not be recalled. Requesting identity from network...", RNS.LOG_DEBUG)
RNS.Transport.request_path(to_addr)
return "destination_unknown"
else:
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
lxm_fields = self.get_message_fields(to_addr)
if stream != None and len(stream) > 0:
lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream
if lxm_fields != None and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields):
if LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"]
elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields:
telemetry_timebase = 0
for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
ts = te[1]
telemetry_timebase = max(telemetry_timebase, ts)
if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.telemetry_timebase = telemetry_timebase
lxm.register_delivery_callback(self.outbound_telemetry_finished)
lxm.register_failed_callback(self.outbound_telemetry_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG)
return "already_sent"
else:
return "not_sent"
else:
dest = RNS.Destination(dest_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "lxmf", "delivery")
source = self.lxmf_destination
if self.config["telemetry_use_propagation_only"] == True:
desired_method = LXMF.LXMessage.PROPAGATED
else:
desired_method = LXMF.LXMessage.DIRECT
lxm_fields = self.get_message_fields(to_addr)
if stream != None and len(stream) > 0:
lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream
if lxm_fields != None and (LXMF.FIELD_TELEMETRY in lxm_fields or LXMF.FIELD_TELEMETRY_STREAM in lxm_fields):
if LXMF.FIELD_TELEMETRY in lxm_fields:
telemeter = Telemeter.from_packed(lxm_fields[LXMF.FIELD_TELEMETRY])
telemetry_timebase = telemeter.read_all()["time"]["utc"]
elif LXMF.FIELD_TELEMETRY_STREAM in lxm_fields:
telemetry_timebase = 0
for te in lxm_fields[LXMF.FIELD_TELEMETRY_STREAM]:
ts = te[1]
telemetry_timebase = max(telemetry_timebase, ts)
if telemetry_timebase > (self.getpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_success_timebase") or 0):
lxm = LXMF.LXMessage(dest, source, "", desired_method=desired_method, fields = lxm_fields)
lxm.telemetry_timebase = telemetry_timebase
lxm.register_delivery_callback(self.outbound_telemetry_finished)
lxm.register_failed_callback(self.outbound_telemetry_finished)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["telemetry_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
RNS.log(f"Sending telemetry update with timebase {telemetry_timebase}", RNS.LOG_DEBUG)
self.setpersistent(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.last_send_attempt", time.time())
self.setstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending", True)
self.message_router.handle_outbound(lxm)
return "sent"
else:
RNS.log(f"Telemetry update with timebase {telemetry_timebase} was already successfully sent", RNS.LOG_DEBUG)
return "already_sent"
else:
return "not_sent"
else:
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
return "not_sent"
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
return "not_sent"
def list_telemetry(self, context_dest = None, after = None, before = None, limit = None):
@ -2931,7 +2953,10 @@ class SidebandCore():
else:
self.set_active_propagation_node(None)
def message_notification(self, message):
def message_notification_no_display(self, message):
self.message_notification(message, no_display=True)
def message_notification(self, message, no_display=False):
if message.state == LXMF.LXMessage.FAILED and hasattr(message, "try_propagation_on_fail") and message.try_propagation_on_fail:
RNS.log("Direct delivery of "+str(message)+" failed. Retrying as propagated message.", RNS.LOG_VERBOSE)
message.try_propagation_on_fail = None
@ -2942,7 +2967,8 @@ class SidebandCore():
self._db_message_set_method(message.hash, LXMF.LXMessage.PROPAGATED)
self.message_router.handle_outbound(message)
else:
self.lxm_ingest(message, originator=True)
if not no_display:
self.lxm_ingest(message, originator=True)
def get_message_fields(self, context_dest, telemetry_update=False):
fields = {}
@ -2987,7 +3013,7 @@ class SidebandCore():
RNS.log("Error while creating paper message: "+str(e), RNS.LOG_ERROR)
return False
def send_message(self, content, destination_hash, propagation, skip_fields=False):
def send_message(self, content, destination_hash, propagation, skip_fields=False, no_display=False):
try:
if content == "":
raise ValueError("Message content cannot be empty")
@ -3007,15 +3033,21 @@ class SidebandCore():
fields = self.get_message_fields(destination_hash)
lxm = LXMF.LXMessage(dest, source, content, title="", desired_method=desired_method, fields = fields)
lxm.register_delivery_callback(self.message_notification)
lxm.register_failed_callback(self.message_notification)
if not no_display:
lxm.register_delivery_callback(self.message_notification)
lxm.register_failed_callback(self.message_notification)
else:
lxm.register_delivery_callback(self.message_notification_no_display)
lxm.register_failed_callback(self.message_notification_no_display)
if self.message_router.get_outbound_propagation_node() != None:
if self.config["lxmf_try_propagation_on_fail"]:
lxm.try_propagation_on_fail = True
self.message_router.handle_outbound(lxm)
self.lxm_ingest(lxm, originator=True)
if not no_display:
self.lxm_ingest(lxm, originator=True)
return True
@ -3035,6 +3067,8 @@ class SidebandCore():
commands.append({Commands.ECHO: echo_content})
elif content.startswith("sig"):
commands.append({Commands.SIGNAL_REPORT: True})
elif content.startswith("ping"):
commands.append({Commands.PING: True})
if len(commands) == 0:
return False
@ -3307,6 +3341,10 @@ class SidebandCore():
if self.requests_allowed_from(context_dest):
commands = message.fields[LXMF.FIELD_COMMANDS]
self.handle_commands(commands, message)
else:
# TODO: Add these event to built-in log/event viewer
# when it is implemented.
RNS.log("Unauthorized command received from "+RNS.prettyhexrep(context_dest), RNS.LOG_WARNING)
else:
self.lxm_ingest(message)
@ -3324,10 +3362,14 @@ class SidebandCore():
RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG)
self.create_telemetry_response(to_addr=context_dest, timebase=timebase)
elif Commands.PING in command:
RNS.log("Handling ping request", RNS.LOG_DEBUG)
self.send_message("Ping reply", context_dest, False, skip_fields=True, no_display=True)
elif Commands.ECHO in command:
msg_content = "Echo reply: "+command[Commands.ECHO].decode("utf-8")
RNS.log("Handling echo request", RNS.LOG_DEBUG)
self.send_message(msg_content, context_dest, False, skip_fields=True)
self.send_message(msg_content, context_dest, False, skip_fields=True, no_display=True)
elif Commands.SIGNAL_REPORT in command:
RNS.log("Handling signal report", RNS.LOG_DEBUG)
@ -3343,7 +3385,7 @@ class SidebandCore():
else:
phy_str = "No reception info available"
self.send_message(phy_str, context_dest, False, skip_fields=True)
self.send_message(phy_str, context_dest, False, skip_fields=True, no_display=True)
except Exception as e:
RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR)