Mirror of https://github.com/markqvist/LXMF.git
Synced 2025-11-19 23:13:04 -05:00
Added metadata structure to propagation node announces
Parent: aca5bf9c14
Commit: d8b25e092f

4 changed files with 37 additions and 20 deletions
@@ -45,12 +45,13 @@ class LXMFPropagationAnnounceHandler:
             if self.lxmrouter.propagation_node:
                 data = msgpack.unpackb(app_data)
                 if pn_announce_data_is_valid(data):
-                    pn_active = data[0]
-                    node_timebase = int(data[1])
-                    propagation_transfer_limit = int(data[2])
-                    propagation_sync_limit = int(data[3])
-                    propagation_stamp_cost = int(data[4][0])
-                    propagation_stamp_cost_flexibility = int(data[4][1])
+                    metadata = data[0]
+                    propagation_enabled = data[1]
+                    node_timebase = int(data[2])
+                    propagation_transfer_limit = int(data[3])
+                    propagation_sync_limit = int(data[4])
+                    propagation_stamp_cost = int(data[5][0])
+                    propagation_stamp_cost_flexibility = int(data[5][1])

                     if destination_hash in self.lxmrouter.static_peers:
                         self.lxmrouter.peer(destination_hash=destination_hash,
@@ -58,20 +59,22 @@ class LXMFPropagationAnnounceHandler:
                                             propagation_transfer_limit=propagation_transfer_limit,
                                             propagation_sync_limit=propagation_sync_limit,
                                             propagation_stamp_cost=propagation_stamp_cost,
-                                            propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility)
+                                            propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
+                                            metadata=metadata)

                     else:
                         if self.lxmrouter.autopeer:
-                            if pn_active == True:
+                            if propagation_enabled == True:
                                 if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
                                     self.lxmrouter.peer(destination_hash=destination_hash,
                                                         timestamp=node_timebase,
                                                         propagation_transfer_limit=propagation_transfer_limit,
                                                         propagation_sync_limit=propagation_sync_limit,
                                                         propagation_stamp_cost=propagation_stamp_cost,
-                                                        propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility)
+                                                        propagation_stamp_cost_flexibility=propagation_stamp_cost_flexibility,
+                                                        metadata=metadata)

-                            elif pn_active == False:
+                            elif propagation_enabled == False:
                                 self.lxmrouter.unpeer(destination_hash, node_timebase)

         except Exception as e:
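For orientation, the reordered announce payload can be read back as follows. This is a hedged sketch, not part of the commit: unpack_pn_announce and the dictionary keys are illustrative names only, and the layout is taken from the handler changes above.

    import msgpack

    def unpack_pn_announce(app_data):
        # Layout after this commit: the metadata dict now occupies index 0,
        # shifting every previously existing field up by one position.
        data = msgpack.unpackb(app_data)
        return {
            "metadata": data[0],                            # dict keyed by PN_META_* values
            "propagation_enabled": data[1],                 # was data[0] before this commit
            "node_timebase": int(data[2]),
            "propagation_transfer_limit": int(data[3]),
            "propagation_sync_limit": int(data[4]),
            "propagation_stamp_cost": int(data[5][0]),
            "propagation_stamp_cost_flexibility": int(data[5][1]),
        }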
LXMF/LXMF.py (26 changed lines)
@@ -91,6 +91,17 @@ RENDERER_MICRON = 0x01
 RENDERER_MARKDOWN = 0x02
 RENDERER_BBCODE = 0x03

+# Optional propagation node metadata fields. These
+# fields may be highly unstable in allocation and
+# availability until the version 1.0.0 release, so use
+# at your own risk until then, and expect changes!
+PN_META_VERSION = 0x00
+PN_META_NAME = 0x01
+PN_META_SYNC_STRATUM = 0x02
+PN_META_SYNC_THROTTLE = 0x03
+PN_META_AUTH_BAND = 0x04
+PN_META_UTIL_PRESSURE = 0x05
+
 ##########################################################
 # The following helper functions makes it easier to      #
 # handle and operate on LXMF data in client programs     #
@@ -138,19 +149,20 @@ def stamp_cost_from_app_data(app_data=None):
 def pn_announce_data_is_valid(data):
     try:
         if type(data) == bytes: data = msgpack.unpackb(data)
-        if len(data) < 5: raise ValueError("Invalid announce data: Insufficient peer data")
+        if len(data) < 6: raise ValueError("Invalid announce data: Insufficient peer data")
         else:
-            if data[0] != True and data[0] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status")
-            try: int(data[1])
-            except: raise ValueError("Invalid announce data: Could not decode peer timebase")
+            if type(data[0]) != dict: raise ValueError("Invalid announce data: Could not decode peer metadata")
+            if data[1] != True and data[1] != False: raise ValueError("Invalid announce data: Indeterminate propagation node status")
             try: int(data[2])
-            except: raise ValueError("Invalid announce data: Could not decode peer propagation transfer limit")
+            except: raise ValueError("Invalid announce data: Could not decode peer timebase")
             try: int(data[3])
+            except: raise ValueError("Invalid announce data: Could not decode peer propagation transfer limit")
+            try: int(data[4])
             except: raise ValueError("Invalid announce data: Could not decode peer propagation sync limit")
             if type(data[4]) != list: raise ValueError("Invalid announce data: Could not decode peer stamp costs")
-            try: int(data[4][0])
+            try: int(data[5][0])
             except: raise ValueError("Invalid announce data: Could not decode peer target stamp cost")
-            try: int(data[4][1])
+            try: int(data[5][1])
             except: raise ValueError("Invalid announce data: Could not decode peer stamp cost flexibility")

     except Exception as e:
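The PN_META_* constants above only reserve key values; the commit does not define value types or semantics beyond the names. A hypothetical reader for the metadata dict, assuming the constants are importable from LXMF.LXMF as the diff suggests and that values are plain msgpack-serialisable objects, might look like this (describe_pn_metadata is not part of LXMF):

    from LXMF.LXMF import PN_META_VERSION, PN_META_NAME, PN_META_SYNC_STRATUM

    def describe_pn_metadata(metadata):
        # The metadata dict travels as element 0 of the announce and may be empty.
        if not isinstance(metadata, dict) or len(metadata) == 0:
            return "no metadata"
        parts = []
        if PN_META_VERSION in metadata:      parts.append(f"version={metadata[PN_META_VERSION]}")
        if PN_META_NAME in metadata:         parts.append(f"name={metadata[PN_META_NAME]}")
        if PN_META_SYNC_STRATUM in metadata: parts.append(f"stratum={metadata[PN_META_SYNC_STRATUM]}")
        return ", ".join(parts) if parts else "unrecognised metadata fields"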
@@ -282,9 +282,11 @@ class LXMRouter:
     def announce_propagation_node(self):
         def delayed_announce():
             time.sleep(LXMRouter.NODE_ANNOUNCE_DELAY)
             node_state = self.propagation_node and not self.from_static_only
             stamp_cost = [self.propagation_stamp_cost, self.propagation_stamp_cost_flexibility]
+            metadata = {}
             announce_data = [
+                metadata,                            # Node metadata
                 node_state,                          # Boolean flag signalling propagation node state
                 int(time.time()),                    # Current node timebase
                 self.propagation_per_transfer_limit, # Per-transfer limit for message propagation in kilobytes
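The router currently announces an empty metadata dict. The sketch below shows how the full payload lines up with pn_announce_data_is_valid(); the trailing sync-limit and stamp-cost entries are inferred from the handler changes earlier in this diff, and build_announce_data is an illustrative helper, not LXMRouter API.

    import time

    def build_announce_data(node_state, transfer_limit, sync_limit,
                            stamp_cost, stamp_cost_flexibility, metadata=None):
        # Index 0 is the new metadata dict; the remaining entries keep the
        # order expected by pn_announce_data_is_valid() after this commit.
        return [
            metadata if metadata is not None else {},  # 0: node metadata
            node_state,                                # 1: propagation node state
            int(time.time()),                          # 2: node timebase
            transfer_limit,                            # 3: per-transfer limit (kilobytes)
            sync_limit,                                # 4: per-sync limit (kilobytes)
            [stamp_cost, stamp_cost_flexibility],      # 5: stamp cost and flexibility
        ]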
@@ -76,7 +76,7 @@ def validate_pn_stamps_job_multip(transient_stamps):
     cores = multiprocessing.cpu_count()
     pool_count = min(cores, math.ceil(len(transient_stamps) / PN_VALIDATION_POOL_MIN_SIZE))

-    RNS.log(f"Validating {len(transient_stamps)} stamps using {pool_count} processes...")
+    RNS.log(f"Validating {len(transient_stamps)} stamps using {pool_count} processes...", RNS.LOG_VERBOSE)
     with multiprocessing.Pool(pool_count) as p: validated_entries = p.map(_validate_single_pn_stamp_entry, transient_stamps)

     return validated_entries
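The only functional change in this last hunk is the log level, but the pool sizing it surrounds is worth a standalone note: the worker count is capped both by CPU cores and by the number of entries per worker. A sketch of just that expression, assuming PN_VALIDATION_POOL_MIN_SIZE is the minimum batch size per process (the value 32 is an assumption for illustration) and a non-empty batch:

    import math
    import multiprocessing

    PN_VALIDATION_POOL_MIN_SIZE = 32  # assumed value, for illustration only

    def validation_pool_size(entry_count):
        # Mirrors the expression in validate_pn_stamps_job_multip(): never more
        # workers than CPU cores, and never more than one worker per
        # PN_VALIDATION_POOL_MIN_SIZE entries to validate.
        cores = multiprocessing.cpu_count()
        return min(cores, math.ceil(entry_count / PN_VALIDATION_POOL_MIN_SIZE))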