diff --git a/rnsh/rnsh.py b/rnsh/rnsh.py index 77f737d..e738334 100644 --- a/rnsh/rnsh.py +++ b/rnsh/rnsh.py @@ -201,8 +201,9 @@ async def _listen(configdir, command, identitypath=None, service_name="default", last_announce = time.time() _destination.announce() if len(session.ListenerSession.sessions) > 0: - await session.ListenerSession.pump_all() - await sleeper.sleep_async() + # no sleep if there's work to do + if not await session.ListenerSession.pump_all(): + await sleeper.sleep_async() else: await asyncio.sleep(0.25) finally: @@ -426,12 +427,13 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness sent_eof = False last_winch = time.time() sleeper = helpers.SleepRate(0.01) + processed = False while not await _check_finished() and state in [InitiatorState.IS_RUNNING]: try: try: - packet = _pq.get(timeout=sleeper.next_sleep_time()) + packet = _pq.get(timeout=sleeper.next_sleep_time() if not processed else 0.0005) message = messenger.receive(packet) - + processed = True if isinstance(message, protocol.StreamDataMessage): if message.stream_id == protocol.StreamDataMessage.STREAM_ID_STDOUT: if message.data and len(message.data) > 0: @@ -461,7 +463,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness return 200 except queue.Empty: - pass + processed = False if messenger.is_outlet_ready(outlet): stdin = data_buffer[:mdu] @@ -471,6 +473,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness messenger.send(outlet, protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDIN, stdin, eof)) sent_eof = eof + processed = True # send window change, but rate limited if winch and time.time() - last_winch > _link.rtt * 25: @@ -479,6 +482,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness with contextlib.suppress(Exception): r, c, h, v = process.tty_get_winsize(0) messenger.send(outlet, protocol.WindowSizeMessage(r, c, h, v)) + processed = True except RemoteExecutionError as e: print(e.msg) return 255 diff --git a/rnsh/session.py b/rnsh/session.py index 566d362..233fe61 100644 --- a/rnsh/session.py +++ b/rnsh/session.py @@ -187,9 +187,11 @@ class ListenerSession: self._set_state(LSState.LSSTATE_WAIT_VERS) @classmethod - async def pump_all(cls): + async def pump_all(cls) -> True: + processed_any = False for session in cls.sessions: - session.pump() + processed = session.pump() + processed_any = processed_any or processed await asyncio.sleep(0) @@ -199,13 +201,12 @@ class ListenerSession: session.terminate(reason) await asyncio.sleep(0) - def pump(self): - + def pump(self) -> bool: try: if self.state != LSState.LSSTATE_RUNNING: - return + return False elif not self.messenger.is_outlet_ready(self.outlet): - return + return False elif len(self.stderr_buf) > 0: mdu = self.outlet.mdu - 16 data = self.stderr_buf[:mdu] @@ -217,6 +218,7 @@ class ListenerSession: self.send(msg) if send_eof: self.stderr_eof_sent = True + return True elif len(self.stdout_buf) > 0: mdu = self.outlet.mdu - 16 data = self.stdout_buf[:mdu] @@ -226,6 +228,9 @@ class ListenerSession: msg = protocol.StreamDataMessage(protocol.StreamDataMessage.STREAM_ID_STDOUT, data, send_eof) self.send(msg) + if send_eof: + self.stdout_eof_sent = True + return True elif self.return_code is not None and not self.return_code_sent: msg = protocol.CommandExitedMessage(self.return_code) self.send(msg) @@ -233,8 +238,10 @@ class ListenerSession: self._call(functools.partial(self._check_protocol_timeout, lambda: self.state == LSState.LSSTATE_RUNNING, "CommandExitedMessage"), max(self.outlet.rtt * 5, 10)) + return False except Exception as ex: self._log.exception("Error during pump", ex) + return False def _terminate_process(self): with contextlib.suppress(Exception):