mirror of
https://github.com/markqvist/LXMF.git
synced 2024-12-28 08:39:26 -05:00
Cleanup
This commit is contained in:
parent
9a75b9c1ec
commit
156109bf0b
93
LXMF/LXMF.py
93
LXMF/LXMF.py
@ -635,13 +635,9 @@ class LXMPeer:
|
|||||||
for transient_id in self.unhandled_messages:
|
for transient_id in self.unhandled_messages:
|
||||||
unhandled_ids.append(transient_id)
|
unhandled_ids.append(transient_id)
|
||||||
|
|
||||||
# TODO: Remove
|
RNS.log("Sending sync request to peer "+str(self.destination), RNS.LOG_DEBUG)
|
||||||
RNS.log("Sending sync request to peer")
|
|
||||||
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
self.link.request(LXMPeer.OFFER_REQUEST_PATH, unhandled_ids, response_callback=self.offer_response, failed_callback=self.request_failed)
|
||||||
self.state = LXMPeer.REQUEST_SENT
|
self.state = LXMPeer.REQUEST_SENT
|
||||||
else:
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("No unsynced messages")
|
|
||||||
|
|
||||||
def request_failed(self, request_receipt):
|
def request_failed(self, request_receipt):
|
||||||
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
RNS.log("Sync request to peer "+str(self.destination)+" failed", RNS.LOG_DEBUG)
|
||||||
@ -651,8 +647,6 @@ class LXMPeer:
|
|||||||
self.state = LXMPeer.IDLE
|
self.state = LXMPeer.IDLE
|
||||||
|
|
||||||
def offer_response(self, request_receipt):
|
def offer_response(self, request_receipt):
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Got offer response from peer")
|
|
||||||
try:
|
try:
|
||||||
self.state = LXMPeer.RESPONSE_RECEIVED
|
self.state = LXMPeer.RESPONSE_RECEIVED
|
||||||
response = request_receipt.response
|
response = request_receipt.response
|
||||||
@ -671,10 +665,6 @@ class LXMPeer:
|
|||||||
|
|
||||||
elif response == False:
|
elif response == False:
|
||||||
# Peer already has all advertised messages
|
# Peer already has all advertised messages
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Peer had all advertised messages", RNS.LOG_DEBUG)
|
|
||||||
|
|
||||||
for transient_id in self.unhandled_messages:
|
for transient_id in self.unhandled_messages:
|
||||||
message_entry = self.unhandled_messages[transient_id]
|
message_entry = self.unhandled_messages[transient_id]
|
||||||
self.handled_messages[transient_id] = message_entry
|
self.handled_messages[transient_id] = message_entry
|
||||||
@ -701,23 +691,17 @@ class LXMPeer:
|
|||||||
wanted_message_ids.append(transient_id)
|
wanted_message_ids.append(transient_id)
|
||||||
|
|
||||||
if len(wanted_messages) > 0:
|
if len(wanted_messages) > 0:
|
||||||
# TODO: Remove
|
RNS.log("Peer wanted "+str(len(wanted_messages))+" of the available messages", RNS.LOG_DEBUG)
|
||||||
RNS.log("Peer wanted: "+str(wanted_messages), RNS.LOG_DEBUG)
|
|
||||||
|
|
||||||
lxm_list = []
|
lxm_list = []
|
||||||
|
|
||||||
for message_entry in wanted_messages:
|
for message_entry in wanted_messages:
|
||||||
file_path = message_entry[1]
|
file_path = message_entry[1]
|
||||||
if os.path.isfile(file_path):
|
if os.path.isfile(file_path):
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Loading "+str(file_path)+" for transfer")
|
|
||||||
file = open(file_path, "rb")
|
file = open(file_path, "rb")
|
||||||
lxmf_data = file.read()
|
lxmf_data = file.read()
|
||||||
file.close()
|
file.close()
|
||||||
lxm_list.append(lxmf_data)
|
lxm_list.append(lxmf_data)
|
||||||
else:
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("The requested message "+str(file_path)+" does not exist")
|
|
||||||
|
|
||||||
data = msgpack.packb([time.time(), lxm_list])
|
data = msgpack.packb([time.time(), lxm_list])
|
||||||
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
resource = RNS.Resource(data, self.link, callback = self.resource_concluded)
|
||||||
@ -727,8 +711,8 @@ class LXMPeer:
|
|||||||
self.state = LXMPeer.IDLE
|
self.state = LXMPeer.IDLE
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_DEBUG)
|
RNS.log("Error while handling offer response from peer "+str(self.destination), RNS.LOG_ERROR)
|
||||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
|
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
|
||||||
def resource_concluded(self, resource):
|
def resource_concluded(self, resource):
|
||||||
@ -757,12 +741,12 @@ class LXMPeer:
|
|||||||
|
|
||||||
def handle_message(self, transient_id):
|
def handle_message(self, transient_id):
|
||||||
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
if not transient_id in self.handled_messages and not transient_id in self.unhandled_messages:
|
||||||
# TODO: Remove
|
# TODO: Remove at some point
|
||||||
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash))
|
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was added to distribution queue for "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_EXTREME)
|
||||||
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
self.unhandled_messages[transient_id] = self.router.propagation_entries[transient_id]
|
||||||
else:
|
else:
|
||||||
# TODO: Remove
|
# TODO: Remove at some point
|
||||||
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was NOT added to distribution queue for "+RNS.prettyhexrep(self.destination_hash))
|
RNS.log("The message "+RNS.prettyhexrep(transient_id)+" was NOT added to distribution queue for "+RNS.prettyhexrep(self.destination_hash), RNS.LOG_EXTREME)
|
||||||
|
|
||||||
|
|
||||||
class LXMRouter:
|
class LXMRouter:
|
||||||
@ -844,16 +828,12 @@ class LXMRouter:
|
|||||||
for peer_id in self.peers:
|
for peer_id in self.peers:
|
||||||
peer = self.peers[peer_id]
|
peer = self.peers[peer_id]
|
||||||
serialised_peers.append(peer.to_bytes())
|
serialised_peers.append(peer.to_bytes())
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Saving peer "+str(peer))
|
|
||||||
|
|
||||||
peers_file = open(self.storagepath+"/peers", "wb")
|
peers_file = open(self.storagepath+"/peers", "wb")
|
||||||
peers_file.write(msgpack.packb(serialised_peers))
|
peers_file.write(msgpack.packb(serialised_peers))
|
||||||
peers_file.close()
|
peers_file.close()
|
||||||
|
|
||||||
# TODO: Remove
|
RNS.log("Saved "+str(len(serialised_peers))+" peers to storage", RNS.LOG_DEBUG)
|
||||||
RNS.log("Saved peers")
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
RNS.log("Could not save propagation node peers to storage. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
@ -933,8 +913,8 @@ class LXMRouter:
|
|||||||
self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED
|
self.propagation_transfer_state = LXMRouter.PR_PATH_REQUESTED
|
||||||
self.request_messages_path_job()
|
self.request_messages_path_job()
|
||||||
else:
|
else:
|
||||||
# TODO: Remove
|
# TODO: Remove at some point
|
||||||
RNS.log("Waiting for propagation node link to become active", RNS.LOG_DEBUG)
|
RNS.log("Waiting for propagation node link to become active", RNS.LOG_EXTREME)
|
||||||
|
|
||||||
|
|
||||||
def request_messages_path_job(self):
|
def request_messages_path_job(self):
|
||||||
@ -960,27 +940,21 @@ class LXMRouter:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def message_get_failed(self, request_receipt):
|
def message_get_failed(self, request_receipt):
|
||||||
# TODO: Remove or change
|
|
||||||
RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
|
RNS.log("Message list/get request failed", RNS.LOG_DEBUG)
|
||||||
if self.outbound_propagation_link != None:
|
if self.outbound_propagation_link != None:
|
||||||
self.outbound_propagation_link.teardown()
|
self.outbound_propagation_link.teardown()
|
||||||
|
|
||||||
def message_list_response(self, request_receipt):
|
def message_list_response(self, request_receipt):
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Got message list response from propagation node")
|
|
||||||
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
|
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
|
||||||
RNS.log("Propagation node indicated missing identification on reuquest", RNS.LOG_DEBUG)
|
RNS.log("Propagation node indicated missing identification on list request, tearing down link.", RNS.LOG_DEBUG)
|
||||||
if self.outbound_propagation_link != None:
|
if self.outbound_propagation_link != None:
|
||||||
self.outbound_propagation_link.teardown()
|
self.outbound_propagation_link.teardown()
|
||||||
else:
|
else:
|
||||||
# TODO: Remove
|
|
||||||
if request_receipt.response != None:
|
if request_receipt.response != None:
|
||||||
RNS.log("Received message list from node:", RNS.LOG_DEBUG)
|
|
||||||
haves = []
|
haves = []
|
||||||
wants = []
|
wants = []
|
||||||
if len(request_receipt.response) > 0:
|
if len(request_receipt.response) > 0:
|
||||||
for transient_id in request_receipt.response:
|
for transient_id in request_receipt.response:
|
||||||
RNS.log(" "+RNS.prettyhexrep(transient_id), RNS.LOG_DEBUG)
|
|
||||||
if self.has_message(transient_id):
|
if self.has_message(transient_id):
|
||||||
haves.append(transient_id)
|
haves.append(transient_id)
|
||||||
else:
|
else:
|
||||||
@ -994,21 +968,16 @@ class LXMRouter:
|
|||||||
progress_callback=self.message_get_progress
|
progress_callback=self.message_get_progress
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("No messages on node", RNS.LOG_DEBUG)
|
|
||||||
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
|
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
|
||||||
self.propagation_transfer_progress = 1.0
|
self.propagation_transfer_progress = 1.0
|
||||||
self.propagation_transfer_last_result = 0
|
self.propagation_transfer_last_result = 0
|
||||||
|
|
||||||
def message_get_progress(self, request_receipt):
|
def message_get_progress(self, request_receipt):
|
||||||
self.propagation_transfer_progress = request_receipt.get_progress()
|
self.propagation_transfer_progress = request_receipt.get_progress()
|
||||||
RNS.log("Progress: "+str(self.propagation_transfer_progress))
|
|
||||||
|
|
||||||
def message_get_response(self, request_receipt):
|
def message_get_response(self, request_receipt):
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Got message download response from propagation node")
|
|
||||||
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
|
if request_receipt.response == LXMPeer.ERROR_NO_IDENTITY:
|
||||||
RNS.log("Propagation node indicated missing identification on reuquest", RNS.LOG_DEBUG)
|
RNS.log("Propagation node indicated missing identification on get request, tearing down link.", RNS.LOG_DEBUG)
|
||||||
if self.outbound_propagation_link != None:
|
if self.outbound_propagation_link != None:
|
||||||
self.outbound_propagation_link.teardown()
|
self.outbound_propagation_link.teardown()
|
||||||
else:
|
else:
|
||||||
@ -1019,8 +988,6 @@ class LXMRouter:
|
|||||||
haves.append(RNS.Identity.full_hash(lxmf_data))
|
haves.append(RNS.Identity.full_hash(lxmf_data))
|
||||||
|
|
||||||
# Return a list of successfully received messages to the node
|
# Return a list of successfully received messages to the node
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Telling node to clear "+str(len(haves))+" messages")
|
|
||||||
request_receipt.link.request(
|
request_receipt.link.request(
|
||||||
LXMPeer.MESSAGE_GET_PATH,
|
LXMPeer.MESSAGE_GET_PATH,
|
||||||
[None, haves],
|
[None, haves],
|
||||||
@ -1028,11 +995,7 @@ class LXMRouter:
|
|||||||
failed_callback=self.message_get_failed,
|
failed_callback=self.message_get_failed,
|
||||||
# progress_callback=self.message_get_progress
|
# progress_callback=self.message_get_progress
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
RNS.log("No LXMF data received in message download response", RNS.LOG_DEBUG)
|
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("No messages on node", RNS.LOG_DEBUG)
|
|
||||||
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
|
self.propagation_transfer_state = LXMRouter.PR_COMPLETE
|
||||||
self.propagation_transfer_progress = 1.0
|
self.propagation_transfer_progress = 1.0
|
||||||
self.propagation_transfer_last_result = len(request_receipt.response)
|
self.propagation_transfer_last_result = len(request_receipt.response)
|
||||||
@ -1143,9 +1106,6 @@ class LXMRouter:
|
|||||||
file.close()
|
file.close()
|
||||||
|
|
||||||
self.propagation_entries[transient_id] = [destination_hash, filepath]
|
self.propagation_entries[transient_id] = [destination_hash, filepath]
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Registered msg "+RNS.prettyhexrep(transient_id)+" at "+filepath+" for "+RNS.prettyhexrep(destination_hash))
|
|
||||||
|
|
||||||
if os.path.isfile(self.storagepath+"/peers"):
|
if os.path.isfile(self.storagepath+"/peers"):
|
||||||
peers_file = open(self.storagepath+"/peers", "rb")
|
peers_file = open(self.storagepath+"/peers", "rb")
|
||||||
@ -1155,12 +1115,9 @@ class LXMRouter:
|
|||||||
peer = LXMPeer.from_bytes(serialised_peer, self)
|
peer = LXMPeer.from_bytes(serialised_peer, self)
|
||||||
if peer.identity != None:
|
if peer.identity != None:
|
||||||
self.peers[peer.destination_hash] = peer
|
self.peers[peer.destination_hash] = peer
|
||||||
|
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages", RNS.LOG_DEBUG)
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Loaded peer "+RNS.prettyhexrep(peer.destination_hash)+" with "+str(len(peer.unhandled_messages))+" unhandled messages")
|
|
||||||
else:
|
else:
|
||||||
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.")
|
RNS.log("Peer "+RNS.prettyhexrep(peer.destination_hash)+" could not be loaded, because its identity could not be recalled. Dropping peer.", RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
self.propagation_node = True
|
self.propagation_node = True
|
||||||
@ -1185,16 +1142,10 @@ class LXMRouter:
|
|||||||
self.propagation_destination.announce(app_data=data)
|
self.propagation_destination.announce(app_data=data)
|
||||||
|
|
||||||
def offer_request(self, path, data, request_id, remote_identity, requested_at):
|
def offer_request(self, path, data, request_id, remote_identity, requested_at):
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Offer request")
|
|
||||||
|
|
||||||
if remote_identity == None:
|
if remote_identity == None:
|
||||||
return LXMPeer.ERROR_NO_IDENTITY
|
return LXMPeer.ERROR_NO_IDENTITY
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
# TODO: Remove
|
|
||||||
RNS.log(str(data))
|
|
||||||
|
|
||||||
transient_ids = data
|
transient_ids = data
|
||||||
wanted_ids = []
|
wanted_ids = []
|
||||||
|
|
||||||
@ -1216,9 +1167,6 @@ class LXMRouter:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
|
def message_get_request(self, path, data, request_id, remote_identity, requested_at):
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Message get request")
|
|
||||||
|
|
||||||
if remote_identity == None:
|
if remote_identity == None:
|
||||||
return LXMPeer.ERROR_NO_IDENTITY
|
return LXMPeer.ERROR_NO_IDENTITY
|
||||||
else:
|
else:
|
||||||
@ -1238,11 +1186,6 @@ class LXMRouter:
|
|||||||
|
|
||||||
available_messages.sort(key=lambda e: e[1], reverse=False)
|
available_messages.sort(key=lambda e: e[1], reverse=False)
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
for am in available_messages:
|
|
||||||
RNS.log("Msg size: "+str(am[1]))
|
|
||||||
##############
|
|
||||||
|
|
||||||
transient_ids = []
|
transient_ids = []
|
||||||
for available_entry in available_messages:
|
for available_entry in available_messages:
|
||||||
transient_ids.append(available_entry[0])
|
transient_ids.append(available_entry[0])
|
||||||
@ -1468,12 +1411,10 @@ class LXMRouter:
|
|||||||
waiting_peers.append(peer)
|
waiting_peers.append(peer)
|
||||||
|
|
||||||
if len(waiting_peers) > 0:
|
if len(waiting_peers) > 0:
|
||||||
# TODO: Remove
|
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_EXTREME)
|
||||||
RNS.log("Randomly selecting peer to sync from "+str(len(waiting_peers))+" waiting peers.", RNS.LOG_DEBUG)
|
|
||||||
selected_index = random.randint(0,len(waiting_peers)-1)
|
selected_index = random.randint(0,len(waiting_peers)-1)
|
||||||
selected_peer = waiting_peers[selected_index]
|
selected_peer = waiting_peers[selected_index]
|
||||||
# TODO: Remove
|
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_EXTREME)
|
||||||
RNS.log("Selected waiting peer "+str(selected_index)+": "+RNS.prettyhexrep(selected_peer.destination.hash), RNS.LOG_DEBUG)
|
|
||||||
selected_peer.sync()
|
selected_peer.sync()
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user