mirror of
https://github.com/markqvist/Reticulum.git
synced 2025-02-25 00:50:20 -05:00
Implemented child interface spawning on AutoInterface
This commit is contained in:
parent
350687eda9
commit
d0ca61f373
@ -50,7 +50,7 @@ class AutoInterface(Interface):
|
|||||||
MULTICAST_PERMANENT_ADDRESS_TYPE = "0"
|
MULTICAST_PERMANENT_ADDRESS_TYPE = "0"
|
||||||
MULTICAST_TEMPORARY_ADDRESS_TYPE = "1"
|
MULTICAST_TEMPORARY_ADDRESS_TYPE = "1"
|
||||||
|
|
||||||
PEERING_TIMEOUT = 7.5
|
PEERING_TIMEOUT = 10.0
|
||||||
|
|
||||||
ALL_IGNORE_IFS = ["lo0"]
|
ALL_IGNORE_IFS = ["lo0"]
|
||||||
DARWIN_IGNORE_IFS = ["awdl0", "llw0", "lo0", "en5"]
|
DARWIN_IGNORE_IFS = ["awdl0", "llw0", "lo0", "en5"]
|
||||||
@ -117,6 +117,8 @@ class AutoInterface(Interface):
|
|||||||
self.interface_servers = {}
|
self.interface_servers = {}
|
||||||
self.multicast_echoes = {}
|
self.multicast_echoes = {}
|
||||||
self.timed_out_interfaces = {}
|
self.timed_out_interfaces = {}
|
||||||
|
self.spawned_interfaces = {}
|
||||||
|
self.write_lock = threading.Lock()
|
||||||
self.mif_deque = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN)
|
self.mif_deque = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN)
|
||||||
self.mif_deque_times = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN)
|
self.mif_deque_times = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN)
|
||||||
self.carrier_changed = False
|
self.carrier_changed = False
|
||||||
@ -132,7 +134,7 @@ class AutoInterface(Interface):
|
|||||||
# Increase peering timeout on Android, due to potential
|
# Increase peering timeout on Android, due to potential
|
||||||
# low-power modes implemented on many chipsets.
|
# low-power modes implemented on many chipsets.
|
||||||
if RNS.vendor.platformutils.is_android():
|
if RNS.vendor.platformutils.is_android():
|
||||||
self.peering_timeout *= 3
|
self.peering_timeout *= 2.5
|
||||||
|
|
||||||
if allowed_interfaces == None:
|
if allowed_interfaces == None:
|
||||||
self.allowed_interfaces = []
|
self.allowed_interfaces = []
|
||||||
@ -312,7 +314,8 @@ class AutoInterface(Interface):
|
|||||||
job_thread.daemon = True
|
job_thread.daemon = True
|
||||||
job_thread.start()
|
job_thread.start()
|
||||||
|
|
||||||
time.sleep(peering_wait)
|
# TODO: Fix this
|
||||||
|
# time.sleep(peering_wait)
|
||||||
|
|
||||||
self.online = True
|
self.online = True
|
||||||
|
|
||||||
@ -327,8 +330,9 @@ class AutoInterface(Interface):
|
|||||||
|
|
||||||
while True:
|
while True:
|
||||||
data, ipv6_src = socket.recvfrom(1024)
|
data, ipv6_src = socket.recvfrom(1024)
|
||||||
|
peering_hash = data[:RNS.Identity.HASHLENGTH//8]
|
||||||
expected_hash = RNS.Identity.full_hash(self.group_id+ipv6_src[0].encode("utf-8"))
|
expected_hash = RNS.Identity.full_hash(self.group_id+ipv6_src[0].encode("utf-8"))
|
||||||
if data == expected_hash:
|
if peering_hash == expected_hash:
|
||||||
self.add_peer(ipv6_src[0], ifname)
|
self.add_peer(ipv6_src[0], ifname)
|
||||||
else:
|
else:
|
||||||
RNS.log(str(self)+" received peering packet on "+str(ifname)+" from "+str(ipv6_src[0])+", but authentication hash was incorrect.", RNS.LOG_DEBUG)
|
RNS.log(str(self)+" received peering packet on "+str(ifname)+" from "+str(ipv6_src[0])+", but authentication hash was incorrect.", RNS.LOG_DEBUG)
|
||||||
@ -349,6 +353,10 @@ class AutoInterface(Interface):
|
|||||||
# Remove any timed out peers
|
# Remove any timed out peers
|
||||||
for peer_addr in timed_out_peers:
|
for peer_addr in timed_out_peers:
|
||||||
removed_peer = self.peers.pop(peer_addr)
|
removed_peer = self.peers.pop(peer_addr)
|
||||||
|
if peer_addr in self.spawned_interfaces:
|
||||||
|
spawned_interface = self.spawned_interfaces[peer_addr]
|
||||||
|
spawned_interface.detach()
|
||||||
|
spawned_interface.teardown()
|
||||||
RNS.log(str(self)+" removed peer "+str(peer_addr)+" on "+str(removed_peer[0]), RNS.LOG_DEBUG)
|
RNS.log(str(self)+" removed peer "+str(peer_addr)+" on "+str(removed_peer[0]), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
for ifname in self.adopted_interfaces:
|
for ifname in self.adopted_interfaces:
|
||||||
@ -435,6 +443,10 @@ class AutoInterface(Interface):
|
|||||||
else:
|
else:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@property
|
||||||
|
def peer_count(self):
|
||||||
|
return len(self.spawned_interfaces)
|
||||||
|
|
||||||
def add_peer(self, addr, ifname):
|
def add_peer(self, addr, ifname):
|
||||||
if addr in self.link_local_addresses:
|
if addr in self.link_local_addresses:
|
||||||
ifname = None
|
ifname = None
|
||||||
@ -450,46 +462,62 @@ class AutoInterface(Interface):
|
|||||||
else:
|
else:
|
||||||
if not addr in self.peers:
|
if not addr in self.peers:
|
||||||
self.peers[addr] = [ifname, time.time()]
|
self.peers[addr] = [ifname, time.time()]
|
||||||
|
|
||||||
|
spawned_interface = AutoInterfacePeer(self, addr, ifname)
|
||||||
|
spawned_interface.OUT = self.OUT
|
||||||
|
spawned_interface.IN = self.IN
|
||||||
|
spawned_interface.parent_interface = self
|
||||||
|
spawned_interface.bitrate = self.bitrate
|
||||||
|
|
||||||
|
spawned_interface.ifac_size = self.ifac_size
|
||||||
|
spawned_interface.ifac_netname = self.ifac_netname
|
||||||
|
spawned_interface.ifac_netkey = self.ifac_netkey
|
||||||
|
if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None:
|
||||||
|
ifac_origin = b""
|
||||||
|
if spawned_interface.ifac_netname != None:
|
||||||
|
ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8"))
|
||||||
|
if spawned_interface.ifac_netkey != None:
|
||||||
|
ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8"))
|
||||||
|
|
||||||
|
ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
|
||||||
|
spawned_interface.ifac_key = RNS.Cryptography.hkdf(
|
||||||
|
length=64,
|
||||||
|
derive_from=ifac_origin_hash,
|
||||||
|
salt=RNS.Reticulum.IFAC_SALT,
|
||||||
|
context=None
|
||||||
|
)
|
||||||
|
spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key)
|
||||||
|
spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key))
|
||||||
|
|
||||||
|
spawned_interface.announce_rate_target = self.announce_rate_target
|
||||||
|
spawned_interface.announce_rate_grace = self.announce_rate_grace
|
||||||
|
spawned_interface.announce_rate_penalty = self.announce_rate_penalty
|
||||||
|
spawned_interface.mode = self.mode
|
||||||
|
spawned_interface.HW_MTU = self.HW_MTU
|
||||||
|
spawned_interface.online = True
|
||||||
|
RNS.Transport.interfaces.append(spawned_interface)
|
||||||
|
if addr in self.spawned_interfaces:
|
||||||
|
self.spawned_interfaces[addr].detach()
|
||||||
|
self.spawned_interfaces[addr].teardown()
|
||||||
|
self.spawned_interfaces.pop(spawned_interface)
|
||||||
|
self.spawned_interfaces[addr] = spawned_interface
|
||||||
|
|
||||||
RNS.log(str(self)+" added peer "+str(addr)+" on "+str(ifname), RNS.LOG_DEBUG)
|
RNS.log(str(self)+" added peer "+str(addr)+" on "+str(ifname), RNS.LOG_DEBUG)
|
||||||
else:
|
else:
|
||||||
self.refresh_peer(addr)
|
self.refresh_peer(addr)
|
||||||
|
|
||||||
def refresh_peer(self, addr):
|
def refresh_peer(self, addr):
|
||||||
|
try:
|
||||||
self.peers[addr][1] = time.time()
|
self.peers[addr][1] = time.time()
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log(f"An error occurred while refreshing peer {addr} on {self}: {e}", RNS.LOG_ERROR)
|
||||||
|
|
||||||
def process_incoming(self, data):
|
def process_incoming(self, data, addr=None):
|
||||||
if self.online:
|
if self.online and addr in self.spawned_interfaces:
|
||||||
data_hash = RNS.Identity.full_hash(data)
|
self.spawned_interfaces[addr].process_incoming(data, addr)
|
||||||
deque_hit = False
|
|
||||||
if data_hash in self.mif_deque:
|
|
||||||
for te in self.mif_deque_times:
|
|
||||||
if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL:
|
|
||||||
deque_hit = True
|
|
||||||
break
|
|
||||||
|
|
||||||
if not deque_hit:
|
|
||||||
self.mif_deque.append(data_hash)
|
|
||||||
self.mif_deque_times.append([data_hash, time.time()])
|
|
||||||
self.rxb += len(data)
|
|
||||||
self.owner.inbound(data, self)
|
|
||||||
|
|
||||||
def process_outgoing(self,data):
|
def process_outgoing(self,data):
|
||||||
if self.online:
|
pass
|
||||||
for peer in self.peers:
|
|
||||||
try:
|
|
||||||
if self.outbound_udp_socket == None:
|
|
||||||
self.outbound_udp_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
|
||||||
|
|
||||||
peer_addr = str(peer)+"%"+str(self.interface_name_to_index(self.peers[peer][0]))
|
|
||||||
addr_info = socket.getaddrinfo(peer_addr, self.data_port, socket.AF_INET6, socket.SOCK_DGRAM)
|
|
||||||
self.outbound_udp_socket.sendto(data, addr_info[0][4])
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
|
|
||||||
|
|
||||||
|
|
||||||
self.txb += len(data)
|
|
||||||
|
|
||||||
|
|
||||||
# Until per-device sub-interfacing is implemented,
|
# Until per-device sub-interfacing is implemented,
|
||||||
# ingress limiting should be disabled on AutoInterface
|
# ingress limiting should be disabled on AutoInterface
|
||||||
@ -502,6 +530,83 @@ class AutoInterface(Interface):
|
|||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "AutoInterface["+self.name+"]"
|
return "AutoInterface["+self.name+"]"
|
||||||
|
|
||||||
|
class AutoInterfacePeer(Interface):
|
||||||
|
|
||||||
|
def __init__(self, owner, addr, ifname):
|
||||||
|
super().__init__()
|
||||||
|
self.owner = owner
|
||||||
|
self.parent_interface = owner
|
||||||
|
self.addr = addr
|
||||||
|
self.ifname = ifname
|
||||||
|
self.peer_addr = None
|
||||||
|
self.addr_info = None
|
||||||
|
self.HW_MTU = self.owner.HW_MTU
|
||||||
|
self.FIXED_MTU = self.owner.FIXED_MTU
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"AutoInterfacePeer[{self.ifname}/{self.addr}]"
|
||||||
|
|
||||||
|
def process_incoming(self, data, addr=None):
|
||||||
|
if self.online and self.owner.online:
|
||||||
|
data_hash = RNS.Identity.full_hash(data)
|
||||||
|
deque_hit = False
|
||||||
|
if data_hash in self.owner.mif_deque:
|
||||||
|
for te in self.owner.mif_deque_times:
|
||||||
|
if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL:
|
||||||
|
deque_hit = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not deque_hit:
|
||||||
|
self.owner.refresh_peer(self.addr)
|
||||||
|
self.owner.mif_deque.append(data_hash)
|
||||||
|
self.owner.mif_deque_times.append([data_hash, time.time()])
|
||||||
|
self.rxb += len(data)
|
||||||
|
self.owner.rxb += len(data)
|
||||||
|
self.owner.owner.inbound(data, self)
|
||||||
|
|
||||||
|
def process_outgoing(self, data):
|
||||||
|
if self.online:
|
||||||
|
with self.owner.write_lock:
|
||||||
|
try:
|
||||||
|
if self.owner.outbound_udp_socket == None: self.owner.outbound_udp_socket = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
||||||
|
if self.peer_addr == None: self.peer_addr = str(self.addr)+"%"+str(self.owner.interface_name_to_index(self.ifname))
|
||||||
|
if self.addr_info == None: addr_info = socket.getaddrinfo(self.peer_addr, self.owner.data_port, socket.AF_INET6, socket.SOCK_DGRAM)
|
||||||
|
self.owner.outbound_udp_socket.sendto(data, addr_info[0][4])
|
||||||
|
self.txb += len(data)
|
||||||
|
self.owner.txb += len(data)
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
def detach(self):
|
||||||
|
self.online = False
|
||||||
|
self.detached = True
|
||||||
|
|
||||||
|
def teardown(self):
|
||||||
|
if not self.detached:
|
||||||
|
RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down.", RNS.LOG_ERROR)
|
||||||
|
if RNS.Reticulum.panic_on_interface_error:
|
||||||
|
RNS.panic()
|
||||||
|
|
||||||
|
else:
|
||||||
|
RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE)
|
||||||
|
|
||||||
|
self.online = False
|
||||||
|
self.OUT = False
|
||||||
|
self.IN = False
|
||||||
|
|
||||||
|
if self.addr in self.owner.spawned_interfaces:
|
||||||
|
try: self.owner.spawned_interfaces.pop(self.addr)
|
||||||
|
except Exception as e:
|
||||||
|
RNS.log(f"Could not remove {self} from parent interface on detach. The contained exception was: {e}", RNS.LOG_ERROR)
|
||||||
|
|
||||||
|
if self in RNS.Transport.interfaces:
|
||||||
|
RNS.Transport.interfaces.remove(self)
|
||||||
|
|
||||||
|
# Until per-device sub-interfacing is implemented,
|
||||||
|
# ingress limiting should be disabled on AutoInterface
|
||||||
|
def should_ingress_limit(self):
|
||||||
|
return False
|
||||||
|
|
||||||
class AutoInterfaceHandler(socketserver.BaseRequestHandler):
|
class AutoInterfaceHandler(socketserver.BaseRequestHandler):
|
||||||
def __init__(self, callback, *args, **keys):
|
def __init__(self, callback, *args, **keys):
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
@ -509,4 +614,5 @@ class AutoInterfaceHandler(socketserver.BaseRequestHandler):
|
|||||||
|
|
||||||
def handle(self):
|
def handle(self):
|
||||||
data = self.request[0]
|
data = self.request[0]
|
||||||
self.callback(data)
|
addr = self.client_address[0]
|
||||||
|
self.callback(data, addr)
|
@ -182,12 +182,12 @@ class Interface:
|
|||||||
RNS.log("An error occurred while processing held announces for "+str(self), RNS.LOG_ERROR)
|
RNS.log("An error occurred while processing held announces for "+str(self), RNS.LOG_ERROR)
|
||||||
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
def received_announce(self):
|
def received_announce(self, from_spawned=False):
|
||||||
self.ia_freq_deque.append(time.time())
|
self.ia_freq_deque.append(time.time())
|
||||||
if hasattr(self, "parent_interface") and self.parent_interface != None:
|
if hasattr(self, "parent_interface") and self.parent_interface != None:
|
||||||
self.parent_interface.received_announce(from_spawned=True)
|
self.parent_interface.received_announce(from_spawned=True)
|
||||||
|
|
||||||
def sent_announce(self):
|
def sent_announce(self, from_spawned=False):
|
||||||
self.oa_freq_deque.append(time.time())
|
self.oa_freq_deque.append(time.time())
|
||||||
if hasattr(self, "parent_interface") and self.parent_interface != None:
|
if hasattr(self, "parent_interface") and self.parent_interface != None:
|
||||||
self.parent_interface.sent_announce(from_spawned=True)
|
self.parent_interface.sent_announce(from_spawned=True)
|
||||||
|
@ -250,6 +250,7 @@ def program_setup(configdir, dispall=False, verbosity=0, name_filter=None, json=
|
|||||||
if dispall or not (
|
if dispall or not (
|
||||||
name.startswith("LocalInterface[") or
|
name.startswith("LocalInterface[") or
|
||||||
name.startswith("TCPInterface[Client") or
|
name.startswith("TCPInterface[Client") or
|
||||||
|
name.startswith("AutoInterfacePeer[") or
|
||||||
name.startswith("I2PInterfacePeer[Connected peer") or
|
name.startswith("I2PInterfacePeer[Connected peer") or
|
||||||
(name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False))
|
(name.startswith("I2PInterface[") and ("i2p_connectable" in ifstat and ifstat["i2p_connectable"] == False))
|
||||||
):
|
):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user