General structure for LXMF propagation

This commit is contained in:
Mark Qvist 2021-10-02 22:04:31 +02:00
parent 1c5e879abd
commit ad773dcbc5

View File

@ -1,3 +1,4 @@
import os
import math
import time
import threading
@ -476,6 +477,167 @@ class LXMFDeliveryAnnounceHandler:
self.lxmrouter.process_outbound()
class LXMFPropagationAnnounceHandler:
def __init__(self, lxmrouter):
self.aspect_filter = APP_NAME+".propagation"
self.lxmrouter = lxmrouter
def received_announce(self, destination_hash, announced_identity, app_data):
try:
data = msgpack.unpackb(app_data)
if self.lxmrouter.autopeer:
if data[0] == True:
if RNS.Transport.hops_to(destination_hash) <= self.lxmrouter.autopeer_maxdepth:
self.lxmrouter.peer(destination_hash, data[1])
elif data[0] == False:
self.lxmrouter.unpeer(destination_hash, data[1])
except Exception as e:
RNS.log("Error while evaluating propagation node announce, ignoring announce.", RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
class LXMPeer:
OFFER_REQUEST_PATH = "o"
IDLE = 0x00
LINK_ESTABLISHING = 0x01
LINK_READY = 0x02
REQUEST_SENT = 0x03
RESPONSE_RECEIVED = 0x04
RESOURCE_TRANSFERRING = 0x05
ERROR_NO_IDENTITY = 0xf0
@staticmethod
def from_bytes(bytes):
pass
def to_bytes(self):
pass
def __init__(self, router, destination_hash):
self.alive = False
self.last_heard = None
self.peering_timebase = 0
self.link = None
self.state = LXMPeer.IDLE
self.unhandled_messages = {}
self.handled_messages = {}
self.router = router
self.destination_hash = destination_hash
self.identity = RNS.Identity.recall(destination_hash)
self.destination = RNS.Destination(self.identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
def sync(self, initiator=True):
if len(self.unhandled_messages) > 0:
if self.state == LXMPeer.IDLE:
self.link = RNS.Link(self.destination, established_callback = self.link_established, closed_callback = self.link_closed)
self.state = LXMPeer.LINK_ESTABLISHING
else:
if self.state == LXMPeer.LINK_READY:
unhandled_ids = []
for transient_id in self.unhandled_messages:
unhandled_ids.append(transient_id)
# TODO: Remove
RNS.log("Sending sync request to peer")
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
def request_failed(self, request_receipt):
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
if self.link != None:
self.link.teardown()
def offer_response(self, request_receipt):
# TODO: Remove
RNS.log("Got offer response from peer")
try:
self.state = LXMPeer.RESPONSE_RECEIVED
response = request_receipt.response
RNS.log(str(response))
wanted_messages = []
wanted_message_ids = []
if response == LXMPeer.ERROR_NO_IDENTITY:
if self.link != None:
self.link.indentify()
self.state = LXMPeer.LINK_READY
elif response == False:
# Peer already has all advertised messages
# TODO: Remove
RNS.log("Peer had all advertised messages", RNS.LOG_DEBUG)
for transient_id in self.unhandled_messages:
message = self.unhandled_messages[transient_id]
self.handled_messages[transient_id] = message
self.unhandled_messages = {}
elif response == True:
# Peer wants all advertised messages
for transient_id in self.unhandled_messages:
wanted_messages.append(self.unhandled_messages[transient_id][2])
wanted_message_ids.append(transient_id)
else:
# Peer wants some advertised messages
for transient_id in self.unhandled_messages:
# If the peer did not want the message, it has
# already received it from another peer.
if not transient_id in response:
message = self.unhandled_messages.pop(transient_id)
self.handled_messages[transient_id] = message
for transient_id in response:
wanted_messages.append(self.unhandled_messages[transient_id][2])
wanted_message_ids.append(transient_id)
if len(wanted_messages) > 0:
# TODO: Remove
RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG)
data = msgpack.packb(wanted_messages)
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
resource.transferred_messages = wanted_message_ids
self.state = LXMPeer.RESOURCE_TRANSFERRING
except Exception as e:
RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
def resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
for transient_id in resource.transferred_messages:
message = self.unhandled_messages.pop(transient_id)
self.handled_messages[transient_id] = message
else:
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
def link_established(self, link):
self.link.identify(self.router.identity)
self.state = LXMPeer.LINK_READY
def link_closed(self, link):
self.link = None
self.state = LXMPeer.IDLE
def handle_message(self, transient_id):
if not transient_id in self.handled_messages and not self.unhandled_messages:
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
class LXMRouter:
MAX_DELIVERY_ATTEMPTS = 3
PROCESSING_INTERVAL = 5
@ -483,7 +645,21 @@ class LXMRouter:
PATH_REQUEST_WAIT = 5
LINK_MAX_INACTIVITY = 10*60
def __init__(self):
AUTOPEER = True
AUTOPEER_MAXDEPTH = 4
# TODO: Remove
def ptest(self):
i1 = RNS.Identity()
i2 = RNS.Identity()
d1 = RNS.Destination(i1, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery")
d2 = RNS.Destination(i2, RNS.Destination.IN, RNS.Destination.SINGLE, "test", "delivery")
lxm = LXMessage(d1, d2, content="No content here")
return lxm
def __init__(self, identity = None, autopeer = AUTOPEER, autopeer_maxdepth = AUTOPEER_MAXDEPTH):
self.pending_inbound = []
self.pending_outbound = []
self.failed_outbound = []
@ -492,12 +668,26 @@ class LXMRouter:
self.processing_outbound = False
self.processing_inbound = False
self.processing_count = 0
self.identity = RNS.Identity()
self.propagation_node = False
if identity == None:
identity = RNS.Identity()
self.identity = identity
self.lxmf_query_destination = RNS.Destination(None, RNS.Destination.IN, RNS.Destination.PLAIN, APP_NAME, "query")
self.propagation_destination = RNS.Destination(self.identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "propagation")
self.autopeer = autopeer
self.autopeer_maxdepth = autopeer_maxdepth
self.peers = {}
self.propagation_entries = {}
self.propagated_ids = {}
RNS.Transport.register_announce_handler(LXMFDeliveryAnnounceHandler(self))
RNS.Transport.register_announce_handler(LXMFPropagationAnnounceHandler(self))
self.__delivery_callback = None
@ -544,7 +734,7 @@ class LXMRouter:
if destination_type == RNS.Destination.SINGLE:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_RSA
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif destination_type == RNS.Destination.GROUP:
message.transport_encrypted = True
message.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
@ -586,7 +776,7 @@ class LXMRouter:
link.set_packet_callback(self.delivery_packet)
link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.resource_transfer_concluded)
link.set_resource_concluded_callback(self.delivery_resource_concluded)
def delivery_link_closed(self, link):
pass
@ -594,11 +784,133 @@ class LXMRouter:
def resource_transfer_began(self, resource):
RNS.log("Transfer began for resource "+str(resource), RNS.LOG_DEBUG)
def resource_transfer_concluded(self, resource):
RNS.log("Transfer concluded for resource "+str(resource), RNS.LOG_DEBUG)
def delivery_resource_concluded(self, resource):
RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE:
self.lxmf_delivery(resource.data.read(), resource.link.type)
def enable_propagation(self, storagepath):
try:
self.storagepath = storagepath+"/lxmf"
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
# TODO: Load peers and data
self.propagation_node = True
self.propagation_destination.set_link_established_callback(self.propagation_link_established)
self.propagation_destination.register_request_handler(LXMPeer.OFFER_REQUEST_PATH, self.offer_request, allow = RNS.Destination.ALLOW_ALL)
self.announce_propagation_node()
except Exception as e:
RNS.log("Could not enable propagation node. The contained exception was: "+str(e), RNS.LOG_ERROR)
RNS.panic()
def disable_propagation(self):
self.propagation_node = False
self.announce_propagation_node()
def announce_propagation_node(self):
data = msgpack.packb([self.propagation_node, int(time.time())])
self.propagation_destination.announce(app_data=data)
def offer_request(self, path, data, request_id, remote_identity, requested_at):
# TODO: Remove
RNS.log("Offer request")
if remote_identity == None:
return LXMPeer.ERROR_NO_IDENTITY
else:
try:
# TODO: Remove
RNS.log(str(data))
transient_ids = data
wanted_ids = []
for transient_id in transient_ids:
if not transient_id in self.propagation_entries:
wanted_ids.append(transient_id)
if len(wanted_ids) == 0:
return False
elif len(wanted_ids) == len(transient_ids):
return True
else:
return wanted_ids
except Exception as e:
RNS.log("Error occurred while generating response for sync request, the contained exception was: "+str(e), RNS.LOG_DEBUG)
return None
def propagation_link_established(self, link):
link.set_resource_strategy(RNS.Link.ACCEPT_ALL)
link.set_resource_started_callback(self.resource_transfer_began)
link.set_resource_concluded_callback(self.propagation_resource_concluded)
def propagation_resource_concluded(self, resource):
RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG)
if resource.status == RNS.Resource.COMPLETE:
# TODO: The peer this was received from should
# have the transient id added to it's list of
# already handled messages.
self.lxmf_propagation(resource.data.read())
def lxmf_propagation(self, lxmf_data):
try:
if len(lxmf_data) >= LXMessage.LXMF_OVERHEAD:
transient_id = RNS.Identity.full_hash(lxmf_data)
if not transient_id in self.propagation_entries and not transient_id in self.propagated_ids:
received = time.time()
propagation_entry = [transient_id, received, lxmf_data]
self.propagation_entries[transient_id] = propagation_entry
for peer_id in self.peers:
peer = self.peers[peer_id]
peer.handle_message(transient_id)
return True
return False
except Exception as e:
RNS.log("Could not assemble propagated LXMF message from received data", RNS.LOG_NOTICE)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
return False
def peer(self, destination_hash, timestamp):
if destination_hash in self.peers:
peer = self.peers[destination_hash]
peer.alive = True
peer.peering_timebase = timestamp
peer.last_heard = time.time()
else:
peer = LXMPeer(self, destination_hash)
self.peers[destination_hash] = peer
RNS.log(str(self.propagation_destination)+" peered with "+str(peer.destination))
def unpeer(self, destination_hash, timestamp = None):
if timestamp == None:
timestamp = int(time.time())
if destination_hash in self.peers:
peer = self.peers[destination_hash]
if timestamp >= peer.peering_timebase:
self.peers.pop(destination_hash)
RNS.log(str(self.propagation_destination)+" broke peering with "+str(peer.destination))
def jobloop(self):
while (True):
# TODO: Improve this to scheduling, so manual
@ -606,10 +918,18 @@ class LXMRouter:
self.jobs()
time.sleep(LXMRouter.PROCESSING_INTERVAL)
JOB_OUTBOUND_INTERVAL = 1
JOB_LINKS_INTERVAL = 1
def jobs(self):
if self.processing_count % LXMRouter.JOB_OUTBOUND_INTERVAL == 0:
self.process_outbound()
if self.processing_count % LXMRouter.JOB_LINKS_INTERVAL == 0:
self.clean_links()
self.processing_count += 1
def clean_links(self):
closed_links = []
for link_hash in self.direct_links: