This commit is contained in:
Aaron Heise 2023-02-08 23:54:24 -06:00
parent 50744830fb
commit 3a935ab6c2

View File

@ -4,14 +4,10 @@ import functools
import signal import signal
import struct import struct
import threading import threading
import time
import tty import tty
import pty import pty
import io
import os import os
import asyncio import asyncio
import subprocess
import typing
import sys import sys
import logging as __logging import logging as __logging
@ -22,37 +18,33 @@ import termios
module_logger = __logging.getLogger(__name__) module_logger = __logging.getLogger(__name__)
# _tattrs: dict[int, list] = {} def tty_add_reader_callback(fd: int, callback: callable, loop: asyncio.AbstractEventLoop | None = None):
# # _fl: dict[int, int] = {} """
# Add an async reader callback for a tty file descriptor.
# def tty_set_now(fd: int):
# global _tattrs
# # fl = fcntl.fcntl(fd, fcntl.F_GETFL)
# # module_logger.debug(f"fl {fd} {fl:032b} {fl}")
# _tattrs[fd] = termios.tcgetattr(fd)
# # _fl[fd] = fl
# # termios.tcsetattr(fd, termios.TCSANOW, termios.)
# # tty.setcbreak(fd, termios.TCSANOW)
# # tty.setraw(fd)
# # fcntl.fcntl(fd, fcntl.F_SETFL, fl & ~(termios.TCSADRAIN | termios.TCSAFLUSH))
# # module_logger.debug(f"fl {fd} {fl:032b} {fl}")
#
# def tty_reset(fd: int):
# global _tattrs
# tattr = _tattrs.get(fd)
# if tattr is not None:
# termios.tcsetattr(fd, termios.TCSANOW, tattr)
#
# # fl = _fl.get(fd)
# # if fl is not None:
# # fcntl.fcntl(fd, fcntl.F_SETFL, fl)
def tty_set_callback(fd: int, callback: callable, loop: asyncio.AbstractEventLoop | None = None): Example usage:
def reader():
data = tty_read(fd)
# do something with data
tty_add_reader_callback(self._si, reader, self._loop)
:param fd: file descriptor
:param callback: callback function
:param loop: asyncio event loop to which the reader should be added. If None, use the currently-running loop.
"""
if loop is None: if loop is None:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
loop.add_reader(fd, callback) loop.add_reader(fd, callback)
def tty_read(fd: int) -> bytes | None: def tty_read(fd: int) -> bytes | None:
"""
Read available bytes from a tty file descriptor. When used in a callback added to a file descriptor using
tty_add_reader_callback(...), this function creates a solution for non-blocking reads from ttys.
:param fd: tty file descriptor
:return: bytes read
"""
if fd_is_closed(fd): if fd_is_closed(fd):
return None return None
@ -76,12 +68,22 @@ def tty_read(fd: int) -> bytes | None:
return result return result
def fd_is_closed(fd: int) -> bool: def fd_is_closed(fd: int) -> bool:
"""
Check if file descriptor is closed
:param fd: file descriptor
:return: True if file descriptor is closed
"""
try: try:
fcntl.fcntl(fd, fcntl.F_GETFL) < 0 fcntl.fcntl(fd, fcntl.F_GETFL) < 0
except OSError as ose: except OSError as ose:
return ose.errno == errno.EBADF return ose.errno == errno.EBADF
def tty_unset_callbacks(fd: int, loop: asyncio.AbstractEventLoop | None = None): def tty_unset_reader_callbacks(fd: int, loop: asyncio.AbstractEventLoop | None = None):
"""
Remove async reader callbacks for file descriptor.
:param fd: file descriptor
:param loop: asyncio event loop from which to remove callbacks
"""
try: try:
if loop is None: if loop is None:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
@ -89,17 +91,34 @@ def tty_unset_callbacks(fd: int, loop: asyncio.AbstractEventLoop | None = None):
except: except:
pass pass
def tty_get_size(fd: int) -> [int, int, int ,int]: def tty_get_winsize(fd: int) -> [int, int, int , int]:
"""
Ge the window size of a tty.
:param fd: file descriptor of tty
:return: (rows, cols, h_pixels, v_pixels)
"""
packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0)) packed = fcntl.ioctl(fd, termios.TIOCGWINSZ, struct.pack('HHHH', 0, 0, 0, 0))
rows, cols, h_pixels, v_pixels = struct.unpack('HHHH', packed) rows, cols, h_pixels, v_pixels = struct.unpack('HHHH', packed)
return rows, cols, h_pixels, v_pixels return rows, cols, h_pixels, v_pixels
def tty_set_size(fd: int, rows: int, cols: int, h_pixels: int, v_pixels: int): def tty_set_winsize(fd: int, rows: int, cols: int, h_pixels: int, v_pixels: int):
"""
Set the window size on a tty.
:param fd: file descriptor of tty
:param rows: number of visible rows
:param cols: number of visible columns
:param h_pixels: number of visible horizontal pixels
:param v_pixels: number of visible vertical pixels
"""
packed = struct.pack('HHHH', rows, cols, h_pixels, v_pixels) packed = struct.pack('HHHH', rows, cols, h_pixels, v_pixels)
fcntl.ioctl(fd, termios.TIOCSWINSZ, packed) fcntl.ioctl(fd, termios.TIOCSWINSZ, packed)
def process_exists(pid): def process_exists(pid) -> bool:
""" Check For the existence of a unix pid. """ """
Check For the existence of a unix pid.
:param pid: process id to check
:return: True if process exists
"""
try: try:
os.kill(pid, 0) os.kill(pid, 0)
except OSError: except OSError:
@ -109,37 +128,61 @@ def process_exists(pid):
class TtyRestorer: class TtyRestorer:
def __init__(self, fd: int): def __init__(self, fd: int):
"""
Saves termios attributes for a tty for later restoration
:param fd: file descriptor of tty
"""
self._fd = fd self._fd = fd
self._tattr = termios.tcgetattr(self._fd) self._tattr = termios.tcgetattr(self._fd)
def raw(self): def raw(self):
"""
Set raw mode on tty
"""
tty.setraw(self._fd, termios.TCSADRAIN) tty.setraw(self._fd, termios.TCSADRAIN)
def restore(self): def restore(self):
"""
Restore termios settings to state captured in constructor.
"""
termios.tcsetattr(self._fd, termios.TCSADRAIN, self._tattr) termios.tcsetattr(self._fd, termios.TCSADRAIN, self._tattr)
class CallbackSubprocess: class CallbackSubprocess:
# time between checks of child process
PROCESS_POLL_TIME: float = 0.1 PROCESS_POLL_TIME: float = 0.1
def __init__(self, command: [str], term: str, loop: asyncio.AbstractEventLoop, stdout_callback: callable, def __init__(self, argv: [str], term: str | None, loop: asyncio.AbstractEventLoop, stdout_callback: callable,
terminated_callback: callable): terminated_callback: callable):
"""
Fork a child process and generate callbacks with output from the process.
:param argv: the command line, tokenized. The first element must be the absolute path to an executable file.
:param term: the value that should be set for TERM. If None, the value from the parent process will be used
:param loop: the asyncio event loop to use
:param stdout_callback: callback for data, e.g. def callback(data:bytes) -> None
:param terminated_callback: callback for termination/return code, e.g. def callback(return_code:int) -> None
"""
assert loop is not None, "loop should not be None" assert loop is not None, "loop should not be None"
assert stdout_callback is not None, "stdout_callback should not be None" assert stdout_callback is not None, "stdout_callback should not be None"
assert terminated_callback is not None, "terminated_callback should not be None" assert terminated_callback is not None, "terminated_callback should not be None"
self.log = module_logger.getChild(self.__class__.__name__) self.log = module_logger.getChild(self.__class__.__name__)
self.log.debug(f"__init__({command},{term},...") self.log.debug(f"__init__({argv},{term},...")
self._command = command self._command = argv
self._term = term self._term = term
self._loop = loop self._loop = loop
self._stdout_cb = stdout_callback self._stdout_cb = stdout_callback
self._terminated_cb = terminated_callback self._terminated_cb = terminated_callback
self._pid: int | None = None self._pid: int | None = None
def terminate(self): def terminate(self, kill_delay: float = 1.0):
"""
Terminate child process if running
:param kill_delay: if after kill_delay seconds the child process has not exited, escalate to SIGHUP and SIGKILL
"""
self.log.debug("terminate()") self.log.debug("terminate()")
if not self.running:
return
try: try:
os.kill(self._pid, signal.SIGTERM) os.kill(self._pid, signal.SIGTERM)
except: except:
@ -164,28 +207,65 @@ class CallbackSubprocess:
@property @property
def started(self) -> bool: def started(self) -> bool:
"""
:return: True if child process has been started
"""
return self._pid is not None return self._pid is not None
@property @property
def running(self) -> bool: def running(self) -> bool:
"""
:return: True if child process is still running
"""
return self._pid is not None and process_exists(self._pid) return self._pid is not None and process_exists(self._pid)
def write(self, data: bytes): def write(self, data: bytes):
"""
Write bytes to the stdin of the child process.
:param data: bytes to write
"""
self.log.debug(f"write({data})") self.log.debug(f"write({data})")
os.write(self._si, data) os.write(self._si, data)
def set_winsize(self, r: int, c: int, h: int, w: int): def set_winsize(self, r: int, c: int, h: int, v: int):
self.log.debug(f"set_winsize({r},{c},{h},{w}") """
tty_set_size(self._si, r, c, h, w) Set the window size on the tty of the child process.
:param r: rows visible
:param c: columns visible
:param h: horizontal pixels visible
:param v: vertical pixels visible
:return:
"""
self.log.debug(f"set_winsize({r},{c},{h},{v}")
tty_set_winsize(self._si, r, c, h, v)
def copy_winsize(self, fromfd:int): def copy_winsize(self, fromfd:int):
r,c,h,w = tty_get_size(fromfd) """
self.set_winsize(r,c,w,h) Copy window size from one tty to another.
:param fromfd: source tty file descriptor
"""
r,c,h,v = tty_get_winsize(fromfd)
self.set_winsize(r,c,h,v)
# def tcsetattr(self, val: list[int | list[int | bytes]]): def tcsetattr(self, when: int, attr: list[int | list[int | bytes]]):
# termios.tcsetattr(self._si, termios.TCSANOW, val) """
Set tty attributes.
:param when: when to apply change: termios.TCSANOW or termios.TCSADRAIN or termios.TCSAFLUSH
:param attr: attributes to set
"""
termios.tcsetattr(self._si, when, attr)
def tcgetattr(self) -> list[int | list[int | bytes]]:
"""
Get tty attributes.
:return: tty attributes value
"""
return termios.tcgetattr(self._si)
def start(self): def start(self):
"""
Start the child process.
"""
self.log.debug("start()") self.log.debug("start()")
parentenv = os.environ.copy() parentenv = os.environ.copy()
env = {"HOME": parentenv["HOME"], env = {"HOME": parentenv["HOME"],
@ -224,14 +304,18 @@ class CallbackSubprocess:
except: except:
pass pass
tty_set_callback(self._si, functools.partial(reader, self._si, self._stdout_cb), self._loop) tty_add_reader_callback(self._si, functools.partial(reader, self._si, self._stdout_cb), self._loop)
async def main(): async def main():
"""
A test driver for the CallbackProcess class.
python ./process.py /bin/zsh --login
"""
import testlogging import testlogging
log = module_logger.getChild("main") log = module_logger.getChild("main")
if len(sys.argv) <= 1: if len(sys.argv) <= 1:
print("no cmd") print(f"Usage: {sys.argv} <absolute_path_to_child_executable> [child_arg ...]")
exit(1) exit(1)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@ -252,7 +336,6 @@ async def main():
def sigint_handler(signal, frame): def sigint_handler(signal, frame):
# log.debug("KeyboardInterrupt") # log.debug("KeyboardInterrupt")
if process is None or process.started and not process.running: if process is None or process.started and not process.running:
tr.restore()
raise KeyboardInterrupt raise KeyboardInterrupt
elif process.running: elif process.running:
process.write("\x03".encode("utf-8")) process.write("\x03".encode("utf-8"))
@ -271,7 +354,7 @@ async def main():
process.write(data) process.write(data)
# sys.stdout.buffer.write(data) # sys.stdout.buffer.write(data)
tty_set_callback(sys.stdin.fileno(), stdin) tty_add_reader_callback(sys.stdin.fileno(), stdin)
process.start() process.start()
# process.tcsetattr(termios.tcgetattr(sys.stdin)) # process.tcsetattr(termios.tcgetattr(sys.stdin))
@ -285,5 +368,5 @@ if __name__ == "__main__":
tr.raw() tr.raw()
asyncio.run(main()) asyncio.run(main())
finally: finally:
tty_unset_callbacks(sys.stdin.fileno()) tty_unset_reader_callbacks(sys.stdin.fileno())
tr.restore() tr.restore()