mirror of
https://github.com/markqvist/LXMF.git
synced 2025-03-02 11:59:25 -05:00
Implemented propagation node auto reverse peering
This commit is contained in:
parent
734305e9c7
commit
093d4f1317
41
LXMF/LXMF.py
41
LXMF/LXMF.py
@ -547,6 +547,9 @@ class LXMPeer:
|
|||||||
# TODO: Remove
|
# TODO: Remove
|
||||||
RNS.log("Sending sync request to peer")
|
RNS.log("Sending sync request to peer")
|
||||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||||
|
else:
|
||||||
|
# TODO: Remove
|
||||||
|
RNS.log("No unsynced messages")
|
||||||
|
|
||||||
def request_failed(self, request_receipt):
|
def request_failed(self, request_receipt):
|
||||||
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
||||||
@ -577,8 +580,8 @@ class LXMPeer:
|
|||||||
RNS.log("Peer had all advertised messages", RNS.LOG_DEBUG)
|
RNS.log("Peer had all advertised messages", RNS.LOG_DEBUG)
|
||||||
|
|
||||||
for transient_id in self.unhandled_messages:
|
for transient_id in self.unhandled_messages:
|
||||||
message = self.unhandled_messages[transient_id]
|
message_entry = self.unhandled_messages[transient_id]
|
||||||
self.handled_messages[transient_id] = message
|
self.handled_messages[transient_id] = message_entry
|
||||||
|
|
||||||
self.unhandled_messages = {}
|
self.unhandled_messages = {}
|
||||||
|
|
||||||
@ -594,8 +597,8 @@ class LXMPeer:
|
|||||||
# If the peer did not want the message, it has
|
# If the peer did not want the message, it has
|
||||||
# already received it from another peer.
|
# already received it from another peer.
|
||||||
if not transient_id in response:
|
if not transient_id in response:
|
||||||
message = self.unhandled_messages.pop(transient_id)
|
message_entry = self.unhandled_messages.pop(transient_id)
|
||||||
self.handled_messages[transient_id] = message
|
self.handled_messages[transient_id] = message_entry
|
||||||
|
|
||||||
for transient_id in response:
|
for transient_id in response:
|
||||||
wanted_messages.append(self.unhandled_messages[transient_id][2])
|
wanted_messages.append(self.unhandled_messages[transient_id][2])
|
||||||
@ -605,7 +608,7 @@ class LXMPeer:
|
|||||||
# TODO: Remove
|
# TODO: Remove
|
||||||
RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG)
|
RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
data = msgpack.packb(wanted_messages)
|
data = msgpack.packb([time.time(), wanted_messages])
|
||||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||||
resource.transferred_messages = wanted_message_ids
|
resource.transferred_messages = wanted_message_ids
|
||||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||||
@ -856,15 +859,32 @@ class LXMRouter:
|
|||||||
|
|
||||||
|
|
||||||
def propagation_resource_concluded(self, resource):
|
def propagation_resource_concluded(self, resource):
|
||||||
RNS.log("Transfer concluded for delivery resource "+str(resource), RNS.LOG_DEBUG)
|
RNS.log("Transfer concluded for propagation resource "+str(resource), RNS.LOG_DEBUG)
|
||||||
if resource.status == RNS.Resource.COMPLETE:
|
if resource.status == RNS.Resource.COMPLETE:
|
||||||
# TODO: The peer this was received from should
|
# TODO: The peer this was received from should
|
||||||
# have the transient id added to it's list of
|
# have the transient id added to it's list of
|
||||||
# already handled messages.
|
# already handled messages.
|
||||||
try:
|
try:
|
||||||
messages = msgpack.unpackb(resource.data.read())
|
data = msgpack.unpackb(resource.data.read())
|
||||||
for message in messages:
|
remote_timebase = data[0]
|
||||||
self.lxmf_propagation(lxmessage)
|
remote_hash = None
|
||||||
|
remote_identity = resource.link.get_remote_identity()
|
||||||
|
|
||||||
|
if remote_identity != None:
|
||||||
|
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "propagation")
|
||||||
|
remote_hash = remote_destination.hash
|
||||||
|
|
||||||
|
if not remote_hash in self.peers:
|
||||||
|
self.peer(remote_hash, remote_timebase)
|
||||||
|
|
||||||
|
messages = data[1]
|
||||||
|
for lxmf_data in messages:
|
||||||
|
if remote_hash in self.peers:
|
||||||
|
transient_id = RNS.Identity.full_hash(lxmf_data)
|
||||||
|
peer = self.peers[remote_hash]
|
||||||
|
peer.handled_messages[transient_id] = [transient_id, remote_timebase, lxmf_data]
|
||||||
|
|
||||||
|
self.lxmf_propagation(lxmf_data)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while unpacking received propagation messages", RNS.LOG_DEBUG)
|
RNS.log("Error while unpacking received propagation messages", RNS.LOG_DEBUG)
|
||||||
@ -880,6 +900,9 @@ class LXMRouter:
|
|||||||
|
|
||||||
self.propagation_entries[transient_id] = propagation_entry
|
self.propagation_entries[transient_id] = propagation_entry
|
||||||
|
|
||||||
|
# TODO: Remove?
|
||||||
|
RNS.log("Received propagated LXMF message "+RNS.prettyhexrep(transient_id)+", adding to peer distribution queues...", RNS.LOG_DEBUG)
|
||||||
|
|
||||||
for peer_id in self.peers:
|
for peer_id in self.peers:
|
||||||
peer = self.peers[peer_id]
|
peer = self.peers[peer_id]
|
||||||
peer.handle_message(transient_id)
|
peer.handle_message(transient_id)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user