Rate limit window change event.

Also added some scaling for pumping event loops to improve CPU usage.
Version bump for release
This commit is contained in:
Aaron Heise 2023-02-18 18:31:59 -06:00
parent b72f9bbfdf
commit 04b2660d41
No known key found for this signature in database
GPG Key ID: 6BA54088C41DE8BF
3 changed files with 47 additions and 12 deletions

View File

@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "rnsh" name = "rnsh"
version = "0.0.9" version = "0.0.10"
description = "Shell over Reticulum" description = "Shell over Reticulum"
authors = ["acehoss <acehoss@acehoss.net>"] authors = ["acehoss <acehoss@acehoss.net>"]
license = "MIT" license = "MIT"

View File

@ -1,3 +1,7 @@
import asyncio
import time
def bitwise_or_if(value: int, condition: bool, orval: int): def bitwise_or_if(value: int, condition: bool, orval: int):
if not condition: if not condition:
return value return value
@ -6,3 +10,23 @@ def bitwise_or_if(value: int, condition: bool, orval: int):
def check_and(value: int, andval: int) -> bool: def check_and(value: int, andval: int) -> bool:
return (value & andval) > 0 return (value & andval) > 0
class SleepRate:
def __init__(self, target_period: float):
self.target_period = target_period
self.last_wake = time.time()
def next_sleep_time(self) -> float:
old_last_wake = self.last_wake
self.last_wake = time.time()
next_wake = max(old_last_wake + 0.01, self.last_wake)
sleep_for = next_wake - self.last_wake
return sleep_for if sleep_for > 0 else 0
async def sleep_async(self):
await asyncio.sleep(self.next_sleep_time())
def sleep_block(self):
time.sleep(self.next_sleep_time())

View File

@ -51,6 +51,7 @@ import contextlib
import rnsh.args import rnsh.args
import pwd import pwd
import rnsh.protocol as protocol import rnsh.protocol as protocol
import rnsh.helpers as helpers
module_logger = __logging.getLogger(__name__) module_logger = __logging.getLogger(__name__)
@ -191,15 +192,19 @@ async def _listen(configdir, command, identitypath=None, service_name="default",
if announce_period is not None: if announce_period is not None:
_destination.announce() _destination.announce()
last = time.time() last_announce = time.time()
sleeper = helpers.SleepRate(0.01)
try: try:
while not await _check_finished(): while not await _check_finished():
if announce_period and 0 < announce_period < time.time() - last: if announce_period and 0 < announce_period < time.time() - last_announce:
last = time.time() last_announce = time.time()
_destination.announce() _destination.announce()
await session.ListenerSession.pump_all() if len(session.ListenerSession.sessions) > 0:
await asyncio.sleep(0.01) await session.ListenerSession.pump_all()
await sleeper.sleep_async()
else:
await asyncio.sleep(0.25)
finally: finally:
log.warning("Shutting down") log.warning("Shutting down")
await session.ListenerSession.terminate_all("Shutting down") await session.ListenerSession.terminate_all("Shutting down")
@ -416,17 +421,15 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
hpix=hpix, hpix=hpix,
vpix=vpix)) vpix=vpix))
loop.add_signal_handler(signal.SIGWINCH, sigwinch_handler) loop.add_signal_handler(signal.SIGWINCH, sigwinch_handler)
mdu = _link.MDU - 16 mdu = _link.MDU - 16
sent_eof = False sent_eof = False
last_winch = time.time()
sleeper = helpers.SleepRate(0.01)
while not await _check_finished() and state in [InitiatorState.IS_RUNNING]: while not await _check_finished() and state in [InitiatorState.IS_RUNNING]:
try: try:
try: try:
packet = _pq.get_nowait() packet = _pq.get(timeout=sleeper.next_sleep_time())
message = messenger.receive(packet) message = messenger.receive(packet)
if isinstance(message, protocol.StreamDataMessage): if isinstance(message, protocol.StreamDataMessage):
@ -468,6 +471,14 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
messenger.send(outlet, protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN, messenger.send(outlet, protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN,
stdin, eof)) stdin, eof))
sent_eof = eof sent_eof = eof
# send window change, but rate limited
if winch and time.time() - last_winch > _link.rtt * 25:
last_winch = time.time()
winch = False
with contextlib.suppress(Exception):
r, c, h, v = process.tty_get_winsize(0)
messenger.send(outlet, protocol.WindowSizeMessage(r, c, h, v))
except RemoteExecutionError as e: except RemoteExecutionError as e:
print(e.msg) print(e.msg)
return 255 return 255
@ -478,7 +489,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
return 127 return 127
# await process.event_wait_any([_new_data, _finished], timeout=min(max(rtt * 50, 5), 120)) # await process.event_wait_any([_new_data, _finished], timeout=min(max(rtt * 50, 5), 120))
await asyncio.sleep(0.01) # await sleeper.sleep_async()
log.debug("after main loop") log.debug("after main loop")
return 0 return 0