Merge branch 'tek/mark-python-integration-tests' into 'main'

Skip integrations tests when veilid-server isn't running

See merge request veilid/veilid!95
This commit is contained in:
Christien Rioux 2023-07-22 03:00:42 +00:00
commit 03d92b8719
3 changed files with 144 additions and 73 deletions

View File

@ -0,0 +1,70 @@
import errno
import os
import re
from collections.abc import Callable
from functools import cache
from veilid.json_api import _JsonVeilidAPI
import veilid
ERRNO_PATTERN = re.compile(r"errno (\d+)", re.IGNORECASE)
class VeilidTestConnectionError(Exception):
"""The test client could not connect to the veilid-server."""
pass
@cache
def server_info() -> tuple[str, int]:
"""Return the hostname and port of the test server."""
VEILID_SERVER = os.getenv("VEILID_SERVER")
if VEILID_SERVER is None:
return "localhost", 5959
hostname, *rest = VEILID_SERVER.split(":")
if rest:
return hostname, int(rest[0])
return hostname, 5959
async def api_connector(callback: Callable) -> _JsonVeilidAPI:
"""Return an API connection if possible.
If the connection fails due to an inability to connect to the
server's socket, raise an easy-to-catch VeilidTestConnectionError.
"""
hostname, port = server_info()
try:
return await veilid.json_api_connect(hostname, port, callback)
except OSError as exc:
# This is a little goofy. The underlying Python library handles
# connection errors in 2 ways, depending on how many connections
# it attempted to make:
#
# - If it only tried to connect to one IP address socket, the
# library propagates the one single OSError it got.
#
# - If it tried to connect to multiple sockets, perhaps because
# the hostname resolved to several addresses (e.g. "localhost"
# => 127.0.0.1 and ::1), then the library raises one exception
# with all the failure exception strings joined together.
# If errno is set, it's the first kind of exception. Check that
# it's the code we expected.
if exc.errno is not None:
if exc.errno == errno.ECONNREFUSED:
raise VeilidTestConnectionError
raise
# If not, use a regular expression to find all the errno values
# in the combined error string. Check that all of them have the
# code we're looking for.
errnos = ERRNO_PATTERN.findall(str(exc))
if all(int(err) == errno.ECONNREFUSED for err in errnos):
raise VeilidTestConnectionError
raise

View File

