mirror of
https://github.com/markqvist/LXMF.git
synced 2025-02-22 16:09:51 -05:00
Implemented LXMF propagation peer sync and client downloads
This commit is contained in:
parent
b675e1c47e
commit
9a75b9c1ec
86
LXMF/LXMF.py
86
LXMF/LXMF.py
@ -417,7 +417,8 @@ class LXMessage:
|
|||||||
"state": self.state,
|
"state": self.state,
|
||||||
"lxmf_bytes": self.packed,
|
"lxmf_bytes": self.packed,
|
||||||
"transport_encrypted": self.transport_encrypted,
|
"transport_encrypted": self.transport_encrypted,
|
||||||
"transport_encryption": self.transport_encryption
|
"transport_encryption": self.transport_encryption,
|
||||||
|
"method": self.method
|
||||||
}
|
}
|
||||||
|
|
||||||
packed_container = msgpack.packb(container)
|
packed_container = msgpack.packb(container)
|
||||||
@ -507,6 +508,8 @@ class LXMessage:
|
|||||||
lxm.transport_encrypted = container["transport_encrypted"]
|
lxm.transport_encrypted = container["transport_encrypted"]
|
||||||
if "transport_encryption" in container:
|
if "transport_encryption" in container:
|
||||||
lxm.transport_encryption = container["transport_encryption"]
|
lxm.transport_encryption = container["transport_encryption"]
|
||||||
|
if "method" in container:
|
||||||
|
lxm.method = container["method"]
|
||||||
|
|
||||||
return lxm
|
return lxm
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -644,6 +647,8 @@ class LXMPeer:
|
|||||||
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
||||||
if self.link != None:
|
if self.link != None:
|
||||||
self.link.teardown()
|
self.link.teardown()
|
||||||
|
else:
|
||||||
|
self.state = LXMPeer.IDLE
|
||||||
|
|
||||||
def offer_response(self, request_receipt):
|
def offer_response(self, request_receipt):
|
||||||
# TODO: Remove
|
# TODO: Remove
|
||||||
@ -703,17 +708,23 @@ class LXMPeer:
|
|||||||
|
|
||||||
for message_entry in wanted_messages:
|
for message_entry in wanted_messages:
|
||||||
file_path = message_entry[1]
|
file_path = message_entry[1]
|
||||||
# TODO: Remove
|
if os.path.isfile(file_path):
|
||||||
RNS.log("Loading "+str(file_path)+" for transfer")
|
# TODO: Remove
|
||||||
file = open(file_path, "rb")
|
RNS.log("Loading "+str(file_path)+" for transfer")
|
||||||
lxmf_data = file.read()
|
file = open(file_path, "rb")
|
||||||
file.close()
|
lxmf_data = file.read()
|
||||||
lxm_list.append(lxmf_data)
|
file.close()
|
||||||
|
lxm_list.append(lxmf_data)
|
||||||
|
else:
|
||||||
|
# TODO: Remove
|
||||||
|
RNS.log("The requested message "+str(file_path)+" does not exist")
|
||||||
|
|
||||||
data = msgpack.packb([time.time(), lxm_list])
|
data = msgpack.packb([time.time(), lxm_list])
|
||||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||||
resource.transferred_messages = wanted_message_ids
|
resource.transferred_messages = wanted_message_ids
|
||||||
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
self.state = LXMPeer.RESOURCE_TRANSFERRING
|
||||||
|
else:
|
||||||
|
self.state = LXMPeer.IDLE
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_DEBUG)
|
RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||||
@ -725,8 +736,13 @@ class LXMPeer:
|
|||||||
for transient_id in resource.transferred_messages:
|
for transient_id in resource.transferred_messages:
|
||||||
message = self.unhandled_messages.pop(transient_id)
|
message = self.unhandled_messages.pop(transient_id)
|
||||||
self.handled_messages[transient_id] = message
|
self.handled_messages[transient_id] = message
|
||||||
|
self.state = LXMPeer.IDLE
|
||||||
else:
|
else:
|
||||||
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG)
|
||||||
|
if self.link != None:
|
||||||
|
self.link.teardown()
|
||||||
|
else:
|
||||||
|
self.state = LXMPeer.IDLE
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -794,6 +810,7 @@ class LXMRouter:
|
|||||||
self.propagation_transfer_state = LXMRouter.PR_IDLE
|
self.propagation_transfer_state = LXMRouter.PR_IDLE
|
||||||
self.propagation_transfer_progress = 0.0
|
self.propagation_transfer_progress = 0.0
|
||||||
self.propagation_transfer_last_result = None
|
self.propagation_transfer_last_result = None
|
||||||
|
self.locally_delivered_transient_ids = {}
|
||||||
|
|
||||||
if identity == None:
|
if identity == None:
|
||||||
identity = RNS.Identity()
|
identity = RNS.Identity()
|
||||||
@ -821,24 +838,25 @@ class LXMRouter:
|
|||||||
job_thread.start()
|
job_thread.start()
|
||||||
|
|
||||||
def exit_handler(self):
|
def exit_handler(self):
|
||||||
try:
|
if self.propagation_node:
|
||||||
serialised_peers = []
|
try:
|
||||||
for peer_id in self.peers:
|
serialised_peers = []
|
||||||
peer = self.peers[peer_id]
|
for peer_id in self.peers:
|
||||||
serialised_peers.append(peer.to_bytes())
|
peer = self.peers[peer_id]
|
||||||
|
serialised_peers.append(peer.to_bytes())
|
||||||
|
# TODO: Remove
|
||||||
|
RNS.log("Saving peer "+str(peer))
|
||||||
|
|
||||||
|
peers_file = open(self.storagepath+"/peers", "wb")
|
||||||
|
peers_file.write(msgpack.packb(serialised_peers))
|
||||||
|
peers_file.close()
|
||||||
|
|
||||||
# TODO: Remove
|
# TODO: Remove
|
||||||
RNS.log("Saving peer "+str(peer))
|
RNS.log("Saved peers")
|
||||||
|
|
||||||
peers_file = open(self.storagepath+"/peers", "wb")
|
|
||||||
peers_file.write(msgpack.packb(serialised_peers))
|
|
||||||
peers_file.close()
|
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Saved peers")
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
|
||||||
def register_delivery_identity(self, identity, display_name = None):
|
def register_delivery_identity(self, identity, display_name = None):
|
||||||
@ -936,7 +954,10 @@ class LXMRouter:
|
|||||||
|
|
||||||
|
|
||||||
def has_message(self, transient_id):
|
def has_message(self, transient_id):
|
||||||
return False
|
if transient_id in self.locally_delivered_transient_ids:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
def message_get_failed(self, request_receipt):
|
def message_get_failed(self, request_receipt):
|
||||||
# TODO: Remove or change
|
# TODO: Remove or change
|
||||||
@ -992,8 +1013,21 @@ class LXMRouter:
|
|||||||
self.outbound_propagation_link.teardown()
|
self.outbound_propagation_link.teardown()
|
||||||
else:
|
else:
|
||||||
if request_receipt.response != None and len(request_receipt.response) > 0:
|
if request_receipt.response != None and len(request_receipt.response) > 0:
|
||||||
|
haves = []
|
||||||
for lxmf_data in request_receipt.response:
|
for lxmf_data in request_receipt.response:
|
||||||
self.lxmf_propagation(lxmf_data)
|
self.lxmf_propagation(lxmf_data)
|
||||||
|
haves.append(RNS.Identity.full_hash(lxmf_data))
|
||||||
|
|
||||||
|
# Return a list of successfully received messages to the node
|
||||||
|
# TODO: Remove
|
||||||
|
RNS.log("Telling node to clear "+str(len(haves))+" messages")
|
||||||
|
request_receipt.link.request(
|
||||||
|
LXMPeer.MESSAGE_GET_PATH,
|
||||||
|
[None, haves],
|
||||||
|
# response_callback=self.message_syncfinal_response,
|
||||||
|
failed_callback=self.message_get_failed,
|
||||||
|
# progress_callback=self.message_get_progress
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
RNS.log("No LXMF data received in message download response", RNS.LOG_DEBUG)
|
RNS.log("No LXMF data received in message download response", RNS.LOG_DEBUG)
|
||||||
|
|
||||||
@ -1189,9 +1223,6 @@ class LXMRouter:
|
|||||||
return LXMPeer.ERROR_NO_IDENTITY
|
return LXMPeer.ERROR_NO_IDENTITY
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
# TODO: Remove
|
|
||||||
RNS.log(str(data))
|
|
||||||
|
|
||||||
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery")
|
remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery")
|
||||||
|
|
||||||
# If both want and have fields are empty, send a list of
|
# If both want and have fields are empty, send a list of
|
||||||
@ -1225,7 +1256,7 @@ class LXMRouter:
|
|||||||
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
|
if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash:
|
||||||
try:
|
try:
|
||||||
filepath = self.propagation_entries[transient_id][1]
|
filepath = self.propagation_entries[transient_id][1]
|
||||||
self.propagation_entries.remove(transient_id)
|
self.propagation_entries.pop(transient_id)
|
||||||
os.unlink(filepath)
|
os.unlink(filepath)
|
||||||
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
|
RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
@ -1335,6 +1366,7 @@ class LXMRouter:
|
|||||||
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
|
decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data)
|
||||||
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
|
delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data
|
||||||
self.lxmf_delivery(delivery_data, delivery_destination.type)
|
self.lxmf_delivery(delivery_data, delivery_destination.type)
|
||||||
|
self.locally_delivered_transient_ids[transient_id] = time.time()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if self.propagation_node:
|
if self.propagation_node:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user