mirror of
https://github.com/markqvist/Reticulum.git
synced 2025-07-23 15:01:07 -04:00
Proof handling
This commit is contained in:
parent
dedea6ba11
commit
19d9b1a4a5
9 changed files with 283 additions and 58 deletions
153
RNS/Transport.py
153
RNS/Transport.py
|
@ -1,4 +1,7 @@
|
|||
import RNS
|
||||
import time
|
||||
import threading
|
||||
from time import sleep
|
||||
|
||||
class Transport:
|
||||
# Constants
|
||||
|
@ -8,15 +11,65 @@ class Transport:
|
|||
TUNNEL = 0x03;
|
||||
types = [BROADCAST, TRANSPORT, RELAY, TUNNEL]
|
||||
|
||||
interfaces = []
|
||||
destinations = []
|
||||
pending_links = []
|
||||
active_links = []
|
||||
packet_hashlist = []
|
||||
interfaces = [] # All active interfaces
|
||||
destinations = [] # All active destinations
|
||||
pending_links = [] # Links that are being established
|
||||
active_links = [] # Links that are active
|
||||
packet_hashlist = [] # A list of packet hashes for duplicate detection
|
||||
receipts = [] # Receipts of all outgoing packets for proof processing
|
||||
|
||||
jobs_locked = False
|
||||
jobs_running = False
|
||||
job_interval = 0.250
|
||||
receipts_last_checked = 0.0
|
||||
receipts_check_interval = 1.0
|
||||
hashlist_maxsize = 1000000
|
||||
|
||||
@staticmethod
|
||||
def scheduleJobs():
|
||||
thread = threading.Thread(target=Transport.jobloop)
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
|
||||
@staticmethod
|
||||
def jobloop():
|
||||
while (True):
|
||||
Transport.jobs()
|
||||
sleep(Transport.job_interval)
|
||||
|
||||
@staticmethod
|
||||
def jobs():
|
||||
Transport.jobs_running = True
|
||||
try:
|
||||
if not Transport.jobs_locked:
|
||||
# Process receipts list for timed-out packets
|
||||
if Transport.receipts_last_checked+Transport.receipts_check_interval < time.time():
|
||||
for receipt in Transport.receipts:
|
||||
receipt.checkTimeout()
|
||||
if receipt.status != RNS.PacketReceipt.SENT:
|
||||
Transport.receipts.remove(receipt)
|
||||
|
||||
Transport.receipts_last_checked = time.time()
|
||||
|
||||
# Cull the packet hashlist if it has reached max size
|
||||
while (len(Transport.packet_hashlist) > Transport.hashlist_maxsize):
|
||||
Transport.packet_hashlist.pop(0)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR)
|
||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||
|
||||
Transport.jobs_running = False
|
||||
|
||||
@staticmethod
|
||||
def outbound(packet):
|
||||
Transport.cacheRaw(packet.raw)
|
||||
while (Transport.jobs_running):
|
||||
sleep(0.1)
|
||||
|
||||
Transport.jobs_locked = True
|
||||
packet.updateHash()
|
||||
sent = False
|
||||
|
||||
for interface in Transport.interfaces:
|
||||
if interface.OUT:
|
||||
should_transmit = True
|
||||
|
@ -27,19 +80,38 @@ class Transport:
|
|||
if should_transmit:
|
||||
RNS.log("Transmitting "+str(len(packet.raw))+" bytes via: "+str(interface), RNS.LOG_DEBUG)
|
||||
interface.processOutgoing(packet.raw)
|
||||
sent = True
|
||||
|
||||
if sent:
|
||||
packet.sent = True
|
||||
packet.sent_at = time.time()
|
||||
|
||||
if (packet.packet_type == RNS.Packet.DATA):
|
||||
packet.receipt = RNS.PacketReceipt(packet)
|
||||
Transport.receipts.append(packet.receipt)
|
||||
|
||||
Transport.cache(packet)
|
||||
|
||||
Transport.jobs_locked = False
|
||||
return sent
|
||||
|
||||
@staticmethod
|
||||
def inbound(raw, interface=None):
|
||||
packet_hash = RNS.Identity.fullHash(raw)
|
||||
RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet_hash), RNS.LOG_DEBUG)
|
||||
while (Transport.jobs_running):
|
||||
sleep(0.1)
|
||||
|
||||
Transport.jobs_locked = True
|
||||
|
||||
packet = RNS.Packet(None, raw)
|
||||
packet.unpack()
|
||||
packet.updateHash()
|
||||
packet.receiving_interface = interface
|
||||
|
||||
if not packet_hash in Transport.packet_hashlist:
|
||||
Transport.packet_hashlist.append(packet_hash)
|
||||
packet = RNS.Packet(None, raw)
|
||||
packet.unpack()
|
||||
packet.packet_hash = packet_hash
|
||||
packet.receiving_interface = interface
|
||||
RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_DEBUG)
|
||||
|
||||
if not packet.packet_hash in Transport.packet_hashlist:
|
||||
Transport.packet_hashlist.append(packet.packet_hash)
|
||||
|
||||
if packet.packet_type == RNS.Packet.ANNOUNCE:
|
||||
if RNS.Identity.validateAnnounce(packet):
|
||||
Transport.cache(packet)
|
||||
|
@ -64,6 +136,13 @@ class Transport:
|
|||
destination.receive(packet)
|
||||
Transport.cache(packet)
|
||||
|
||||
if destination.proof_strategy == RNS.Destination.PROVE_ALL:
|
||||
packet.prove()
|
||||
|
||||
if destination.proof_strategy == RNS.Destination.PROVE_APP:
|
||||
if destination.callbacks.proof_requested:
|
||||
destination.callbacks.proof_requested(packet)
|
||||
|
||||
if packet.packet_type == RNS.Packet.PROOF:
|
||||
if packet.header_type == RNS.Packet.HEADER_3:
|
||||
# This is a link request proof, forward
|
||||
|
@ -72,11 +151,27 @@ class Transport:
|
|||
if link.link_id == packet.destination_hash:
|
||||
link.validateProof(packet)
|
||||
else:
|
||||
for destination in Transport.destinations:
|
||||
if destination.hash == packet.destination_hash:
|
||||
if destination.proofcallback != None:
|
||||
destination.proofcallback(packet)
|
||||
# TODO: add universal proof handling
|
||||
# TODO: Make sure everything uses new proof handling
|
||||
if len(packet.data) == RNS.PacketReceipt.EXPL_LENGTH:
|
||||
proof_hash = packet.data[:RNS.Identity.HASHLENGTH/8]
|
||||
else:
|
||||
proof_hash = None
|
||||
|
||||
for receipt in Transport.receipts:
|
||||
receipt_validated = False
|
||||
if proof_hash != None:
|
||||
# Only test validation if hash matches
|
||||
if receipt.hash == proof_hash:
|
||||
receipt_validated = receipt.validateProofPacket(packet)
|
||||
else:
|
||||
# In case of an implicit proof, we have
|
||||
# to check every single outstanding receipt
|
||||
receipt_validated = receipt.validateProofPacket(packet)
|
||||
|
||||
if receipt_validated:
|
||||
Transport.receipts.remove(receipt)
|
||||
|
||||
Transport.jobs_locked = False
|
||||
|
||||
@staticmethod
|
||||
def registerDestination(destination):
|
||||
|
@ -112,15 +207,13 @@ class Transport:
|
|||
@staticmethod
|
||||
def cache(packet):
|
||||
if RNS.Transport.shouldCache(packet):
|
||||
RNS.Transport.cacheRaw(packet.raw)
|
||||
try:
|
||||
packet_hash = RNS.hexrep(packet.getHash(), delimit=False)
|
||||
file = open(RNS.Reticulum.cachepath+"/"+packet_hash, "w")
|
||||
file.write(packet.raw)
|
||||
file.close()
|
||||
RNS.log("Wrote packet "+packet_hash+" to cache", RNS.LOG_DEBUG)
|
||||
except Exception as e:
|
||||
RNS.log("Error writing packet to cache", RNS.LOG_ERROR)
|
||||
RNS.log("The contained exception was: "+str(e))
|
||||
|
||||
@staticmethod
|
||||
def cacheRaw(raw):
|
||||
try:
|
||||
file = open(RNS.Reticulum.cachepath+"/"+RNS.hexrep(RNS.Identity.fullHash(raw), delimit=False), "w")
|
||||
file.write(raw)
|
||||
file.close()
|
||||
RNS.log("Wrote packet "+RNS.prettyhexrep(RNS.Identity.fullHash(raw))+" to cache", RNS.LOG_DEBUG)
|
||||
except Exception as e:
|
||||
RNS.log("Error writing packet to cache", RNS.LOG_ERROR)
|
||||
RNS.log("The contained exception was: "+str(e))
|
Loading…
Add table
Add a link
Reference in a new issue