Few more bugs addressed

This commit is contained in:
Aaron Heise 2023-02-10 03:55:00 -06:00
parent 3183923c8c
commit a8095b6270
4 changed files with 41 additions and 72 deletions

View file

@ -65,10 +65,6 @@ _destination: RNS.Destination | None = None
_pool: ThreadPool = ThreadPool(10)
async def _pump_int(timeout: float = 0):
if timeout == 0:
if _finished.done():
raise _finished.exception()
return
try:
await asyncio.wait_for(_finished, timeout=timeout)
except asyncio.exceptions.CancelledError:
@ -76,9 +72,11 @@ async def _pump_int(timeout: float = 0):
except TimeoutError:
pass
def _handle_sigint_with_async_int(signal, frame):
global _finished
if _finished is not None:
_finished.set_exception(KeyboardInterrupt())
_finished.get_loop().call_soon_threadsafe(_finished.set_exception, KeyboardInterrupt())
else:
raise KeyboardInterrupt()
@ -168,11 +166,11 @@ async def _listen(configdir, command, identitypath=None, service_name="default",
if not disable_announce:
_destination.announce()
last = time.monotonic()
last = time.time()
try:
while True:
if not disable_announce and time.monotonic() - last > 900: # TODO: make parameter
if not disable_announce and time.time() - last > 900: # TODO: make parameter
last = datetime.datetime.now()
_destination.announce()
await _pump_int(1.0)
@ -475,21 +473,15 @@ def _listen_request(path, data, request_id, link_id, remote_identity, requested_
async def _spin(until: Callable | None = None, timeout: float | None = None) -> bool:
global _pool
finished = asyncio.get_running_loop().create_future()
if timeout is not None:
timeout += time.time()
def inner_spin():
while (timeout is None or time.time() < timeout) and not until():
if _finished.exception():
finished.set_exception(_finished.exception())
break
time.sleep(0.005)
if timeout is not None and time.time() > timeout:
finished.set_result(False)
else:
finished.set_result(True)
_pool.apply_async(inner_spin)
return await finished
while (timeout is None or time.time() < timeout) and not until():
await _pump_int(0.01)
if timeout is not None and time.time() > timeout:
return False
else:
return True
_link: RNS.Link | None = None
_remote_exec_grace = 2.0
@ -508,7 +500,7 @@ def _client_packet_handler(message, packet):
async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid=False, destination=None,
service_name="default", stdin=None, timeout=RNS.Transport.PATH_REQUEST_TIMEOUT):
global _identity, _reticulum, _link, _destination, _remote_exec_grace
global _identity, _reticulum, _link, _destination, _remote_exec_grace, _tr
log = _get_logger("_execute")
dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH // 8) * 2
@ -530,6 +522,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid=
if not RNS.Transport.has_path(destination_hash):
RNS.Transport.request_path(destination_hash)
log.info(f"Requesting path...")
if not await _spin(until=lambda: RNS.Transport.has_path(destination_hash), timeout=timeout):
raise Exception("Path not found")
@ -547,6 +540,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid=
_link = RNS.Link(_destination)
_link.did_identify = False
log.info(f"Establishing link...")
if not await _spin(until=lambda: _link.status == RNS.Link.ACTIVE, timeout=timeout):
raise Exception("Could not establish link with " + RNS.prettyhexrep(destination_hash))
@ -603,6 +597,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid=
except Exception as e:
raise Exception(f"Received invalid response") from e
_tr.raw()
if stdout is not None:
stdout = base64.b64decode(stdout)
# log.debug(f"stdout: {stdout}")
@ -620,6 +615,7 @@ async def _execute(configdir, identitypath=None, verbosity=0, quietness=0, noid=
async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness: int, noid: bool, destination: str,
service_name: str, timeout: float):
global _new_data, _finished, _tr
log = _get_logger("_initiate")
loop = asyncio.get_event_loop()
_new_data = asyncio.Event()
@ -634,7 +630,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
data_buffer = bytearray()
def sigint_handler(signal, frame):
# log.debug("KeyboardInterrupt")
log.debug("KeyboardInterrupt")
data_buffer.extend("\x03".encode("utf-8"))
def sigwinch_handler(signal, frame):
@ -642,7 +638,6 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
if _new_data is not None:
_new_data.set()
_tr.raw()
def stdin():
data = process.tty_read(sys.stdin.fileno())
# log.debug(f"stdin {data}")
@ -653,7 +648,6 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
await _pump_int()
signal.signal(signal.SIGWINCH, sigwinch_handler)
signal.signal(signal.SIGINT, sigint_handler)
while True:
stdin = data_buffer.copy()
data_buffer.clear()
@ -670,6 +664,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
stdin=stdin,
timeout=timeout,
)
signal.signal(signal.SIGINT, sigint_handler)
if return_code is not None:
_link.teardown()
return return_code
@ -743,7 +738,7 @@ Options:
args_program_args = args.get("<arg>", None) or []
args_no_id = args.get("--no-id", None) or False
args_mirror = args.get("--mirror", None) or False
args_timeout = args.get("--timeout", None)
args_timeout = args.get("--timeout", None) or RNS.Transport.PATH_REQUEST_TIMEOUT
args_destination = args.get("<destination_hash>", None)
args_help = args.get("--help", None) or False