Lots more stuff, still more debugging to do

This commit is contained in:
Aaron Heise 2023-02-10 03:08:58 -06:00
parent fcc73ba31a
commit 3183923c8c
5 changed files with 515 additions and 442 deletions

1
rnsh/__version.py Normal file
View file

@ -0,0 +1 @@
__version__ = "0.0.1"

View file

@ -1,4 +1,5 @@
import asyncio import asyncio
import contextlib
import errno import errno
import functools import functools
import signal import signal
@ -144,6 +145,20 @@ class TtyRestorer:
""" """
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._tattr) termios.tcsetattr(self._fd, termios.TCSADRAIN, self._tattr)
async def event_wait(evt: asyncio.Event, timeout: float) -> bool:
"""
Wait for event to be set, or timeout to expire.
:param evt: asyncio.Event to wait on
:param timeout: maximum number of seconds to wait.
:return: True if event was set, False if timeout expired
"""
# suppress TimeoutError because we'll return False in case of timeout
with contextlib.suppress(asyncio.TimeoutError):
await asyncio.wait_for(evt.wait(), timeout)
return evt.is_set()
class CallbackSubprocess: class CallbackSubprocess:
# time between checks of child process # time between checks of child process
PROCESS_POLL_TIME: float = 0.1 PROCESS_POLL_TIME: float = 0.1
@ -309,6 +324,31 @@ class CallbackSubprocess:
def return_code(self) -> int | None: def return_code(self) -> int | None:
return self._return_code return self._return_code
# # from https://gist.github.com/bruce-shi/fd0e3f5e2360c64bc9ce2efb254744f7
# from collections import defaultdict
# class disable_signals(object):
# def __init__(self, disabled_signals=None):
# self.stashed_signals = defaultdict(list)
# self.disabled_signals = disabled_signals or []
#
# def __enter__(self):
# for signal in self.disabled_signals:
# self.disconnect(signal)
#
# def __exit__(self, exc_type, exc_val, exc_tb):
# for signal in list(self.stashed_signals):
# self.reconnect(signal)
#
# def disconnect(self, signal):
# self.stashed_signals[signal] = signal.receivers
# signal.receivers = []
#
# def reconnect(self, signal):
# signal.receivers = self.stashed_signals.get(signal, [])
# del self.stashed_signals[signal]
# signal.sender_receivers_cache.clear()
async def main(): async def main():
""" """
A test driver for the CallbackProcess class. A test driver for the CallbackProcess class.

View file

