fix python test

This commit is contained in:
Christien Rioux 2024-03-06 16:10:01 -05:00
parent 78e84f6573
commit 6d7c62fb6b

View File

@ -69,10 +69,10 @@ async def test_routing_context_app_message_loopback():
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
try:
# import it as a remote route as well so we can send to it # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
try:
# send an app message to our own private route # send an app message to our own private route
message = b"abcd1234" message = b"abcd1234"
await rc.app_message(prr, message) await rc.app_message(prr, message)
@ -84,6 +84,12 @@ async def test_routing_context_app_message_loopback():
assert isinstance(update.detail, veilid.VeilidAppMessage) assert isinstance(update.detail, veilid.VeilidAppMessage)
assert update.detail.message == message assert update.detail.message == message
finally:
# release imported private route
await api.release_private_route(prr)
finally:
# release local private route
await api.release_private_route(prl)
@pytest.mark.asyncio @pytest.mark.asyncio
@ -109,9 +115,10 @@ async def test_routing_context_app_call_loopback():
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
try:
# import it as a remote route as well so we can send to it # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
try:
# send an app message to our own private route # send an app message to our own private route
request = b"abcd1234" request = b"abcd1234"
@ -132,6 +139,12 @@ async def test_routing_context_app_call_loopback():
result = await app_call_task result = await app_call_task
assert result == reply assert result == reply
finally:
# release imported private route
await api.release_private_route(prr)
finally:
# release local private route
await api.release_private_route(prl)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_routing_context_app_message_loopback_big_packets(): async def test_routing_context_app_message_loopback_big_packets():
@ -162,12 +175,12 @@ async def test_routing_context_app_message_loopback_big_packets():
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
try:
# import it as a remote route as well so we can send to it # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
try:
# do this test 100 times # do this test 5 times
for _ in range(100): for _ in range(5):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768)) message = random.randbytes(random.randint(0, 32768))
await rc.app_message(prr, message) await rc.app_message(prr, message)
@ -184,6 +197,13 @@ async def test_routing_context_app_message_loopback_big_packets():
assert isinstance(update.detail, veilid.VeilidAppMessage) assert isinstance(update.detail, veilid.VeilidAppMessage)
assert update.detail.message in sent_messages assert update.detail.message in sent_messages
finally:
# release imported private route
await api.release_private_route(prr)
finally:
# release local private route
await api.release_private_route(prl)
@pytest.mark.asyncio @pytest.mark.asyncio
@ -216,7 +236,7 @@ async def test_routing_context_app_call_loopback_big_packets():
await api.debug("purge routes") await api.debug("purge routes")
app_call_task = asyncio.create_task(app_call_queue_task_handler(api), name="app call task") app_call_task = asyncio.create_task(app_call_queue_task_handler(api), name="app call task")
try:
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_sequencing( rc = await (await api.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED veilid.Sequencing.ENSURE_ORDERED
@ -227,18 +247,24 @@ async def test_routing_context_app_call_loopback_big_packets():
[veilid.CryptoKind.CRYPTO_KIND_VLD0], [veilid.CryptoKind.CRYPTO_KIND_VLD0],
veilid.Stability.RELIABLE, veilid.Stability.RELIABLE,
veilid.Sequencing.ENSURE_ORDERED) veilid.Sequencing.ENSURE_ORDERED)
try:
# import it as a remote route as well so we can send to it # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
try:
# do this test 10 times # do this test 5 times
for _ in range(10): for _ in range(5):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768)) message = random.randbytes(random.randint(0, 32768))
out_message = await rc.app_call(prr, message) out_message = await rc.app_call(prr, message)
assert message == out_message assert message == out_message
finally:
# release imported private route
await api.release_private_route(prr)
finally:
# release local private route
await api.release_private_route(prl)
finally:
app_call_task.cancel() app_call_task.cancel()
@ -266,10 +292,10 @@ async def test_routing_context_app_message_loopback_bandwidth():
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
try:
# import it as a remote route as well so we can send to it # import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob) prr = await api.import_remote_private_route(blob)
try:
# do this test 1000 times # do this test 1000 times
message = random.randbytes(16384) message = random.randbytes(16384)
for _ in range(10000): for _ in range(10000):
@ -279,3 +305,9 @@ async def test_routing_context_app_message_loopback_bandwidth():
# we should get the same number of messages back (not storing all that data) # we should get the same number of messages back (not storing all that data)
for _ in range(10000): for _ in range(10000):
await asyncio.wait_for(app_message_queue.get(), timeout=10) await asyncio.wait_for(app_message_queue.get(), timeout=10)
finally:
# release imported private route
await api.release_private_route(prr)
finally:
# release local private route
await api.release_private_route(prl)