mirror of
https://github.com/markqvist/Sideband.git
synced 2025-08-15 01:35:45 -04:00
Implemented bulk telemetry transfers
This commit is contained in:
parent
08e112f538
commit
26ebc80a22
4 changed files with 165 additions and 78 deletions
|
@ -747,21 +747,25 @@ class SidebandCore():
|
|||
RNS.log("Error while checking telemetry sending for "+RNS.prettyhexrep(context_dest)+": "+str(e), RNS.LOG_ERROR)
|
||||
return False
|
||||
|
||||
def requests_allowed_from(self, context_dest):
|
||||
def allow_request_from(self, context_dest):
|
||||
try:
|
||||
if self.config["telemetry_allow_requests_from_anyone"] == True:
|
||||
return True
|
||||
|
||||
if self.config["telemetry_allow_requests_from_trusted"] == True:
|
||||
existing_conv = self._db_conversation(context_dest)
|
||||
return existing_conv["trust"] == 1
|
||||
|
||||
return self.requests_allowed_from(context_dest)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while checking request permissions for "+RNS.prettyhexrep(context_dest)+": "+str(e), RNS.LOG_ERROR)
|
||||
return False
|
||||
|
||||
def requests_allowed_from(self, context_dest):
|
||||
try:
|
||||
existing_conv = self._db_conversation(context_dest)
|
||||
if existing_conv != None:
|
||||
if existing_conv["trust"] == 1:
|
||||
trusted = True
|
||||
else:
|
||||
trusted = False
|
||||
|
||||
if self.config["telemetry_allow_requests_from_trusted"] == True:
|
||||
return trusted
|
||||
|
||||
cd = existing_conv["data"]
|
||||
if cd != None and "allow_requests" in cd and cd["allow_requests"] == True:
|
||||
return True
|
||||
|
@ -977,15 +981,18 @@ class SidebandCore():
|
|||
return "not_sent"
|
||||
|
||||
|
||||
def send_latest_telemetry(self, to_addr=None, stream=None):
|
||||
def send_latest_telemetry(self, to_addr=None, stream=None, is_authorized_telemetry_request=False):
|
||||
if to_addr == None or to_addr == self.lxmf_destination.hash:
|
||||
return "no_address"
|
||||
else:
|
||||
if to_addr == self.config["telemetry_collector"]:
|
||||
is_authorized_telemetry_request = True
|
||||
|
||||
if self.getstate(f"telemetry.{RNS.hexrep(to_addr, delimit=False)}.update_sending") == True:
|
||||
RNS.log("Not sending new telemetry update, since an earlier transfer is already in progress", RNS.LOG_DEBUG)
|
||||
return "in_progress"
|
||||
|
||||
if self.latest_packed_telemetry != None and self.latest_telemetry != None:
|
||||
if (self.latest_packed_telemetry != None and self.latest_telemetry != None) or stream != None:
|
||||
dest_identity = RNS.Identity.recall(to_addr)
|
||||
|
||||
if dest_identity == None:
|
||||
|
@ -1002,7 +1009,7 @@ class SidebandCore():
|
|||
else:
|
||||
desired_method = LXMF.LXMessage.DIRECT
|
||||
|
||||
lxm_fields = self.get_message_fields(to_addr)
|
||||
lxm_fields = self.get_message_fields(to_addr, is_authorized_telemetry_request=is_authorized_telemetry_request)
|
||||
if stream != None and len(stream) > 0:
|
||||
lxm_fields[LXMF.FIELD_TELEMETRY_STREAM] = stream
|
||||
|
||||
|
@ -1041,7 +1048,7 @@ class SidebandCore():
|
|||
|
||||
else:
|
||||
RNS.log("A telemetry update was requested, but there was nothing to send.", RNS.LOG_WARNING)
|
||||
return "not_sent"
|
||||
return "nothing_to_send"
|
||||
|
||||
|
||||
def list_telemetry(self, context_dest = None, after = None, before = None, limit = None):
|
||||
|
@ -1559,10 +1566,11 @@ class SidebandCore():
|
|||
|
||||
return results
|
||||
|
||||
def _db_save_telemetry(self, context_dest, telemetry, physical_link = None, source_dest = None):
|
||||
def _db_save_telemetry(self, context_dest, telemetry, physical_link = None, source_dest = None, via = None):
|
||||
try:
|
||||
remote_telemeter = Telemeter.from_packed(telemetry)
|
||||
telemetry_timestamp = remote_telemeter.read_all()["time"]["utc"]
|
||||
read_telemetry = remote_telemeter.read_all()
|
||||
telemetry_timestamp = read_telemetry["time"]["utc"]
|
||||
|
||||
db = self.__db_connect()
|
||||
dbc = db.cursor()
|
||||
|
@ -1572,7 +1580,8 @@ class SidebandCore():
|
|||
result = dbc.fetchall()
|
||||
|
||||
if len(result) != 0:
|
||||
return
|
||||
RNS.log("Telemetry entry with source "+RNS.prettyhexrep(context_dest)+" and timestamp "+str(telemetry_timestamp)+" already exists, skipping save", RNS.LOG_DEBUG)
|
||||
return None
|
||||
|
||||
if physical_link != None and len(physical_link) != 0:
|
||||
remote_telemeter.synthesize("physical_link")
|
||||
|
@ -1602,6 +1611,20 @@ class SidebandCore():
|
|||
|
||||
remote_telemeter.sensors["received"].update_data()
|
||||
telemetry = remote_telemeter.packed()
|
||||
|
||||
if via != None:
|
||||
if not "received" in remote_telemeter.sensors:
|
||||
remote_telemeter.synthesize("received")
|
||||
|
||||
if "by" in remote_telemeter.sensors["received"].data:
|
||||
remote_telemeter.sensors["received"].by = remote_telemeter.sensors["received"].data["by"]
|
||||
if "distance" in remote_telemeter.sensors["received"].data:
|
||||
remote_telemeter.sensors["received"].geodesic_distance = remote_telemeter.sensors["received"].data["distance"]["geodesic"]
|
||||
remote_telemeter.sensors["received"].euclidian_distance = remote_telemeter.sensors["received"].data["distance"]["euclidian"]
|
||||
|
||||
remote_telemeter.sensors["received"].via = via
|
||||
remote_telemeter.sensors["received"].update_data()
|
||||
telemetry = remote_telemeter.packed()
|
||||
|
||||
query = "INSERT INTO telemetry (dest_context, ts, data) values (?, ?, ?)"
|
||||
data = (context_dest, telemetry_timestamp, telemetry)
|
||||
|
@ -1617,35 +1640,52 @@ class SidebandCore():
|
|||
|
||||
def _db_update_appearance(self, context_dest, timestamp, appearance):
|
||||
conv = self._db_conversation(context_dest)
|
||||
data_dict = conv["data"]
|
||||
if data_dict == None:
|
||||
data_dict = {}
|
||||
|
||||
if not "appearance" in data_dict:
|
||||
data_dict["appearance"] = None
|
||||
if conv == None:
|
||||
ae = [appearance, int(time.time())]
|
||||
# TODO: Clean out these temporary values at some interval.
|
||||
# Probably expire after 14 days or so.
|
||||
self.setpersistent("temp.peer_appearance."+RNS.hexrep(context_dest, delimit=False), ae)
|
||||
else:
|
||||
data_dict = conv["data"]
|
||||
if data_dict == None:
|
||||
data_dict = {}
|
||||
|
||||
if data_dict["appearance"] != appearance:
|
||||
data_dict["appearance"] = appearance
|
||||
packed_dict = msgpack.packb(data_dict)
|
||||
|
||||
db = self.__db_connect()
|
||||
dbc = db.cursor()
|
||||
|
||||
query = "UPDATE conv set data = ? where dest_context = ?"
|
||||
data = (packed_dict, context_dest)
|
||||
dbc.execute(query, data)
|
||||
result = dbc.fetchall()
|
||||
db.commit()
|
||||
if not "appearance" in data_dict:
|
||||
data_dict["appearance"] = None
|
||||
|
||||
if data_dict["appearance"] != appearance:
|
||||
data_dict["appearance"] = appearance
|
||||
packed_dict = msgpack.packb(data_dict)
|
||||
|
||||
db = self.__db_connect()
|
||||
dbc = db.cursor()
|
||||
|
||||
query = "UPDATE conv set data = ? where dest_context = ?"
|
||||
data = (packed_dict, context_dest)
|
||||
dbc.execute(query, data)
|
||||
result = dbc.fetchall()
|
||||
db.commit()
|
||||
|
||||
def _db_get_appearance(self, context_dest, conv = None):
|
||||
if context_dest == self.lxmf_destination.hash:
|
||||
return [self.config["telemetry_icon"], self.config["telemetry_fg"], self.config["telemetry_bg"]]
|
||||
else:
|
||||
if conv == None:
|
||||
conv = self._db_conversation(context_dest)
|
||||
|
||||
if conv != None and "data" in conv:
|
||||
data_dict = None
|
||||
if conv != None:
|
||||
data_dict = conv["data"]
|
||||
|
||||
else:
|
||||
conv = self._db_conversation(context_dest)
|
||||
if conv != None:
|
||||
data_dict = conv["data"]
|
||||
else:
|
||||
data_dict = {}
|
||||
apd = self.getpersistent("temp.peer_appearance."+RNS.hexrep(context_dest, delimit=False))
|
||||
if apd != None:
|
||||
data_dict["appearance"] = apd
|
||||
|
||||
if data_dict != None:
|
||||
try:
|
||||
if data_dict != None and "appearance" in data_dict:
|
||||
def htf(cbytes):
|
||||
|
@ -2029,7 +2069,9 @@ class SidebandCore():
|
|||
if not originator and lxm.fields != None:
|
||||
if self.config["telemetry_receive_trusted_only"] == False or (self.config["telemetry_receive_trusted_only"] == True and self.is_trusted(context_dest)):
|
||||
if LXMF.FIELD_ICON_APPEARANCE in lxm.fields:
|
||||
self._db_update_appearance(context_dest, lxm.timestamp, lxm.fields[LXMF.FIELD_ICON_APPEARANCE])
|
||||
peer_appearance = lxm.fields[LXMF.FIELD_ICON_APPEARANCE]
|
||||
if peer_appearance != None and len(peer_appearance) > 0 and len(peer_appearance) < 96:
|
||||
self._db_update_appearance(context_dest, lxm.timestamp, peer_appearance)
|
||||
|
||||
if LXMF.FIELD_TELEMETRY in lxm.fields:
|
||||
physical_link = {}
|
||||
|
@ -2040,9 +2082,16 @@ class SidebandCore():
|
|||
packed_telemetry = self._db_save_telemetry(context_dest, lxm.fields[LXMF.FIELD_TELEMETRY], physical_link=physical_link, source_dest=context_dest)
|
||||
|
||||
if LXMF.FIELD_TELEMETRY_STREAM in lxm.fields:
|
||||
for telemetry_entry in lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]:
|
||||
# TODO: Implement
|
||||
RNS.log("TODO: Save this telemetry stream entry: "+str(telemetry_entry), RNS.LOG_WARNING)
|
||||
if lxm.fields[LXMF.FIELD_TELEMETRY_STREAM] != None and len(lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]) > 0:
|
||||
for telemetry_entry in lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]:
|
||||
tsource = telemetry_entry[0]
|
||||
ttstemp = telemetry_entry[1]
|
||||
tpacked = telemetry_entry[2]
|
||||
if self._db_save_telemetry(tsource, tpacked, via = context_dest):
|
||||
RNS.log("Saved telemetry stream entry from "+RNS.prettyhexrep(tsource), RNS.LOG_WARNING)
|
||||
|
||||
else:
|
||||
RNS.log("Received telemetry stream field with no data: "+str(lxm.fields[LXMF.FIELD_TELEMETRY_STREAM]), RNS.LOG_DEBUG)
|
||||
|
||||
if own_command or len(lxm.content) != 0 or len(lxm.title) != 0:
|
||||
db = self.__db_connect()
|
||||
|
@ -2482,7 +2531,8 @@ class SidebandCore():
|
|||
self.pending_telemetry_send = True
|
||||
self.pending_telemetry_send_try += 1
|
||||
if self.config["telemetry_send_all_to_collector"]:
|
||||
self.create_telemetry_collector_response(to_addr=collector_address)
|
||||
last_timebase = (self.getpersistent(f"telemetry.{RNS.hexrep(collector_address, delimit=False)}.last_send_success_timebase") or 0)
|
||||
self.create_telemetry_collector_response(to_addr=collector_address, timebase=last_timebase, is_authorized_telemetry_request=True)
|
||||
else:
|
||||
self.send_latest_telemetry(to_addr=collector_address)
|
||||
else:
|
||||
|
@ -2996,9 +3046,9 @@ class SidebandCore():
|
|||
except Exception as e:
|
||||
RNS.log("Error while setting last successul telemetry timebase for "+RNS.prettyhexrep(message.destination_hash), RNS.LOG_DEBUG)
|
||||
|
||||
def get_message_fields(self, context_dest, telemetry_update=False):
|
||||
def get_message_fields(self, context_dest, telemetry_update=False, is_authorized_telemetry_request=False):
|
||||
fields = {}
|
||||
send_telemetry = (telemetry_update == True) or self.should_send_telemetry(context_dest)
|
||||
send_telemetry = (telemetry_update == True) or (self.should_send_telemetry(context_dest) or is_authorized_telemetry_request)
|
||||
send_appearance = self.config["telemetry_send_appearance"] or send_telemetry
|
||||
|
||||
if send_telemetry and self.latest_packed_telemetry != None:
|
||||
|
@ -3379,7 +3429,7 @@ class SidebandCore():
|
|||
return
|
||||
|
||||
if message.signature_validated and LXMF.FIELD_COMMANDS in message.fields:
|
||||
if self.requests_allowed_from(context_dest):
|
||||
if self.allow_request_from(context_dest):
|
||||
commands = message.fields[LXMF.FIELD_COMMANDS]
|
||||
self.handle_commands(commands, message)
|
||||
else:
|
||||
|
@ -3391,7 +3441,7 @@ class SidebandCore():
|
|||
self.lxm_ingest(message)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("Error while ingesting LXMF message "+RNS.prettyhexrep(message.hash)+" to database: "+str(e))
|
||||
RNS.log("Error while ingesting LXMF message "+RNS.prettyhexrep(message.hash)+" to database: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def handle_commands(self, commands, message):
|
||||
try:
|
||||
|
@ -3403,7 +3453,7 @@ class SidebandCore():
|
|||
RNS.log("Handling telemetry request with timebase "+str(timebase), RNS.LOG_DEBUG)
|
||||
if self.config["telemetry_collector_enabled"]:
|
||||
RNS.log(f"Collector requests enabled, returning complete telemetry response for all known objects since {timebase}", RNS.LOG_DEBUG)
|
||||
self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase)
|
||||
self.create_telemetry_collector_response(to_addr=context_dest, timebase=timebase, is_authorized_telemetry_request=True)
|
||||
else:
|
||||
RNS.log("Responding with own latest telemetry", RNS.LOG_DEBUG)
|
||||
self.send_latest_telemetry(to_addr=context_dest)
|
||||
|
@ -3436,25 +3486,33 @@ class SidebandCore():
|
|||
except Exception as e:
|
||||
RNS.log("Error while handling commands: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
def create_telemetry_collector_response(self, to_addr, timebase):
|
||||
sources = {}
|
||||
def create_telemetry_collector_response(self, to_addr, timebase, is_authorized_telemetry_request=False):
|
||||
added_sources = {}
|
||||
sources = self.list_telemetry(after=timebase)
|
||||
only_latest = self.config["telemetry_requests_only_send_latest"]
|
||||
|
||||
elements = 0; added = 0
|
||||
telemetry_stream = []
|
||||
for source in sources:
|
||||
if source != to_addr:
|
||||
for entry in sources[source]:
|
||||
elements += 1
|
||||
timestamp = entry[0]; packed_telemetry = entry[1]
|
||||
te = [source, timestamp, packed_telemetry]
|
||||
if only_latest:
|
||||
if not source in sources:
|
||||
sources[source] = True
|
||||
if not source in added_sources:
|
||||
added_sources[source] = True
|
||||
telemetry_stream.append(te)
|
||||
added += 1
|
||||
else:
|
||||
telemetry_stream.append(te)
|
||||
added += 1
|
||||
|
||||
self.send_latest_telemetry(to_addr=to_addr, stream=telemetry_stream)
|
||||
return self.send_latest_telemetry(
|
||||
to_addr=to_addr,
|
||||
stream=telemetry_stream,
|
||||
is_authorized_telemetry_request=is_authorized_telemetry_request
|
||||
)
|
||||
|
||||
|
||||
def get_display_name_bytes(self):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue