mirror of
https://github.com/markqvist/rnsh.git
synced 2025-06-19 11:34:13 -04:00
Event loop bursting
Pump event loops dry before sleeping. This has a noticeable positive effect on throughput.
This commit is contained in:
parent
ca836a60b2
commit
b41bcbbc44
2 changed files with 22 additions and 11 deletions
14
rnsh/rnsh.py
14
rnsh/rnsh.py
|
@ -201,8 +201,9 @@ async def _listen(configdir, command, identitypath=None, service_name="default",
|
||||||
last_announce = time.time()
|
last_announce = time.time()
|
||||||
_destination.announce()
|
_destination.announce()
|
||||||
if len(session.ListenerSession.sessions) > 0:
|
if len(session.ListenerSession.sessions) > 0:
|
||||||
await session.ListenerSession.pump_all()
|
# no sleep if there's work to do
|
||||||
await sleeper.sleep_async()
|
if not await session.ListenerSession.pump_all():
|
||||||
|
await sleeper.sleep_async()
|
||||||
else:
|
else:
|
||||||
await asyncio.sleep(0.25)
|
await asyncio.sleep(0.25)
|
||||||
finally:
|
finally:
|
||||||
|
@ -426,12 +427,13 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
|
||||||
sent_eof = False
|
sent_eof = False
|
||||||
last_winch = time.time()
|
last_winch = time.time()
|
||||||
sleeper = helpers.SleepRate(0.01)
|
sleeper = helpers.SleepRate(0.01)
|
||||||
|
processed = False
|
||||||
while not await _check_finished() and state in [InitiatorState.IS_RUNNING]:
|
while not await _check_finished() and state in [InitiatorState.IS_RUNNING]:
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
packet = _pq.get(timeout=sleeper.next_sleep_time())
|
packet = _pq.get(timeout=sleeper.next_sleep_time() if not processed else 0.0005)
|
||||||
message = messenger.receive(packet)
|
message = messenger.receive(packet)
|
||||||
|
processed = True
|
||||||
if isinstance(message, protocol.StreamDataMessage):
|
if isinstance(message, protocol.StreamDataMessage):
|
||||||
if message.stream_id == protocol.StreamDataMessage.STREAM_ID_STDOUT:
|
if message.stream_id == protocol.StreamDataMessage.STREAM_ID_STDOUT:
|
||||||
if message.data and len(message.data) > 0:
|
if message.data and len(message.data) > 0:
|
||||||
|
@ -461,7 +463,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
|
||||||
return 200
|
return 200
|
||||||
|
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
pass
|
processed = False
|
||||||
|
|
||||||
if messenger.is_outlet_ready(outlet):
|
if messenger.is_outlet_ready(outlet):
|
||||||
stdin = data_buffer[:mdu]
|
stdin = data_buffer[:mdu]
|
||||||
|
@ -471,6 +473,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
|
||||||
messenger.send(outlet, protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN,
|
messenger.send(outlet, protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN,
|
||||||
stdin, eof))
|
stdin, eof))
|
||||||
sent_eof = eof
|
sent_eof = eof
|
||||||
|
processed = True
|
||||||
|
|
||||||
# send window change, but rate limited
|
# send window change, but rate limited
|
||||||
if winch and time.time() - last_winch > _link.rtt * 25:
|
if winch and time.time() - last_winch > _link.rtt * 25:
|
||||||
|
@ -479,6 +482,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
r, c, h, v = process.tty_get_winsize(0)
|
r, c, h, v = process.tty_get_winsize(0)
|
||||||
messenger.send(outlet, protocol.WindowSizeMessage(r, c, h, v))
|
messenger.send(outlet, protocol.WindowSizeMessage(r, c, h, v))
|
||||||
|
processed = True
|
||||||
except RemoteExecutionError as e:
|
except RemoteExecutionError as e:
|
||||||
print(e.msg)
|
print(e.msg)
|
||||||
return 255
|
return 255
|
||||||
|
|
|
@ -187,9 +187,11 @@ class ListenerSession:
|
||||||
self._set_state(LSState.LSSTATE_WAIT_VERS)
|
self._set_state(LSState.LSSTATE_WAIT_VERS)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def pump_all(cls):
|
async def pump_all(cls) -> True:
|
||||||
|
processed_any = False
|
||||||
for session in cls.sessions:
|
for session in cls.sessions:
|
||||||
session.pump()
|
processed = session.pump()
|
||||||
|
processed_any = processed_any or processed
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
|
||||||
|
@ -199,13 +201,12 @@ class ListenerSession:
|
||||||
session.terminate(reason)
|
session.terminate(reason)
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
def pump(self):
|
def pump(self) -> bool:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if self.state != LSState.LSSTATE_RUNNING:
|
if self.state != LSState.LSSTATE_RUNNING:
|
||||||
return
|
return False
|
||||||
elif not self.messenger.is_outlet_ready(self.outlet):
|
elif not self.messenger.is_outlet_ready(self.outlet):
|
||||||
return
|
return False
|
||||||
elif len(self.stderr_buf) > 0:
|
elif len(self.stderr_buf) > 0:
|
||||||
mdu = self.outlet.mdu - 16
|
mdu = self.outlet.mdu - 16
|
||||||
data = self.stderr_buf[:mdu]
|
data = self.stderr_buf[:mdu]
|
||||||
|
@ -217,6 +218,7 @@ class ListenerSession:
|
||||||
self.send(msg)
|
self.send(msg)
|
||||||
if send_eof:
|
if send_eof:
|
||||||
self.stderr_eof_sent = True
|
self.stderr_eof_sent = True
|
||||||
|
return True
|
||||||
elif len(self.stdout_buf) > 0:
|
elif len(self.stdout_buf) > 0:
|
||||||
mdu = self.outlet.mdu - 16
|
mdu = self.outlet.mdu - 16
|
||||||
data = self.stdout_buf[:mdu]
|
data = self.stdout_buf[:mdu]
|
||||||
|
@ -226,6 +228,9 @@ class ListenerSession:
|
||||||
msg = protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDOUT,
|
msg = protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDOUT,
|
||||||
data, send_eof)
|
data, send_eof)
|
||||||
self.send(msg)
|
self.send(msg)
|
||||||
|
if send_eof:
|
||||||
|
self.stdout_eof_sent = True
|
||||||
|
return True
|
||||||
elif self.return_code is not None and not self.return_code_sent:
|
elif self.return_code is not None and not self.return_code_sent:
|
||||||
msg = protocol.CommandExitedMessage(self.return_code)
|
msg = protocol.CommandExitedMessage(self.return_code)
|
||||||
self.send(msg)
|
self.send(msg)
|
||||||
|
@ -233,8 +238,10 @@ class ListenerSession:
|
||||||
self._call(functools.partial(self._check_protocol_timeout,
|
self._call(functools.partial(self._check_protocol_timeout,
|
||||||
lambda: self.state == LSState.LSSTATE_RUNNING, "CommandExitedMessage"),
|
lambda: self.state == LSState.LSSTATE_RUNNING, "CommandExitedMessage"),
|
||||||
max(self.outlet.rtt * 5, 10))
|
max(self.outlet.rtt * 5, 10))
|
||||||
|
return False
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
self._log.exception("Error during pump", ex)
|
self._log.exception("Error during pump", ex)
|
||||||
|
return False
|
||||||
|
|
||||||
def _terminate_process(self):
|
def _terminate_process(self):
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue