Add some additional tests for #3

This commit is contained in:
Aaron Heise 2023-02-16 07:03:57 -06:00
parent 9a37a52601
commit 3b3cee73bb
3 changed files with 143 additions and 9 deletions

View File

@ -351,11 +351,11 @@ def _launch_child(cmd_line: list[str], env: dict[str, str], stdin_is_pipe: bool,
os.dup2(child_stdout, 1) os.dup2(child_stdout, 1)
os.dup2(child_stderr, 2) os.dup2(child_stderr, 2)
# Make PTY controlling if necessary # Make PTY controlling if necessary
if not stdin_is_pipe: if child_fd is not None:
os.setsid() os.setsid()
tmp_fd = os.open(os.ttyname(0), os.O_RDWR) tmp_fd = os.open(os.ttyname(0 if not stdin_is_pipe else 1 if not stdout_is_pipe else 2), os.O_RDWR)
os.close(tmp_fd) os.close(tmp_fd)
# fcntl.ioctl(0, termios.TIOCSCTTY, 0) # fcntl.ioctl(0 if not stdin_is_pipe else 1 if not stdout_is_pipe else 2), os.O_RDWR, termios.TIOCSCTTY, 0)
# Execute the command # Execute the command
os.execvpe(cmd_line[0], cmd_line, env) os.execvpe(cmd_line[0], cmd_line, env)
@ -365,7 +365,7 @@ def _launch_child(cmd_line: list[str], env: dict[str, str], stdin_is_pipe: bool,
print(f"Unable to start {cmd_line[0]}: {err} ({fname}:{exc_tb.tb_lineno})") print(f"Unable to start {cmd_line[0]}: {err} ({fname}:{exc_tb.tb_lineno})")
sys.stdout.flush() sys.stdout.flush()
# don't let any other modules get in our way, do an immediate silent exit. # don't let any other modules get in our way, do an immediate silent exit.
os._exit(0) os._exit(255)
else: else:
# We are in the parent process, so close the child-side of the PTY and/or pipes # We are in the parent process, so close the child-side of the PTY and/or pipes

View File

@ -23,7 +23,8 @@ module_dir = os.path.dirname(module_abs_filename)
class SubprocessReader(contextlib.AbstractContextManager): class SubprocessReader(contextlib.AbstractContextManager):
def __init__(self, argv: [str], env: dict = None, name: str = None): def __init__(self, argv: [str], env: dict = None, name: str = None, stdin_is_pipe: bool = False,
stdout_is_pipe: bool = False, stderr_is_pipe: bool = False):
self._log = module_logger.getChild(self.__class__.__name__ + ("" if name is None else f"({name})")) self._log = module_logger.getChild(self.__class__.__name__ + ("" if name is None else f"({name})"))
self.name = name or "subproc" self.name = name or "subproc"
self.process: rnsh.process.CallbackSubprocess self.process: rnsh.process.CallbackSubprocess
@ -32,16 +33,17 @@ class SubprocessReader(contextlib.AbstractContextManager):
self.argv = argv self.argv = argv
self._lock = threading.RLock() self._lock = threading.RLock()
self._stdout = bytearray() self._stdout = bytearray()
self._stderr = bytearray()
self.return_code: int = None self.return_code: int = None
self.process = rnsh.process.CallbackSubprocess(argv=self.argv, self.process = rnsh.process.CallbackSubprocess(argv=self.argv,
env=self.env, env=self.env,
loop=self.loop, loop=self.loop,
stdout_callback=self._stdout_cb, stdout_callback=self._stdout_cb,
terminated_callback=self._terminated_cb, terminated_callback=self._terminated_cb,
stderr_callback=self._stdout_cb, stderr_callback=self._stderr_cb,
stdin_is_pipe=False, stdin_is_pipe=stdin_is_pipe,
stdout_is_pipe=False, stdout_is_pipe=stdout_is_pipe,
stderr_is_pipe=False) stderr_is_pipe=stderr_is_pipe)
def _stdout_cb(self, data): def _stdout_cb(self, data):
self._log.debug(f"_stdout_cb({data})") self._log.debug(f"_stdout_cb({data})")
@ -56,6 +58,19 @@ class SubprocessReader(contextlib.AbstractContextManager):
self._log.debug(f"read() returns {data}") self._log.debug(f"read() returns {data}")
return data return data
def _stderr_cb(self, data):
self._log.debug(f"_stderr_cb({data})")
with self._lock:
self._stderr.extend(data)
def read_err(self):
self._log.debug(f"read_err()")
with self._lock:
data = self._stderr.copy()
self._stderr.clear()
self._log.debug(f"read_err() returns {data}")
return data
def _terminated_cb(self, rc): def _terminated_cb(self, rc):
self._log.debug(f"_terminated_cb({rc})") self._log.debug(f"_terminated_cb({rc})")
self.return_code = rc self.return_code = rc

View File

@ -51,6 +51,125 @@ async def test_echo_live():
assert decoded == message assert decoded == message
assert not state.process.running assert not state.process.running
@pytest.mark.skip_ci
@pytest.mark.asyncio
async def test_echo_live_pipe_in():
"""
Check for immediate echo
"""
with tests.helpers.SubprocessReader(argv=["/bin/cat"], stdin_is_pipe=True) as state:
state.start()
assert state.process is not None
assert state.process.running
message = "t"
state.process.write(message.encode("utf-8"))
await asyncio.sleep(0.1)
data = state.read()
state.process.close_stdin()
await asyncio.sleep(0.1)
assert len(data) > 0
decoded = data.decode("utf-8")
assert decoded == message
assert not state.process.running
@pytest.mark.skip_ci
@pytest.mark.asyncio
async def test_echo_live_pipe_out():
"""
Check for immediate echo
"""
with tests.helpers.SubprocessReader(argv=["/bin/cat"], stdout_is_pipe=True) as state:
state.start()
assert state.process is not None
assert state.process.running
message = "t"
state.process.write(message.encode("utf-8"))
state.process.write(rnsh.process.CTRL_D)
await asyncio.sleep(0.1)
data = state.read()
assert len(data) > 0
decoded = data.decode("utf-8")
assert decoded == message
data = state.read_err()
assert len(data) > 0
state.process.close_stdin()
await asyncio.sleep(0.1)
assert not state.process.running
@pytest.mark.skip_ci
@pytest.mark.asyncio
async def test_echo_live_pipe_err():
"""
Check for immediate echo
"""
with tests.helpers.SubprocessReader(argv=["/bin/cat"], stderr_is_pipe=True) as state:
state.start()
assert state.process is not None
assert state.process.running
message = "t"
state.process.write(message.encode("utf-8"))
await asyncio.sleep(0.1)
data = state.read()
state.process.write(rnsh.process.CTRL_C)
await asyncio.sleep(0.1)
assert len(data) > 0
decoded = data.decode("utf-8")
assert decoded == message
assert not state.process.running
@pytest.mark.skip_ci
@pytest.mark.asyncio
async def test_echo_live_pipe_out_err():
"""
Check for immediate echo
"""
with tests.helpers.SubprocessReader(argv=["/bin/cat"], stdout_is_pipe=True, stderr_is_pipe=True) as state:
state.start()
assert state.process is not None
assert state.process.running
message = "t"
state.process.write(message.encode("utf-8"))
state.process.write(rnsh.process.CTRL_D)
await asyncio.sleep(0.1)
data = state.read()
assert len(data) > 0
decoded = data.decode("utf-8")
assert decoded == message
data = state.read_err()
assert len(data) == 0
state.process.close_stdin()
await asyncio.sleep(0.1)
assert not state.process.running
@pytest.mark.skip_ci
@pytest.mark.asyncio
async def test_echo_live_pipe_all():
"""
Check for immediate echo
"""
with tests.helpers.SubprocessReader(argv=["/bin/cat"], stdout_is_pipe=True, stderr_is_pipe=True,
stdin_is_pipe=True) as state:
state.start()
assert state.process is not None
assert state.process.running
message = "t"
state.process.write(message.encode("utf-8"))
await asyncio.sleep(0.1)
data = state.read()
state.process.close_stdin()
await asyncio.sleep(0.1)
assert len(data) > 0
decoded = data.decode("utf-8")
assert decoded == message
assert not state.process.running
@pytest.mark.skip_ci @pytest.mark.skip_ci
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_double_echo_live(): async def test_double_echo_live():