@ -1,35 +1,30 @@
import os """Common test fixtures."""
from functools import cache
from typing import AsyncGenerator from typing import AsyncGenerator
import pytest
import pytest_asyncio import pytest_asyncio
import veilid
from veilid.json_api import _JsonVeilidAPI from veilid.json_api import _JsonVeilidAPI
import veilid
from .api import VeilidTestConnectionError, api_connector
pytest_plugins = ("pytest_asyncio",) pytest_plugins = ("pytest_asyncio",)
@cache
def server_info() -> tuple[str, int]:
"""Return the hostname and port of the test server."""
VEILID_SERVER = os.getenv("VEILID_SERVER")
if VEILID_SERVER is None:
return "localhost", 5959
hostname, *rest = VEILID_SERVER.split(":")
if rest:
return hostname, int(rest[0])
return hostname, 5959
async def simple_update_callback(update: veilid.VeilidUpdate): async def simple_update_callback(update: veilid.VeilidUpdate):
print(f"VeilidUpdate: {update}") print(f"VeilidUpdate: {update}")
@pytest_asyncio.fixture @pytest_asyncio.fixture
async def api_connection() -> AsyncGenerator[_JsonVeilidAPI, None]: async def api_connection() -> AsyncGenerator[_JsonVeilidAPI, None]:
hostname, port = server_info() try:
api = await veilid.json_api_connect(hostname, port, simple_update_callback) api = await api_connector(simple_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
async with api: async with api:
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")

View File

@ -1,15 +1,15 @@
# Routing context veilid tests # Routing context veilid tests
import asyncio import asyncio
import os
import random import random
import sys import sys
import os
import pytest import pytest
import veilid
from veilid.types import OperationId
from .conftest import server_info import veilid
from .api import VeilidTestConnectionError, api_connector
################################################################## ##################################################################
@ -26,18 +26,24 @@ async def test_routing_contexts(api_connection: veilid.VeilidAPI):
async with rcp: async with rcp:
pass pass
rc = await (await api_connection.new_routing_context()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
pass pass
rc = await (await api_connection.new_routing_context()).with_custom_privacy( rc = await (await api_connection.new_routing_context()).with_custom_privacy(
veilid.SafetySelection.safe( veilid.SafetySelection.safe(
veilid.SafetySpec(None, 2, veilid.Stability.RELIABLE, veilid.SafetySpec(
veilid.Sequencing.ENSURE_ORDERED) None, 2, veilid.Stability.RELIABLE, veilid.Sequencing.ENSURE_ORDERED
)) )
)
)
await rc.release() await rc.release()
rc = await (await api_connection.new_routing_context()).with_custom_privacy(veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED)) rc = await (await api_connection.new_routing_context()).with_custom_privacy(
veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED)
)
await rc.release() await rc.release()
@ -50,10 +56,12 @@ async def test_routing_context_app_message_loopback():
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
await app_message_queue.put(update) await app_message_queue.put(update)
hostname, port = server_info() try:
api = await veilid.json_api_connect( api = await api_connector(app_message_queue_update_callback)
hostname, port, app_message_queue_update_callback except VeilidTestConnectionError:
) pytest.skip("Unable to connect to veilid-server.")
return
async with api: async with api:
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")
@ -61,7 +69,6 @@ async def test_routing_context_app_message_loopback():
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_privacy() rc = await (await api.new_routing_context()).with_privacy()
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -89,8 +96,12 @@ async def test_routing_context_app_call_loopback():
if update.kind == veilid.VeilidUpdateKind.APP_CALL: if update.kind == veilid.VeilidUpdateKind.APP_CALL:
await app_call_queue.put(update) await app_call_queue.put(update)
hostname, port = server_info() try:
api = await veilid.json_api_connect(hostname, port, app_call_queue_update_callback) api = await api_connector(app_call_queue_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
async with api: async with api:
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")
@ -98,7 +109,6 @@ async def test_routing_context_app_call_loopback():
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_privacy() rc = await (await api.new_routing_context()).with_privacy()
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -131,33 +141,33 @@ async def test_routing_context_app_call_loopback():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_routing_context_app_message_loopback_big_packets(): async def test_routing_context_app_message_loopback_big_packets():
app_message_queue: asyncio.Queue = asyncio.Queue() app_message_queue: asyncio.Queue = asyncio.Queue()
global got_message count_hack = [0]
got_message = 0
async def app_message_queue_update_callback(update: veilid.VeilidUpdate): async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
global got_message count_hack[0] += 1
got_message += 1 print(f"{count_hack[0]} ", end="")
sys.stdout.write("{} ".format(got_message))
await app_message_queue.put(update) await app_message_queue.put(update)
sent_messages: set[bytes] = set() sent_messages: set[bytes] = set()
hostname, port = server_info() try:
api = await veilid.json_api_connect( api = await api_connector(app_message_queue_update_callback)
hostname, port, app_message_queue_update_callback except VeilidTestConnectionError:
) pytest.skip("Unable to connect to veilid-server.")
return
async with api: async with api:
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) rc = await (
await (await api.new_routing_context()).with_privacy()
).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -166,7 +176,6 @@ async def test_routing_context_app_message_loopback_big_packets():
# do this test 1000 times # do this test 1000 times
for _ in range(1000): for _ in range(1000):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768)) message = random.randbytes(random.randint(0, 32768))
await rc.app_message(prr, message) await rc.app_message(prr, message)
@ -187,8 +196,7 @@ async def test_routing_context_app_message_loopback_big_packets():
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_routing_context_app_call_loopback_big_packets(): async def test_routing_context_app_call_loopback_big_packets():
global got_message count_hack = [0]
got_message = 0
app_call_queue: asyncio.Queue = asyncio.Queue() app_call_queue: asyncio.Queue = asyncio.Queue()
@ -200,18 +208,17 @@ async def test_routing_context_app_call_loopback_big_packets():
while True: while True:
update = await app_call_queue.get() update = await app_call_queue.get()
global got_message count_hack[0] += 1
got_message += 1 print(f"{count_hack[0]} ", end="", flush=True)
sys.stdout.write("{} ".format(got_message))
sys.stdout.flush()
await api.app_call_reply(update.detail.call_id, update.detail.message) await api.app_call_reply(update.detail.call_id, update.detail.message)
hostname, port = server_info() try:
api = await veilid.json_api_connect( api = await api_connector(app_call_queue_update_callback)
hostname, port, app_call_queue_update_callback except VeilidTestConnectionError:
) pytest.skip("Unable to connect to veilid-server.")
return
async with api: async with api:
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")
@ -221,9 +228,10 @@ async def test_routing_context_app_call_loopback_big_packets():
) )
# make a routing context that uses a safety route # make a routing context that uses a safety route
rc = await (await (await api.new_routing_context()).with_privacy()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED) rc = await (
await (await api.new_routing_context()).with_privacy()
).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -232,7 +240,6 @@ async def test_routing_context_app_call_loopback_big_packets():
# do this test 10 times # do this test 10 times
for _ in range(10): for _ in range(10):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
message = random.randbytes(random.randint(0, 32768)) message = random.randbytes(random.randint(0, 32768))
out_message = await rc.app_call(prr, message) out_message = await rc.app_call(prr, message)
@ -242,20 +249,23 @@ async def test_routing_context_app_call_loopback_big_packets():
app_call_task.cancel() app_call_task.cancel()
@pytest.mark.skipif(os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check") @pytest.mark.skipif(
os.getenv("NOSKIP") != "1", reason="unneeded test, only for performance check"
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_routing_context_app_message_loopback_bandwidth(): async def test_routing_context_app_message_loopback_bandwidth():
app_message_queue: asyncio.Queue = asyncio.Queue() app_message_queue: asyncio.Queue = asyncio.Queue()
async def app_message_queue_update_callback(update: veilid.VeilidUpdate): async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE: if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
await app_message_queue.put(True) await app_message_queue.put(True)
hostname, port = server_info() try:
api = await veilid.json_api_connect( api = await api_connector(app_message_queue_update_callback)
hostname, port, app_message_queue_update_callback except VeilidTestConnectionError:
) pytest.skip("Unable to connect to veilid-server.")
return
async with api: async with api:
# purge routes to ensure we start fresh # purge routes to ensure we start fresh
await api.debug("purge routes") await api.debug("purge routes")
@ -265,7 +275,6 @@ async def test_routing_context_app_message_loopback_bandwidth():
# rc = await (await api.new_routing_context()).with_privacy() # rc = await (await api.new_routing_context()).with_privacy()
rc = await api.new_routing_context() rc = await api.new_routing_context()
async with rc: async with rc:
# make a new local private route # make a new local private route
prl, blob = await api.new_private_route() prl, blob = await api.new_private_route()
@ -275,12 +284,9 @@ async def test_routing_context_app_message_loopback_bandwidth():
# do this test 1000 times # do this test 1000 times
message = random.randbytes(16384) message = random.randbytes(16384)
for _ in range(10000): for _ in range(10000):
# send a random sized random app message to our own private route # send a random sized random app message to our own private route
await rc.app_message(prr, message) await rc.app_message(prr, message)
# we should get the same number of messages back (not storing all that data) # we should get the same number of messages back (not storing all that data)
for _ in range(10000): for _ in range(10000):
await asyncio.wait_for( await asyncio.wait_for(app_message_queue.get(), timeout=10)
app_message_queue.get(), timeout=10
)