@ -2,11 +2,15 @@ import asyncio
import threading import threading
import time import time
import logging as __logging import logging as __logging
from typing import Callable
module_logger = __logging.getLogger(__name__) module_logger = __logging.getLogger(__name__)
class RetryStatus: class RetryStatus:
def __init__(self, id: any, try_limit: int, wait_delay: float, retry_callback: callable[any, int], timeout_callback: callable[any], tries: int = 1): def __init__(self, tag: any, try_limit: int, wait_delay: float, retry_callback: Callable[[any, int], any],
self.id = id timeout_callback: Callable[[any, int], None], tries: int = 1):
self.tag = tag
self.try_limit = try_limit self.try_limit = try_limit
self.tries = tries self.tries = tries
self.wait_delay = wait_delay self.wait_delay = wait_delay
@ -25,22 +29,23 @@ class RetryStatus:
def timeout(self): def timeout(self):
self.completed = True self.completed = True
self.timeout_callback(self.id) self.timeout_callback(self.tag, self.tries)
def retry(self): def retry(self):
self.tries += 1 self.tries += 1
self.retry_callback(self.id, self.tries) self.retry_callback(self.tag, self.tries)
class RetryThread: class RetryThread:
def __init__(self, loop_period: float = 0.25): def __init__(self, loop_period: float = 0.25):
self._log = module_logger.getChild(self.__class__.__name__) self._log = module_logger.getChild(self.__class__.__name__)
self._loop_period = loop_period self._loop_period = loop_period
self._statuses: list[RetryStatus] = [] self._statuses: list[RetryStatus] = []
self._id_counter = 0 self._tag_counter = 0
self._lock = threading.RLock() self._lock = threading.RLock()
self._thread = threading.Thread(target=self._thread_run())
self._run = True self._run = True
self._finished: asyncio.Future | None = None self._finished: asyncio.Future | None = None
self._thread = threading.Thread(target=self._thread_run)
self._thread.start() self._thread.start()
def close(self, loop: asyncio.AbstractEventLoop | None = None) -> asyncio.Future | None: def close(self, loop: asyncio.AbstractEventLoop | None = None) -> asyncio.Future | None:
@ -52,6 +57,7 @@ class RetryThread:
else: else:
self._finished = loop.create_future() self._finished = loop.create_future()
return self._finished return self._finished
def _thread_run(self): def _thread_run(self):
last_run = time.monotonic() last_run = time.monotonic()
while self._run and self._finished is None: while self._run and self._finished is None:
@ -65,52 +71,59 @@ class RetryThread:
try: try:
if not retry.completed: if not retry.completed:
if retry.timed_out: if retry.timed_out:
self._log.debug(f"timed out {retry.id} after {retry.try_limit} tries") self._log.debug(f"timed out {retry.tag} after {retry.try_limit} tries")
retry.timeout() retry.timeout()
prune.append(retry) prune.append(retry)
else: else:
self._log.debug(f"retrying {retry.id}, try {retry.tries + 1}/{retry.try_limit}") self._log.debug(f"retrying {retry.tag}, try {retry.tries + 1}/{retry.try_limit}")
retry.retry() retry.retry()
except Exception as e: except Exception as e:
self._log.error(f"error processing retry id {retry.id}: {e}") self._log.error(f"error processing retry id {retry.tag}: {e}")
prune.append(retry) prune.append(retry)
with self._lock: with self._lock:
for retry in prune: for retry in prune:
self._log.debug(f"pruned retry {retry.id}, retry count {retry.tries}/{retry.try_limit}") self._log.debug(f"pruned retry {retry.tag}, retry count {retry.tries}/{retry.try_limit}")
self._statuses.remove(retry) self._statuses.remove(retry)
if self._finished is not None: if self._finished is not None:
self._finished.set_result(None) self._finished.set_result(None)
def _get_id(self): def _get_next_tag(self):
self._id_counter += 1 self._tag_counter += 1
return self._id_counter return self._tag_counter
def begin(self, try_limit: int, wait_delay: float, try_callback: callable[[any | None, int], any], timeout_callback: callable[any, int], id: int | None = None) -> any: def begin(self, try_limit: int, wait_delay: float, try_callback: Callable[[any, int], any],
timeout_callback: Callable[[any, int], None], tag: int | None = None) -> any:
self._log.debug(f"running first try") self._log.debug(f"running first try")
id = try_callback(id, 1) tag = try_callback(tag, 1)
self._log.debug(f"first try success, got id {id}") self._log.debug(f"first try success, got id {tag}")
with self._lock: with self._lock:
if id is None: if tag is None:
id = self._get_id() tag = self._get_next_tag()
self._statuses.append(RetryStatus(id=id, self._statuses.append(RetryStatus(tag=tag,
tries=1, tries=1,
try_limit=try_limit, try_limit=try_limit,
wait_delay=wait_delay, wait_delay=wait_delay,
retry_callback=try_callback, retry_callback=try_callback,
timeout_callback=timeout_callback)) timeout_callback=timeout_callback))
self._log.debug(f"added retry timer for {id}") self._log.debug(f"added retry timer for {tag}")
def complete(self, id: any):
assert id is not None def complete(self, tag: any):
assert tag is not None
status: RetryStatus | None = None
with self._lock: with self._lock:
status = next(filter(lambda l: l.id == id, self._statuses)) status = next(filter(lambda l: l.tag == tag, self._statuses))
assert status is not None if status is not None:
status.completed = True status.completed = True
self._statuses.remove(status) self._statuses.remove(status)
self._log.debug(f"completed {id}") if status is not None:
self._log.debug(f"completed {tag}")
else:
self._log.debug(f"status not found to complete {tag}")
def complete_all(self): def complete_all(self):
with self._lock: with self._lock:
for status in self._statuses: for status in self._statuses:
status.completed = True status.completed = True
self._log.debug(f"completed {status.id}") self._log.debug(f"completed {status.tag}")
self._statuses.clear() self._statuses.clear()

File diff suppressed because it is too large Load diff

View file

@ -2,8 +2,11 @@ import logging
from logging import Handler, getLevelName from logging import Handler, getLevelName
from types import GenericAlias from types import GenericAlias
import os import os
import tty
import termios
import sys
import RNS import RNS
import json
class RnsHandler(Handler): class RnsHandler(Handler):
""" """
@ -37,7 +40,12 @@ class RnsHandler(Handler):
""" """
try: try:
msg = self.format(record) msg = self.format(record)
# tattr = termios.tcgetattr(sys.stdin.fileno())
# json.dump(tattr, sys.stdout)
# termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr | termios.ONLRET | termios.ONLCR | termios.OPOST)
RNS.log(msg, RnsHandler.get_rns_loglevel(record.levelno)) RNS.log(msg, RnsHandler.get_rns_loglevel(record.levelno))
# termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr)
except RecursionError: # See issue 36272 except RecursionError: # See issue 36272
raise raise
except Exception: except Exception:
@ -49,7 +57,6 @@ class RnsHandler(Handler):
__class_getitem__ = classmethod(GenericAlias) __class_getitem__ = classmethod(GenericAlias)
log_format = '%(name)-40s %(message)s [%(threadName)s]' log_format = '%(name)-40s %(message)s [%(threadName)s]'
logging.basicConfig( logging.basicConfig(
@ -58,3 +65,21 @@ logging.basicConfig(
format=log_format, format=log_format,
datefmt='%Y-%m-%d %H:%M:%S', datefmt='%Y-%m-%d %H:%M:%S',
handlers=[RnsHandler()]) handlers=[RnsHandler()])
#hack for temporarily overriding term settings to make debug print right
_rns_log_orig = RNS.log
def _rns_log(msg, level=3, _override_destination = False):
tattr = termios.tcgetattr(sys.stdin.fileno())
tattr_orig = tattr.copy()
# tcflag_t c_iflag; /* input modes */
# tcflag_t c_oflag; /* output modes */
# tcflag_t c_cflag; /* control modes */
# tcflag_t c_lflag; /* local modes */
# cc_t c_cc[NCCS]; /* special characters */
tattr[1] = tattr[1] | termios.ONLRET | termios.ONLCR | termios.OPOST
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr)
_rns_log_orig(msg, level, _override_destination)
termios.tcsetattr(sys.stdin.fileno(), termios.TCSANOW, tattr_orig)
RNS.log = _rns_log