diff --git a/rnsh/process.py b/rnsh/process.py index 628f9f6..21c6093 100644 --- a/rnsh/process.py +++ b/rnsh/process.py @@ -324,30 +324,6 @@ class CallbackSubprocess: def return_code(self) -> int | None: return self._return_code -# # from https://gist.github.com/bruce-shi/fd0e3f5e2360c64bc9ce2efb254744f7 -# from collections import defaultdict -# class disable_signals(object): -# def __init__(self, disabled_signals=None): -# self.stashed_signals = defaultdict(list) -# self.disabled_signals = disabled_signals or [] -# -# def __enter__(self): -# for signal in self.disabled_signals: -# self.disconnect(signal) -# -# def __exit__(self, exc_type, exc_val, exc_tb): -# for signal in list(self.stashed_signals): -# self.reconnect(signal) -# -# def disconnect(self, signal): -# self.stashed_signals[signal] = signal.receivers -# signal.receivers = [] -# -# def reconnect(self, signal): -# signal.receivers = self.stashed_signals.get(signal, []) -# del self.stashed_signals[signal] -# signal.sender_receivers_cache.clear() - async def main(): """ diff --git a/rnsh/retry.py b/rnsh/retry.py index 759acc0..0d0066a 100644 --- a/rnsh/retry.py +++ b/rnsh/retry.py @@ -16,12 +16,12 @@ class RetryStatus: self.wait_delay = wait_delay self.retry_callback = retry_callback self.timeout_callback = timeout_callback - self.try_time = time.monotonic() + self.try_time = time.time() self.completed = False @property def ready(self): - return self.try_time + self.wait_delay < time.monotonic() and not self.completed + return self.try_time + self.wait_delay < time.time() and not self.completed @property def timed_out(self): @@ -59,10 +59,8 @@ class RetryThread: return self._finished def _thread_run(self): - last_run = time.monotonic() while self._run and self._finished is None: - time.sleep(last_run + self._loop_period - time.monotonic()) - last_run = time.monotonic() + time.sleep(self._loop_period) ready: list[RetryStatus] = [] prune: list[RetryStatus] = [] with self._lock: diff --git a/rnsh/rnsh.py b/rnsh/rnsh.py index 5d35cc9..4331c1e 100644 --- a/rnsh/rnsh.py +++ b/rnsh/rnsh.py @@ -65,10 +65,6 @@ _destination: RNS.Destination | None = None _pool: ThreadPool = ThreadPool(10) async def _pump_int(timeout: float = 0): - if timeout == 0: - if _finished.done(): - raise _finished.exception() - return try: await asyncio.wait_for(_finished, timeout=timeout) except asyncio.exceptions.CancelledError: @@ -76,9 +72,11 @@ async def _pump_int(timeout: float = 0): except TimeoutError: pass + def _handle_sigint_with_async_int(signal, frame): + global _finished if _finished is not None: - _finished.set_exception(KeyboardInterrupt()) + _finished.get_loop().call_soon_threadsafe(_finished.set_exception, KeyboardInterrupt()) else: raise KeyboardInterrupt() @@ -168,11 +166,11 @@ async def _listen(configdir, command, identitypath=None, service_name="default", if not disable_announce: _destination.announce() - last = time.monotonic() + last = time.time() try: while True: - if not disable_announce and time.monotonic() - last > 900: # TODO: make parameter + if not disable_announce and time.time() - last > 900: # TODO: make parameter last = datetime.datetime.now() _destination.announce() await _pump_int(1.0) @@ -475,21 +473,15 @@ def _listen_request(path, data, request_id, link_id, remote_identity, requested_ async def _spin(until: Callable | None = None, timeout: float | None = None) -> bool: global _pool - finished = asyncio.get_running_loop().create_future() + if timeout is not None: + timeout += time.time() - def inner_spin(): - while (timeout is None or time.time() < timeout) and not until(): - if _finished.exception(): - finished.set_exception(_finished.exception()) - break - time.sleep(0.005) - if timeout is not None and time.time() > timeout: - finished.set_result(False) - else: - finished.set_result(True) - - _pool.apply_async(inner_spin) - return await finished + while (timeout is None or time.time() < timeout) and not until(): + await _pump_int(0.01) + if timeout is not None and time.time() > timeout: + return False + else: + return True _link: RNS.Link | None = None _remote_exec_grace = 2.0 @@ -508,7 +500,7 @@ def _client_packet_handler(message, packet): async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid=False, destination=None, service_name="default", stdin=None, timeout=RNS.Transport.PATH_REQUEST_TIMEOUT): - global _identity, _reticulum, _link, _destination, _remote_exec_grace + global _identity, _reticulum, _link, _destination, _remote_exec_grace, _tr log = _get_logger("_execute") dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH // 8) * 2 @@ -530,6 +522,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid= if not RNS.Transport.has_path(destination_hash): RNS.Transport.request_path(destination_hash) + log.info(f"Requesting path...") if not await _spin(until=lambda: RNS.Transport.has_path(destination_hash), timeout=timeout): raise Exception("Path not found") @@ -547,6 +540,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid= _link = RNS.Link(_destination) _link.did_identify = False + log.info(f"Establishing link...") if not await _spin(until=lambda: _link.status == RNS.Link.ACTIVE, timeout=timeout): raise Exception("Could not establish link with " + RNS.prettyhexrep(destination_hash)) @@ -603,6 +597,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid= except Exception as e: raise Exception(f"Received invalid response") from e + _tr.raw() if stdout is not None: stdout = base64.b64decode(stdout) # log.debug(f"stdout: {stdout}") @@ -620,6 +615,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid= async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness: int, noid: bool, destination: str, service_name: str, timeout: float): global _new_data, _finished, _tr + log = _get_logger("_initiate") loop = asyncio.get_event_loop() _new_data = asyncio.Event() @@ -634,7 +630,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness data_buffer = bytearray() def sigint_handler(signal, frame): - # log.debug("KeyboardInterrupt") + log.debug("KeyboardInterrupt") data_buffer.extend("\x03".encode("utf-8")) def sigwinch_handler(signal, frame): @@ -642,7 +638,6 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness if _new_data is not None: _new_data.set() - _tr.raw() def stdin(): data = process.tty_read(sys.stdin.fileno()) # log.debug(f"stdin {data}") @@ -653,7 +648,6 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness await _pump_int() signal.signal(signal.SIGWINCH, sigwinch_handler) - signal.signal(signal.SIGINT, sigint_handler) while True: stdin = data_buffer.copy() data_buffer.clear() @@ -670,6 +664,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness stdin=stdin, timeout=timeout, ) + signal.signal(signal.SIGINT, sigint_handler) if return_code is not None: _link.teardown() return return_code @@ -743,7 +738,7 @@ Options: args_program_args = args.get("", None) or [] args_no_id = args.get("--no-id", None) or False args_mirror = args.get("--mirror", None) or False - args_timeout = args.get("--timeout", None) + args_timeout = args.get("--timeout", None) or RNS.Transport.PATH_REQUEST_TIMEOUT args_destination = args.get("", None) args_help = args.get("--help", None) or False diff --git a/rnsh/rnslogging.py b/rnsh/rnslogging.py index e7a35fb..c6a52a0 100644 --- a/rnsh/rnslogging.py +++ b/rnsh/rnslogging.py @@ -66,20 +66,20 @@ logging.basicConfig( datefmt='%Y-%m-%d %H:%M:%S', handlers=[RnsHandler()]) -#hack for temporarily overriding term settings to make debug print right -_rns_log_orig = RNS.log - -def _rns_log(msg, level=3, _override_destination = False): - tattr = termios.tcgetattr(sys.stdin.fileno()) - tattr_orig = tattr.copy() - # tcflag_t c_iflag; /* input modes */ - # tcflag_t c_oflag; /* output modes */ - # tcflag_t c_cflag; /* control modes */ - # tcflag_t c_lflag; /* local modes */ - # cc_t c_cc[NCCS]; /* special characters */ - tattr[1] = tattr[1] | termios.ONLRET | termios.ONLCR | termios.OPOST - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr) - _rns_log_orig(msg, level, _override_destination) - termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr_orig) - -RNS.log = _rns_log \ No newline at end of file +# #hack for temporarily overriding term settings to make debug print right +# _rns_log_orig = RNS.log +# +# def _rns_log(msg, level=3, _override_destination = False): +# tattr = termios.tcgetattr(sys.stdin.fileno()) +# tattr_orig = tattr.copy() +# # tcflag_t c_iflag; /* input modes */ +# # tcflag_t c_oflag; /* output modes */ +# # tcflag_t c_cflag; /* control modes */ +# # tcflag_t c_lflag; /* local modes */ +# # cc_t c_cc[NCCS]; /* special characters */ +# tattr[1] = tattr[1] | termios.ONLRET | termios.ONLCR | termios.OPOST +# termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr) +# _rns_log_orig(msg, level, _override_destination) +# termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr_orig) +# +# RNS.log = _rns_log \ No newline at end of file