mirror of
https://github.com/markqvist/rnsh.git
synced 2025-06-20 20:14:15 -04:00
Add tests for and fix #14
This commit is contained in:
parent
a07ce53bf9
commit
458a2391df
5 changed files with 82 additions and 8 deletions
|
@ -123,14 +123,17 @@ async def do_connected_test(listener_args: str, initiator_args: str, test: calla
|
|||
assert len(ih) == 32
|
||||
assert len(dh) == 32
|
||||
assert len(iih) == 32
|
||||
assert "dh" in initiator_args
|
||||
initiator_args = initiator_args.replace("dh", dh)
|
||||
listener_args = listener_args.replace("iih", iih)
|
||||
with tests.helpers.SubprocessReader(name="listener", argv=shlex.split(f"poetry run -- rnsh -l -c \"{td}\" {listener_args}")) as listener, \
|
||||
tests.helpers.SubprocessReader(name="initiator", argv=shlex.split(f"poetry run -- rnsh -c \"{td}\" {dh} {initiator_args}")) as initiator:
|
||||
tests.helpers.SubprocessReader(name="initiator", argv=shlex.split(f"poetry run -- rnsh -c \"{td}\" {initiator_args}")) as initiator:
|
||||
# listener startup
|
||||
listener.start()
|
||||
await asyncio.sleep(0.1)
|
||||
assert listener.process.running
|
||||
# wait for process to start up
|
||||
await asyncio.sleep(5)
|
||||
await asyncio.sleep(2)
|
||||
# read the output
|
||||
text = listener.read().decode("utf-8")
|
||||
assert text.index(dh) is not None
|
||||
|
@ -166,7 +169,55 @@ async def test_rnsh_get_echo_through():
|
|||
text = initiator.read().decode("utf-8").replace("\r", "").replace("\n", "")
|
||||
assert text[len(text)-len(cwd):] == cwd
|
||||
|
||||
await do_connected_test("-n -C -- /bin/pwd", "", test)
|
||||
await do_connected_test("-n -C -- /bin/pwd", "dh", test)
|
||||
|
||||
|
||||
@pytest.mark.skip_ci
|
||||
@pytest.mark.asyncio
|
||||
async def test_rnsh_no_ident():
|
||||
cwd = os.getcwd()
|
||||
|
||||
async def test(td: str, ih: str, dh: str, iih: str, listener: tests.helpers.SubprocessReader,
|
||||
initiator: tests.helpers.SubprocessReader):
|
||||
start_time = time.time()
|
||||
while initiator.return_code is None and time.time() - start_time < 3:
|
||||
await asyncio.sleep(0.1)
|
||||
text = initiator.read().decode("utf-8").replace("\r", "").replace("\n", "")
|
||||
assert text[len(text)-len(cwd):] == cwd
|
||||
|
||||
await do_connected_test("-n -C -- /bin/pwd", "-N dh", test)
|
||||
|
||||
|
||||
@pytest.mark.skip_ci
|
||||
@pytest.mark.asyncio
|
||||
async def test_rnsh_invalid_ident():
|
||||
cwd = os.getcwd()
|
||||
|
||||
async def test(td: str, ih: str, dh: str, iih: str, listener: tests.helpers.SubprocessReader,
|
||||
initiator: tests.helpers.SubprocessReader):
|
||||
start_time = time.time()
|
||||
while initiator.return_code is None and time.time() - start_time < 3:
|
||||
await asyncio.sleep(0.1)
|
||||
text = initiator.read().decode("utf-8").replace("\r", "").replace("\n", "")
|
||||
assert "not allowed" in text
|
||||
|
||||
await do_connected_test("-a 12345678901234567890123456789012 -C -- /bin/pwd", "dh", test)
|
||||
|
||||
|
||||
@pytest.mark.skip_ci
|
||||
@pytest.mark.asyncio
|
||||
async def test_rnsh_valid_ident():
|
||||
cwd = os.getcwd()
|
||||
|
||||
async def test(td: str, ih: str, dh: str, iih: str, listener: tests.helpers.SubprocessReader,
|
||||
initiator: tests.helpers.SubprocessReader):
|
||||
start_time = time.time()
|
||||
while initiator.return_code is None and time.time() - start_time < 3:
|
||||
await asyncio.sleep(0.1)
|
||||
text = initiator.read().decode("utf-8").replace("\r", "").replace("\n", "")
|
||||
assert text[len(text)-len(cwd):] == cwd
|
||||
|
||||
await do_connected_test("-a iih -C -- /bin/pwd", "dh", test)
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue