veilid/veilid-python/tests/test_routing_context.py

282 lines
9.5 KiB
Python
Raw Normal View History

2023-06-15 20:22:54 -04:00
# Routing context veilid tests
import asyncio
import os
2023-06-23 17:01:52 -04:00
import random
2023-06-25 18:28:32 -04:00
import sys
2023-06-17 14:34:09 -04:00
import pytest
2023-06-17 14:34:09 -04:00
import veilid
from .api import VeilidTestConnectionError, api_connector
2023-06-15 20:22:54 -04:00
##################################################################
2023-06-17 14:34:09 -04:00
2023-06-15 20:22:54 -04:00
@pytest.mark.asyncio
2023-06-18 18:47:39 -04:00
async def test_routing_contexts(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
pass
2023-06-17 14:34:09 -04:00
rc = await api_connection.new_routing_context()
2023-06-18 18:47:39 -04:00
async with rc:
2023-11-05 18:38:05 -05:00
rcp = await rc.with_default_safety(release=False)
2023-06-18 18:47:39 -04:00
async with rcp:
pass
rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc:
pass
2023-11-05 18:38:05 -05:00
rc = await (await api_connection.new_routing_context()).with_safety(
veilid.SafetySelection.safe(
2023-11-05 18:38:05 -05:00
veilid.SafetySpec(None, 2, veilid.Stability.LOW_LATENCY, veilid.Sequencing.NO_PREFERENCE)
)
)
await rc.release()
2023-11-05 18:38:05 -05:00
rc = await (await api_connection.new_routing_context()).with_safety(
veilid.SafetySelection.unsafe(veilid.Sequencing.PREFER_ORDERED)
)
await rc.release()
2023-06-17 14:34:09 -04:00
2023-06-15 20:22:54 -04:00
@pytest.mark.asyncio
async def test_routing_context_app_message_loopback():
2023-06-17 15:09:40 -04:00
# Seriously, mypy?
app_message_queue: asyncio.Queue = asyncio.Queue()
2023-06-15 20:22:54 -04:00
2023-06-26 21:29:02 -04:00
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
2023-06-15 20:22:54 -04:00
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
await app_message_queue.put(update)
try:
api = await api_connector(app_message_queue_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
2023-06-15 20:22:54 -04:00
async with api:
2023-06-16 13:14:34 -04:00
# purge routes to ensure we start fresh
await api.debug("purge routes")
2023-06-17 14:34:09 -04:00
2023-06-15 20:22:54 -04:00
# make a routing context that uses a safety route
2023-11-05 18:38:05 -05:00
rc = await api.new_routing_context()
2023-06-18 18:47:39 -04:00
async with rc:
# make a new local private route
prl, blob = await api.new_private_route()
2023-06-15 20:22:54 -04:00
2023-06-18 18:47:39 -04:00
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
2023-06-15 20:22:54 -04:00
2023-06-18 18:47:39 -04:00
# send an app message to our own private route
message = b"abcd1234"
await rc.app_message(prr, message)
2023-06-17 14:34:09 -04:00
2023-06-18 18:47:39 -04:00
# we should get the same message back
update: veilid.VeilidUpdate = await asyncio.wait_for(
app_message_queue.get(), timeout=10
)
2023-06-15 20:22:54 -04:00
2023-06-18 18:47:39 -04:00
assert isinstance(update.detail, veilid.VeilidAppMessage)
assert update.detail.message == message
2023-06-15 20:22:54 -04:00
2023-06-16 11:57:55 -04:00
@pytest.mark.asyncio
async def test_routing_context_app_call_loopback():
2023-06-17 15:09:40 -04:00
app_call_queue: asyncio.Queue = asyncio.Queue()
2023-06-16 11:57:55 -04:00
2023-06-26 21:29:02 -04:00
async def app_call_queue_update_callback(update: veilid.VeilidUpdate):
2023-06-16 11:57:55 -04:00
if update.kind == veilid.VeilidUpdateKind.APP_CALL:
await app_call_queue.put(update)
try:
api = await api_connector(app_call_queue_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
2023-06-16 11:57:55 -04:00
async with api:
2023-06-16 13:14:34 -04:00
# purge routes to ensure we start fresh
await api.debug("purge routes")
# make a routing context
rc = await api.new_routing_context()
2023-06-18 18:47:39 -04:00
async with rc:
# make a new local private route
prl, blob = await api.new_private_route()
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
# send an app message to our own private route
request = b"abcd1234"
2023-07-22 13:06:46 -04:00
app_call_task = asyncio.create_task(rc.app_call(prr, request), name="app call task")
2023-06-18 18:47:39 -04:00
# we should get the same request back
2023-07-22 13:06:46 -04:00
update: veilid.VeilidUpdate = await asyncio.wait_for(app_call_queue.get(), timeout=10)
2023-06-18 18:47:39 -04:00
appcall = update.detail
assert isinstance(appcall, veilid.VeilidAppCall)
assert appcall.message == request
# now we reply to the request
reply = b"qwer5678"
await api.app_call_reply(appcall.call_id, reply)
# now we should get the reply from the call
result = await app_call_task
assert result == reply
2023-06-23 17:01:52 -04:00
@pytest.mark.asyncio
async def test_routing_context_app_message_loopback_big_packets():
app_message_queue: asyncio.Queue = asyncio.Queue()
2023-07-21 20:35:39 -04:00
count_hack = [0]
2023-07-04 13:38:25 -04:00
2023-06-26 21:29:02 -04:00
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
2023-06-23 17:01:52 -04:00
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
2023-07-21 20:35:39 -04:00
count_hack[0] += 1
print(f"{count_hack[0]} ", end="")
2023-06-23 17:01:52 -04:00
await app_message_queue.put(update)
2023-06-24 11:16:34 -04:00
sent_messages: set[bytes] = set()
try:
api = await api_connector(app_message_queue_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
2023-06-23 17:01:52 -04:00
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
# make a routing context that uses a safety route
rc = await api.new_routing_context()
2023-06-23 17:01:52 -04:00
async with rc:
# make a new local private route
prl, blob = await api.new_private_route()
2023-06-23 17:01:52 -04:00
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
2023-06-26 21:29:02 -04:00
# do this test 1000 times
for _ in range(1000):
2023-06-23 17:01:52 -04:00
# send a random sized random app message to our own private route
2023-06-23 21:12:48 -04:00
message = random.randbytes(random.randint(0, 32768))
2023-06-23 17:01:52 -04:00
await rc.app_message(prr, message)
2023-06-24 11:16:34 -04:00
sent_messages.add(message)
# we should get the same messages back
2023-06-25 01:23:24 -04:00
print(len(sent_messages))
for n in range(len(sent_messages)):
print(n)
2023-06-23 17:01:52 -04:00
update: veilid.VeilidUpdate = await asyncio.wait_for(
app_message_queue.get(), timeout=10
)
assert isinstance(update.detail, veilid.VeilidAppMessage)
2023-06-24 11:16:34 -04:00
assert update.detail.message in sent_messages
2023-06-25 18:28:32 -04:00
2023-07-04 13:38:25 -04:00
2023-06-25 18:28:32 -04:00
@pytest.mark.asyncio
async def test_routing_context_app_call_loopback_big_packets():
2023-07-21 20:35:39 -04:00
count_hack = [0]
2023-07-04 13:38:25 -04:00
2023-06-26 21:29:02 -04:00
app_call_queue: asyncio.Queue = asyncio.Queue()
async def app_call_queue_update_callback(update: veilid.VeilidUpdate):
2023-06-25 18:28:32 -04:00
if update.kind == veilid.VeilidUpdateKind.APP_CALL:
2023-06-26 21:29:02 -04:00
await app_call_queue.put(update)
async def app_call_queue_task_handler(api: veilid.VeilidAPI):
while True:
update = await app_call_queue.get()
2023-07-04 13:38:25 -04:00
2023-07-21 20:35:39 -04:00
count_hack[0] += 1
print(f"{count_hack[0]} ", end="", flush=True)
2023-06-25 18:28:32 -04:00
2023-06-26 21:29:02 -04:00
await api.app_call_reply(update.detail.call_id, update.detail.message)
2023-07-04 13:38:25 -04:00
try:
api = await api_connector(app_call_queue_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
2023-06-25 18:28:32 -04:00
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
2023-07-22 13:06:46 -04:00
app_call_task = asyncio.create_task(app_call_queue_task_handler(api), name="app call task")
2023-06-26 21:29:02 -04:00
2023-06-25 18:28:32 -04:00
# make a routing context that uses a safety route
2023-11-05 18:38:05 -05:00
rc = await (await api.new_routing_context()).with_sequencing(
2023-07-22 13:06:46 -04:00
veilid.Sequencing.ENSURE_ORDERED
)
2023-06-25 18:28:32 -04:00
async with rc:
# make a new local private route
2023-11-04 21:21:08 -04:00
prl, blob = await api.new_custom_private_route(
[veilid.CryptoKind.CRYPTO_KIND_VLD0],
veilid.Stability.RELIABLE,
veilid.Sequencing.ENSURE_ORDERED)
2023-06-25 18:28:32 -04:00
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
2023-06-26 21:29:02 -04:00
# do this test 10 times
for _ in range(10):
2023-06-25 18:28:32 -04:00
# send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768))
out_message = await rc.app_call(prr, message)
assert message == out_message
2023-07-04 13:38:25 -04:00
2023-06-26 21:29:02 -04:00
app_call_task.cancel()
2023-07-22 13:06:46 -04:00
@pytest.mark.skipif(os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check")
2023-06-26 21:29:02 -04:00
@pytest.mark.asyncio
async def test_routing_context_app_message_loopback_bandwidth():
app_message_queue: asyncio.Queue = asyncio.Queue()
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
await app_message_queue.put(True)
try:
api = await api_connector(app_message_queue_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
2023-06-26 21:29:02 -04:00
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
# make a routing context that uses a safety route
rc = await api.new_routing_context()
async with rc:
# make a new local private route
prl, blob = await api.new_private_route()
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
# do this test 1000 times
message = random.randbytes(16384)
for _ in range(10000):
# send a random sized random app message to our own private route
await rc.app_message(prr, message)
# we should get the same number of messages back (not storing all that data)
for _ in range(10000):
await asyncio.wait_for(app_message_queue.get(), timeout=10)