diff --git a/LXMF/LXMF.py b/LXMF/LXMF.py index 5af55e1..69cd36a 100644 --- a/LXMF/LXMF.py +++ b/LXMF/LXMF.py @@ -417,7 +417,8 @@ class LXMessage: "state": self.state, "lxmf_bytes": self.packed, "transport_encrypted": self.transport_encrypted, - "transport_encryption": self.transport_encryption + "transport_encryption": self.transport_encryption, + "method": self.method } packed_container = msgpack.packb(container) @@ -507,6 +508,8 @@ class LXMessage: lxm.transport_encrypted = container["transport_encrypted"] if "transport_encryption" in container: lxm.transport_encryption = container["transport_encryption"] + if "method" in container: + lxm.method = container["method"] return lxm except Exception as e: @@ -644,6 +647,8 @@ class LXMPeer: RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG) if self.link != None: self.link.teardown() + else: + self.state = LXMPeer.IDLE def offer_response(self, request_receipt): # TODO: Remove @@ -703,17 +708,23 @@ class LXMPeer: for message_entry in wanted_messages: file_path = message_entry[1] - # TODO: Remove - RNS.log("Loading "+str(file_path)+" for transfer") - file = open(file_path, "rb") - lxmf_data = file.read() - file.close() - lxm_list.append(lxmf_data) + if os.path.isfile(file_path): + # TODO: Remove + RNS.log("Loading "+str(file_path)+" for transfer") + file = open(file_path, "rb") + lxmf_data = file.read() + file.close() + lxm_list.append(lxmf_data) + else: + # TODO: Remove + RNS.log("The requested message "+str(file_path)+" does not exist") data = msgpack.packb([time.time(), lxm_list]) resource = RNS.Resource(data, self.link, callback = self.resource_concluded) resource.transferred_messages = wanted_message_ids self.state = LXMPeer.RESOURCE_TRANSFERRING + else: + self.state = LXMPeer.IDLE except Exception as e: RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_DEBUG) @@ -725,8 +736,13 @@ class LXMPeer: for transient_id in resource.transferred_messages: message = self.unhandled_messages.pop(transient_id) self.handled_messages[transient_id] = message + self.state = LXMPeer.IDLE else: RNS.log("Resource transfer for LXMF peer sync failed to "+str(self.destination), RNS.LOG_DEBUG) + if self.link != None: + self.link.teardown() + else: + self.state = LXMPeer.IDLE @@ -794,6 +810,7 @@ class LXMRouter: self.propagation_transfer_state = LXMRouter.PR_IDLE self.propagation_transfer_progress = 0.0 self.propagation_transfer_last_result = None + self.locally_delivered_transient_ids = {} if identity == None: identity = RNS.Identity() @@ -821,24 +838,25 @@ class LXMRouter: job_thread.start() def exit_handler(self): - try: - serialised_peers = [] - for peer_id in self.peers: - peer = self.peers[peer_id] - serialised_peers.append(peer.to_bytes()) + if self.propagation_node: + try: + serialised_peers = [] + for peer_id in self.peers: + peer = self.peers[peer_id] + serialised_peers.append(peer.to_bytes()) + # TODO: Remove + RNS.log("Saving peer "+str(peer)) + + peers_file = open(self.storagepath+"/peers", "wb") + peers_file.write(msgpack.packb(serialised_peers)) + peers_file.close() + # TODO: Remove - RNS.log("Saving peer "+str(peer)) - - peers_file = open(self.storagepath+"/peers", "wb") - peers_file.write(msgpack.packb(serialised_peers)) - peers_file.close() - - # TODO: Remove - RNS.log("Saved peers") + RNS.log("Saved peers") - except Exception as e: - RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) + except Exception as e: + RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR) def register_delivery_identity(self, identity, display_name = None): @@ -936,7 +954,10 @@ class LXMRouter: def has_message(self, transient_id): - return False + if transient_id in self.locally_delivered_transient_ids: + return True + else: + return False def message_get_failed(self, request_receipt): # TODO: Remove or change @@ -992,8 +1013,21 @@ class LXMRouter: self.outbound_propagation_link.teardown() else: if request_receipt.response != None and len(request_receipt.response) > 0: + haves = [] for lxmf_data in request_receipt.response: self.lxmf_propagation(lxmf_data) + haves.append(RNS.Identity.full_hash(lxmf_data)) + + # Return a list of successfully received messages to the node + # TODO: Remove + RNS.log("Telling node to clear "+str(len(haves))+" messages") + request_receipt.link.request( + LXMPeer.MESSAGE_GET_PATH, + [None, haves], + # response_callback=self.message_syncfinal_response, + failed_callback=self.message_get_failed, + # progress_callback=self.message_get_progress + ) else: RNS.log("No LXMF data received in message download response", RNS.LOG_DEBUG) @@ -1189,9 +1223,6 @@ class LXMRouter: return LXMPeer.ERROR_NO_IDENTITY else: try: - # TODO: Remove - RNS.log(str(data)) - remote_destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "delivery") # If both want and have fields are empty, send a list of @@ -1225,7 +1256,7 @@ class LXMRouter: if transient_id in self.propagation_entries and self.propagation_entries[transient_id][0] == remote_destination.hash: try: filepath = self.propagation_entries[transient_id][1] - self.propagation_entries.remove(transient_id) + self.propagation_entries.pop(transient_id) os.unlink(filepath) RNS.log("Client "+RNS.prettyhexrep(remote_destination.hash)+" purged message "+RNS.prettyhexrep(transient_id)+" at "+str(filepath), RNS.LOG_DEBUG) @@ -1335,6 +1366,7 @@ class LXMRouter: decrypted_lxmf_data = delivery_destination.decrypt(encrypted_lxmf_data) delivery_data = lxmf_data[:LXMessage.DESTINATION_LENGTH]+decrypted_lxmf_data self.lxmf_delivery(delivery_data, delivery_destination.type) + self.locally_delivered_transient_ids[transient_id] = time.time() else: if self.propagation_node: