First functional build.

This commit is contained in:
Aaron Heise 2023-02-10 18:11:48 -06:00
parent 2239f69953
commit ced3ee2f98
3 changed files with 51 additions and 23 deletions

View File

@ -2,6 +2,7 @@ import asyncio
import contextlib import contextlib
import errno import errno
import functools import functools
import re
import signal import signal
import struct import struct
import threading import threading
@ -164,7 +165,7 @@ class CallbackSubprocess:
# time between checks of child process # time between checks of child process
PROCESS_POLL_TIME: float = 0.1 PROCESS_POLL_TIME: float = 0.1
def __init__(self, argv: [str], term: str | None, loop: asyncio.AbstractEventLoop, stdout_callback: callable, def __init__(self, argv: [str], env: dict | None, loop: asyncio.AbstractEventLoop, stdout_callback: callable,
terminated_callback: callable): terminated_callback: callable):
""" """
Fork a child process and generate callbacks with output from the process. Fork a child process and generate callbacks with output from the process.
@ -180,8 +181,8 @@ class CallbackSubprocess:
self._log = module_logger.getChild(self.__class__.__name__) self._log = module_logger.getChild(self.__class__.__name__)
# self._log.debug(f"__init__({argv},{term},...") # self._log.debug(f"__init__({argv},{term},...")
self._command = argv self._command: [str] = argv
self._term = term self._env = env
self._loop = loop self._loop = loop
self._stdout_cb = stdout_callback self._stdout_cb = stdout_callback
self._terminated_cb = terminated_callback self._terminated_cb = terminated_callback
@ -283,20 +284,35 @@ class CallbackSubprocess:
Start the child process. Start the child process.
""" """
self._log.debug("start()") self._log.debug("start()")
# # Using the parent environment seems to do some weird stuff, at least on macOS
# parentenv = os.environ.copy() # parentenv = os.environ.copy()
# env = {"HOME": parentenv["HOME"], # env = {"HOME": parentenv["HOME"],
# "PATH": parentenv["PATH"],
# "TERM": self._term if self._term is not None else parentenv.get("TERM", "xterm"), # "TERM": self._term if self._term is not None else parentenv.get("TERM", "xterm"),
# "LANG": parentenv.get("LANG"), # "LANG": parentenv.get("LANG"),
# "SHELL": self._command[0]} # "SHELL": self._command[0]}
env = os.environ.copy() env = os.environ.copy()
if self._term is not None: for key in self._env:
env["TERM"] = self._term env[key] = self._env[key]
program = self._command[0]
match = re.search("^/bin/(.*sh)$", program)
if match:
self._command[0] = "-" + match.group(1)
env["SHELL"] = program
self._log.debug(f"set login shell {self._command}")
self._pid, self._child_fd = pty.fork() self._pid, self._child_fd = pty.fork()
if self._pid == 0: if self._pid == 0:
try: try:
# this may not be strictly necessary, but there was
# is occasionally some funny business that goes on
# with networking. Anecdotally this fixed it, but
# more testing is needed as it might be a coincidence.
p = psutil.Process() p = psutil.Process()
for c in p.connections(kind='all'): for c in p.connections(kind='all'):
if c == sys.stdin.fileno() or c == sys.stdout.fileno() or c == sys.stderr.fileno(): if c == sys.stdin.fileno() or c == sys.stdout.fileno() or c == sys.stderr.fileno():
@ -306,9 +322,9 @@ class CallbackSubprocess:
except: except:
pass pass
os.setpgrp() os.setpgrp()
os.execvpe(self._command[0], self._command, env) os.execvpe(program, self._command, env)
except Exception as err: except Exception as err:
print(f"Child process error: {err}") print(f"Child process error: {err}, command: {self._command}")
sys.stdout.flush() sys.stdout.flush()
# don't let any other modules get in our way. # don't let any other modules get in our way.
os._exit(0) os._exit(0)

View File

@ -1,4 +1,5 @@
import asyncio import asyncio
import logging
import threading import threading
import time import time
import logging as __logging import logging as __logging
@ -11,6 +12,7 @@ class RetryStatus:
def __init__(self, tag: any, try_limit: int, wait_delay: float, retry_callback: Callable[[any, int], any], def __init__(self, tag: any, try_limit: int, wait_delay: float, retry_callback: Callable[[any, int], any],
timeout_callback: Callable[[any, int], None], tries: int = 1): timeout_callback: Callable[[any, int], None], tries: int = 1):
self._log = module_logger.getChild(self.__class__.__name__) self._log = module_logger.getChild(self.__class__.__name__)
self._log.setLevel(logging.INFO)
self.tag = tag self.tag = tag
self.try_limit = try_limit self.try_limit = try_limit
self.tries = tries self.tries = tries

View File

@ -167,7 +167,7 @@ async def _listen(configdir, command, identitypath=None, service_name="default",
try: try:
while True: while True:
if not disable_announce and time.time() - last > 900: # TODO: make parameter if not disable_announce and time.time() - last > 900: # TODO: make parameter
last = datetime.datetime.now() last = time.time()
_destination.announce() _destination.announce()
await _check_finished(1.0) await _check_finished(1.0)
except KeyboardInterrupt: except KeyboardInterrupt:
@ -219,13 +219,15 @@ class ProcessState:
data_available_callback: callable, data_available_callback: callable,
terminated_callback: callable, terminated_callback: callable,
term: str | None, term: str | None,
remote_identity: str | None,
loop: asyncio.AbstractEventLoop = None): loop: asyncio.AbstractEventLoop = None):
self._log = _get_logger(self.__class__.__name__) self._log = _get_logger(self.__class__.__name__)
self._mdu = mdu self._mdu = mdu
self._loop = loop if loop is not None else asyncio.get_running_loop() self._loop = loop if loop is not None else asyncio.get_running_loop()
self._process = process.CallbackSubprocess(argv=cmd, self._process = process.CallbackSubprocess(argv=cmd,
term=term, env={ "TERM": term or os.environ.get("TERM", None),
"RNS_REMOTE_IDENTITY": remote_identity or ""},
loop=loop, loop=loop,
stdout_callback=self._stdout_data, stdout_callback=self._stdout_data,
terminated_callback=terminated_callback) terminated_callback=terminated_callback)
@ -297,10 +299,10 @@ class ProcessState:
def _update_winsz(self): def _update_winsz(self):
try: try:
self.process.set_winsize(self._term_state[ProcessState.TERMSTATE_IDX_ROWS], self.process.set_winsize(self._term_state[1],
self._term_state[ProcessState.TERMSTATE_IDX_COLS], self._term_state[2],
self._term_state[ProcessState.TERMSTATE_IDX_HPIX], self._term_state[3],
self._term_state[ProcessState.TERMSTATE_IDX_VPIX]) self._term_state[4])
except Exception as e: except Exception as e:
self._log.debug(f"failed to update winsz: {e}") self._log.debug(f"failed to update winsz: {e}")
@ -354,7 +356,7 @@ class ProcessState:
if stdin is not None and len(stdin) > 0: if stdin is not None and len(stdin) > 0:
stdin = base64.b64decode(stdin) stdin = base64.b64decode(stdin)
self.process.write(stdin) self.process.write(stdin)
response[ProcessState.RESPONSE_IDX_RETCODE] = self.return_code response[ProcessState.RESPONSE_IDX_RETCODE] = None if self.process.running else self.return_code
with self.lock: with self.lock:
stdout = self.read(read_size) stdout = self.read(read_size)
@ -457,13 +459,14 @@ def _subproc_terminated(link: RNS.Link, return_code: int):
_loop.call_soon_threadsafe(cleanup) _loop.call_soon_threadsafe(cleanup)
def _listen_start_proc(link: RNS.Link, term: str, loop: asyncio.AbstractEventLoop) -> ProcessState | None: def _listen_start_proc(link: RNS.Link, remote_identity: str | None, term: str, loop: asyncio.AbstractEventLoop) -> ProcessState | None:
global _cmd global _cmd
log = _get_logger("_listen_start_proc") log = _get_logger("_listen_start_proc")
try: try:
return ProcessState(tag=link.link_id, return ProcessState(tag=link.link_id,
cmd=_cmd, cmd=_cmd,
term=term, term=term,
remote_identity=remote_identity,
mdu=link.MDU, mdu=link.MDU,
loop=loop, loop=loop,
data_available_callback=functools.partial(_subproc_data_ready, link), data_available_callback=functools.partial(_subproc_data_ready, link),
@ -521,7 +524,10 @@ def _listen_request(path, data, request_id, link_id, remote_identity, requested_
process_state = ProcessState.get_for_tag(link.link_id) process_state = ProcessState.get_for_tag(link.link_id)
if process_state is None: if process_state is None:
log.debug(f"Process not found for link {link}") log.debug(f"Process not found for link {link}")
process_state = _listen_start_proc(link, term, _loop) process_state = _listen_start_proc(link=link,
term=term,
remote_identity=RNS.hexrep(remote_identity.hash).replace(":", ""),
loop=_loop)
# leave significant headroom for metadata and encoding # leave significant headroom for metadata and encoding
result = process_state.process_request(data, link.MDU * 3 // 2) result = process_state.process_request(data, link.MDU * 3 // 2)
@ -708,6 +714,7 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
def sigint_handler(): def sigint_handler():
log.debug("KeyboardInterrupt") log.debug("KeyboardInterrupt")
data_buffer.extend("\x03".encode("utf-8")) data_buffer.extend("\x03".encode("utf-8"))
_new_data.set()
def sigwinch_handler(): def sigwinch_handler():
# log.debug("WindowChanged") # log.debug("WindowChanged")
@ -718,19 +725,21 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
data = process.tty_read(sys.stdin.fileno()) data = process.tty_read(sys.stdin.fileno())
# log.debug(f"stdin {data}") # log.debug(f"stdin {data}")
if data is not None: if data is not None:
data_buffer.extend(data) data_buffer.extend(data)
_new_data.set()
process.tty_add_reader_callback(sys.stdin.fileno(), stdin) process.tty_add_reader_callback(sys.stdin.fileno(), stdin)
await _check_finished() await _check_finished()
# signal.signal(signal.SIGWINCH, sigwinch_handler)
loop.add_signal_handler(signal.SIGWINCH, sigwinch_handler) loop.add_signal_handler(signal.SIGWINCH, sigwinch_handler)
# leave a lot of overhead
mdu = 64
first_loop = True first_loop = True
while True: while True:
try: try:
log.debug("top of client loop") log.debug("top of client loop")
stdin = data_buffer.copy() stdin = data_buffer[:mdu]
data_buffer.clear() data_buffer = data_buffer[mdu:]
_new_data.clear() _new_data.clear()
log.debug("before _execute") log.debug("before _execute")
return_code = await _execute( return_code = await _execute(
@ -744,9 +753,10 @@ async def _initiate(configdir: str, identitypath: str, verbosity: int, quietness
stdin=stdin, stdin=stdin,
timeout=timeout, timeout=timeout,
) )
# signal.signal(signal.SIGINT, sigint_handler)
if first_loop: if first_loop:
first_loop = False first_loop = False
mdu = _link.MDU * 3 // 2
loop.remove_signal_handler(signal.SIGINT) loop.remove_signal_handler(signal.SIGINT)
loop.add_signal_handler(signal.SIGINT, sigint_handler) loop.add_signal_handler(signal.SIGINT, sigint_handler)
_new_data.set() _new_data.set()
@ -794,7 +804,7 @@ Usage:
rnsh --version rnsh --version
Options: Options:
--config FILE Alternate Reticulum config directory to use --config DIR Alternate Reticulum config directory to use
-i FILE --identity FILE Specific identity file to use -i FILE --identity FILE Specific identity file to use
-s NAME --service NAME Listen on/connect to specific service name if not default -s NAME --service NAME Listen on/connect to specific service name if not default
-p --print-identity Print identity information and exit -p --print-identity Print identity information and exit