Initial framework for channel windowing

This commit is contained in:
Mark Qvist 2023-05-10 18:43:17 +02:00
parent 7df11a6f67
commit a4c64abed4
4 changed files with 184 additions and 56 deletions

View file

@ -223,6 +223,38 @@ class Channel(contextlib.AbstractContextManager):
``Channel`` is not instantiated directly, but rather
obtained from a ``Link`` with ``get_channel()``.
"""
# The initial window size at channel setup
WINDOW = 2
# Absolute minimum window size
WINDOW_MIN = 1
# The maximum window size for transfers on slow links
WINDOW_MAX_SLOW = 10
# The maximum window size for transfers on fast links
WINDOW_MAX_FAST = 75
# For calculating maps and guard segments, this
# must be set to the global maximum window.
WINDOW_MAX = WINDOW_MAX_FAST
# If the fast rate is sustained for this many request
# rounds, the fast link window size will be allowed.
FAST_RATE_THRESHOLD = WINDOW_MAX_SLOW - WINDOW - 2
# If the RTT rate is higher than this value,
# the max window size for fast links will be used.
# The default is 50 Kbps (the value is stored in
# bytes per second, hence the "/ 8").
RATE_FAST = (50*1000) / 8
# The minimum allowed flexibility of the window size.
# The difference between window_max and window_min
# will never be smaller than this value.
WINDOW_FLEXIBILITY = 4
def __init__(self, outlet: ChannelOutletBase):
"""
@ -234,8 +266,10 @@ class Channel(contextlib.AbstractContextManager):
self._rx_ring: collections.deque[Envelope] = collections.deque()
self._message_callbacks: [MessageCallbackType] = []
self._next_sequence = 0
self._next_rx_sequence = 0
self._message_factories: dict[int, Type[MessageBase]] = {}
self._max_tries = 5
self._max_outstanding = Channel.WINDOW
def __enter__(self) -> Channel:
return self
@ -325,21 +359,13 @@ class Channel(contextlib.AbstractContextManager):
ring.insert(i, envelope)
return True
if existing.sequence == envelope.sequence:
RNS.log(f"Envelope: Emplacement of duplicate envelope sequence.", RNS.LOG_EXTREME)
RNS.log(f"Envelope: Emplacement of duplicate envelope with sequence "+str(envelope.sequence), RNS.LOG_EXTREME)
return False
i += 1
envelope.tracked = True
ring.append(envelope)
return True
def _prune_rx_ring(self):
with self._lock:
# Implementation for fixed window = 1
stale = list(sorted(self._rx_ring, key=lambda env: env.sequence, reverse=True))[1:]
for env in stale:
env.tracked = False
self._rx_ring.remove(env)
def _run_callbacks(self, message: MessageBase):
with self._lock:
cbs = self._message_callbacks.copy()
@ -349,24 +375,61 @@ class Channel(contextlib.AbstractContextManager):
if cb(message):
return
except Exception as ex:
RNS.log(f"Channel: Error running message callback: {ex}", RNS.LOG_ERROR)
RNS.log(f"Channel "+str(self)+" experienced an error while running message callback: {ex}", RNS.LOG_ERROR)
def _receive(self, raw: bytes):
try:
envelope = Envelope(outlet=self._outlet, raw=raw)
with self._lock:
message = envelope.unpack(self._message_factories)
prev_env = self._rx_ring[0] if len(self._rx_ring) > 0 else None
if prev_env and envelope.sequence != (prev_env.sequence + 1) % 0x10000:
RNS.log("Channel: Out of order packet received", RNS.LOG_EXTREME)
# TODO: Test sequence overflow
if envelope.sequence < self._next_rx_sequence:
window_overflow = (self._next_rx_sequence+Channel.WINDOW_MAX) % 0x10000
if window_overflow < self._next_rx_sequence:
if envelope.sequence > window_overflow:
RNS.log("Invalid packet sequence ("+str(envelope.sequence)+") received on channel "+str(self), RNS.LOG_DEBUG)
return
else:
if envelope.sequence < self._next_rx_sequence:
RNS.log("Invalid packet sequence ("+str(envelope.sequence)+") received on channel "+str(self), RNS.LOG_DEBUG)
return
is_new = self._emplace_envelope(envelope, self._rx_ring)
self._prune_rx_ring()
if not is_new:
RNS.log("Channel: Duplicate message received", RNS.LOG_EXTREME)
RNS.log("Duplicate message received on channel "+str(self), RNS.LOG_EXTREME)
return
RNS.log(f"Message received: {message}", RNS.LOG_DEBUG)
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[message], daemon=True).start()
else:
# TODO: Remove
# rmsg = "RX Ring State:\n "
# for e in self._rx_ring:
# rmsg += "["+str(e.sequence)+"]"
# rmsg += "\n"
# RNS.log(rmsg)
# print(rmsg)
with self._lock:
contigous = []
for e in self._rx_ring:
if e.sequence == self._next_rx_sequence:
contigous.append(e)
self._next_rx_sequence = (self._next_rx_sequence + 1) % 0x10000
for e in contigous:
m = e.unpack(self._message_factories)
self._rx_ring.remove(e)
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start()
# TODO: Remove
# rmsg = "RX Ring State:\n "
# for e in self._rx_ring:
# rmsg += "["+str(e.sequence)+"]"
# rmsg += "\n"
# RNS.log(rmsg)
# print(rmsg)
except Exception as ex:
RNS.log(f"Channel: Error receiving data: {ex}")
@ -381,14 +444,15 @@ class Channel(contextlib.AbstractContextManager):
return False
with self._lock:
outstanding = 0
for envelope in self._tx_ring:
if envelope.outlet == self._outlet and (not envelope.packet
or self._outlet.get_packet_state(envelope.packet) == MessageState.MSGSTATE_SENT):
# TODO: Check if this should be enabled with some kind of
# rate limiting, since it currently floods log output when
# messages are waiting.
# RNS.log("Channel: Link has a pending message.", RNS.LOG_EXTREME)
return False
if envelope.outlet == self._outlet:
if not envelope.packet or not self._outlet.get_packet_state(envelope.packet) == MessageState.MSGSTATE_DELIVERED:
outstanding += 1
if outstanding >= self._max_outstanding:
return False
return True
def _packet_tx_op(self, packet: TPacket, op: Callable[[TPacket], bool]):
@ -419,6 +483,7 @@ class Channel(contextlib.AbstractContextManager):
return True
envelope.tries += 1
self._outlet.resend(envelope.packet)
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
return False
@ -449,6 +514,25 @@ class Channel(contextlib.AbstractContextManager):
envelope.tries += 1
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
# TODO: Remove
# try:
# tmsg = "TX Ring State:\n "
# for e in self._tx_ring:
# estat="u"
# status = e.packet.receipt.get_status()
# if status == RNS.PacketReceipt.SENT:
# estat="s"
# if status == RNS.PacketReceipt.DELIVERED:
# estat="d"
# if status == RNS.PacketReceipt.FAILED:
# estat="f"
# tmsg += "["+str(e.sequence)+estat+"]"
# print(tmsg)
# RNS.log(tmsg)
# except:
# pass
return envelope
@property
@ -483,7 +567,8 @@ class LinkChannelOutlet(ChannelOutletBase):
def resend(self, packet: RNS.Packet) -> RNS.Packet:
RNS.log("Resending packet " + RNS.prettyhexrep(packet.packet_hash), RNS.LOG_DEBUG)
if not packet.resend():
receipt = packet.resend()
if not receipt:
RNS.log("Failed to resend packet", RNS.LOG_ERROR)
return packet
@ -538,4 +623,7 @@ class LinkChannelOutlet(ChannelOutletBase):
packet.receipt.set_delivery_callback(inner if callback else None)
def get_packet_id(self, packet: RNS.Packet) -> any:
return packet.get_hash()
if packet and hasattr(packet, "get_hash") and callable(packet.get_hash):
return packet.get_hash()
else:
return None