From 73faf04ea1b50f890b0fa38988e5381cab5c7487 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 10 May 2023 20:01:33 +0200 Subject: [PATCH] Tuned channel windowing --- RNS/Channel.py | 69 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/RNS/Channel.py b/RNS/Channel.py index db7cb91..c2c1f46 100644 --- a/RNS/Channel.py +++ b/RNS/Channel.py @@ -242,7 +242,7 @@ class Channel(contextlib.AbstractContextManager): # If the fast rate is sustained for this many request # rounds, the fast link window size will be allowed. - FAST_RATE_THRESHOLD = 5 + FAST_RATE_THRESHOLD = 10 # If the RTT rate is higher than this value, # the max window size for fast links will be used. @@ -412,37 +412,21 @@ class Channel(contextlib.AbstractContextManager): contigous.append(e) self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS # TODO: Remove + if self._next_rx_sequence > Channel.SEQ_MODULUS - 100: + RNS.log("RX SEQ "+str(self._next_rx_sequence)) + print("RX SEQ "+str(self._next_rx_sequence)) + if self._next_rx_sequence == 0: - RNS.log("SEQ OVERFLOW") + RNS.log("RX SEQ OVERFLOW") + RNS.log("RX SEQ OVERFLOW") + print("RX SEQ OVERFLOW") + print("RX SEQ OVERFLOW") for e in contigous: m = e.unpack(self._message_factories) self._rx_ring.remove(e) threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start() - if self.window < self.window_max: - self.window += 1 - if (self.window - self.window_min) > (self.window_flexibility-1): - self.window_min += 1 - - # TODO: Remove - RNS.log("Increased channel window to "+str(self.window), RNS.LOG_DEBUG) - - if self._outlet.rtt != 0: - # TODO: Remove - # RNS.log("Link RTT is "+str(self._outlet.rtt), RNS.LOG_DEBUG) - if self._outlet.rtt > Channel.RTT_FAST: - self.fast_rate_rounds = 0 - - else: - self.fast_rate_rounds += 1 - - if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD: - self.window_max = Channel.WINDOW_MAX_FAST - - # TODO: Remove - RNS.log("Increased max channel window to "+str(self.window_max), RNS.LOG_DEBUG) - except Exception as ex: RNS.log(f"Channel: Error receiving data: {ex}") @@ -476,6 +460,30 @@ class Channel(contextlib.AbstractContextManager): envelope.tracked = False if envelope in self._tx_ring: self._tx_ring.remove(envelope) + + if self.window < self.window_max: + self.window += 1 + if (self.window - self.window_min) > (self.window_flexibility-1): + self.window_min += 1 + + # TODO: Remove + RNS.log("Increased channel window to "+str(self.window), RNS.LOG_DEBUG) + + if self._outlet.rtt != 0: + # TODO: Remove + # RNS.log("Link RTT is "+str(self._outlet.rtt), RNS.LOG_DEBUG) + if self._outlet.rtt > Channel.RTT_FAST: + self.fast_rate_rounds = 0 + + else: + self.fast_rate_rounds += 1 + + if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD: + self.window_max = Channel.WINDOW_MAX_FAST + + # TODO: Remove + RNS.log("Increased max channel window to "+str(self.window_max), RNS.LOG_DEBUG) + else: RNS.log("Channel: Envelope not found in TX ring", RNS.LOG_DEBUG) if not envelope: @@ -527,6 +535,17 @@ class Channel(contextlib.AbstractContextManager): raise ChannelException(CEType.ME_LINK_NOT_READY, f"Link is not ready") envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence) self._next_sequence = (self._next_sequence + 1) % Channel.SEQ_MODULUS + # TODO: Remove + if self._next_sequence > Channel.SEQ_MODULUS - 100: + RNS.log("RX SEQ "+str(self._next_rx_sequence)) + print("RX SEQ "+str(self._next_rx_sequence)) + + if self._next_sequence == 0: + RNS.log("TX SEQ OVERFLOW") + RNS.log("TX SEQ OVERFLOW") + print("TX SEQ OVERFLOW") + print("TX SEQ OVERFLOW") + self._emplace_envelope(envelope, self._tx_ring) if envelope is None: raise BlockingIOError()