mirror of
https://github.com/markqvist/rnsh.git
synced 2025-05-12 11:02:28 -04:00
Several test-driven fixes
- Improved test suite exposed several issues with the handling of command line arguments which are now fixed - Fixed a race condition that would cause remote characters to be lost intermittently when running remote commands that finish immediately. - Added automated testing that actually spins up a random listener and initiator in a private Reticulum network and passes data between them, uncovering more issues which are now fixed. - Fixed (hopefully) an issue where `rnsh` doesn't know what version it is.
This commit is contained in:
parent
d4cb31e220
commit
27664df0b3
12 changed files with 508 additions and 85 deletions
|
@ -1,68 +1,20 @@
|
|||
import uuid
|
||||
import tests.helpers
|
||||
import time
|
||||
import pytest
|
||||
import rnsh.process
|
||||
import contextlib
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import types
|
||||
import typing
|
||||
import multiprocessing.pool
|
||||
logging.getLogger().setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
class State(contextlib.AbstractContextManager):
|
||||
def __init__(self, argv: [str], loop: asyncio.AbstractEventLoop, env: dict = None):
|
||||
self.process: rnsh.process.CallbackSubprocess
|
||||
self.loop = loop
|
||||
self.env = env or os.environ.copy()
|
||||
self.argv = argv
|
||||
self._lock = threading.RLock()
|
||||
self._stdout = bytearray()
|
||||
self.return_code: int = None
|
||||
self.process = rnsh.process.CallbackSubprocess(argv=self.argv,
|
||||
env=self.env,
|
||||
loop=self.loop,
|
||||
stdout_callback=self._stdout_cb,
|
||||
terminated_callback=self._terminated_cb)
|
||||
|
||||
def _stdout_cb(self, data):
|
||||
with self._lock:
|
||||
self._stdout.extend(data)
|
||||
|
||||
def read(self):
|
||||
with self._lock:
|
||||
data = self._stdout.copy()
|
||||
self._stdout.clear()
|
||||
return data
|
||||
|
||||
def _terminated_cb(self, rc):
|
||||
self.return_code = rc
|
||||
|
||||
def start(self):
|
||||
self.process.start()
|
||||
|
||||
def cleanup(self):
|
||||
if self.process and self.process.running:
|
||||
self.process.terminate(kill_delay=0.1)
|
||||
|
||||
def __exit__(self, __exc_type: typing.Type[BaseException], __exc_value: BaseException,
|
||||
__traceback: types.TracebackType) -> bool:
|
||||
self.cleanup()
|
||||
return False
|
||||
|
||||
|
||||
@pytest.mark.skip_ci
|
||||
@pytest.mark.asyncio
|
||||
async def test_echo():
|
||||
"""
|
||||
Echoing some text through cat.
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
with State(argv=["/bin/cat"],
|
||||
loop=loop) as state:
|
||||
with tests.helpers.SubprocessReader(argv=["/bin/cat"]) as state:
|
||||
state.start()
|
||||
assert state.process is not None
|
||||
assert state.process.running
|
||||
|
@ -84,9 +36,7 @@ async def test_echo_live():
|
|||
"""
|
||||
Check for immediate echo
|
||||
"""
|
||||
loop = asyncio.get_running_loop()
|
||||
with State(argv=["/bin/cat"],
|
||||
loop=loop) as state:
|
||||
with tests.helpers.SubprocessReader(argv=["/bin/cat"]) as state:
|
||||
state.start()
|
||||
assert state.process is not None
|
||||
assert state.process.running
|
||||
|
@ -101,6 +51,38 @@ async def test_echo_live():
|
|||
assert decoded == message
|
||||
assert not state.process.running
|
||||
|
||||
@pytest.mark.skip_ci
|
||||
@pytest.mark.asyncio
|
||||
async def test_double_echo_live():
|
||||
"""
|
||||
Check for immediate echo
|
||||
"""
|
||||
with tests.helpers.SubprocessReader(name="state", argv=["/bin/cat"]) as state:
|
||||
with tests.helpers.SubprocessReader(name="state2", argv=["/bin/cat"]) as state2:
|
||||
state.start()
|
||||
state2.start()
|
||||
assert state.process is not None
|
||||
assert state.process.running
|
||||
assert state2.process is not None
|
||||
assert state2.process.running
|
||||
message = "t"
|
||||
state.process.write(message.encode("utf-8"))
|
||||
state2.process.write(message.encode("utf-8"))
|
||||
await asyncio.sleep(0.1)
|
||||
data = state.read()
|
||||
data2 = state2.read()
|
||||
state.process.write(rnsh.process.CTRL_C)
|
||||
state2.process.write(rnsh.process.CTRL_C)
|
||||
await asyncio.sleep(0.1)
|
||||
assert len(data) > 0
|
||||
assert len(data2) > 0
|
||||
decoded = data.decode("utf-8")
|
||||
decoded2 = data.decode("utf-8")
|
||||
assert decoded == message
|
||||
assert decoded2 == message
|
||||
assert not state.process.running
|
||||
assert not state2.process.running
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_event_wait_any():
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue