validate types for python api calls

add more stress tests to python
fix deadlock in veilid_api duration testing
correct offline_subkey_writes inflight reporting
This commit is contained in:
Christien Rioux 2025-03-11 09:31:15 -04:00
parent 3010452274
commit 67eeb87c28
5 changed files with 499 additions and 90 deletions

View file

@ -1,6 +1,6 @@
# Routing context veilid tests
from typing import Awaitable, Callable
from typing import Any, Awaitable, Callable, Optional
import pytest
import asyncio
import time
@ -374,13 +374,13 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
rr = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.LOCAL)
print("rr: {}", rr.__dict__)
assert rr.subkeys == [[0,1]]
assert rr.subkeys == [(0,1)]
assert rr.local_seqs == [0, 0xFFFFFFFF]
assert rr.network_seqs == []
rr2 = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.SYNC_GET)
print("rr2: {}", rr2.__dict__)
assert rr2.subkeys == [[0,1]]
assert rr2.subkeys == [(0,1)]
assert rr2.local_seqs == [0, 0xFFFFFFFF]
assert rr2.network_seqs == [0, 0xFFFFFFFF]
@ -390,42 +390,28 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]]], count: int, test_data: bytes, ):
async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]]], count: int, test_data: bytes):
rc = await api_connection.new_routing_context()
async with rc:
(key, owner, secret) = await open_record(rc, count)
print(f'{key} {owner}:{secret}')
(desc, writer) = await open_record(rc, count)
print(f'{desc.key} {writer}')
# write dht records on server 0
records = []
print(f'writing {count} subkeys')
for n in range(count):
await rc.set_dht_value(key, ValueSubkey(n), test_data)
await rc.set_dht_value(desc.key, ValueSubkey(n), test_data)
print(f' {n}')
print('syncing records to the network')
await sync(rc, [desc])
while True:
donerecords = set()
subkeysleft = 0
rr = await rc.inspect_dht_record(key, [])
left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys]
if left == 0:
break
print(f' {left} subkeys left')
time.sleep(1)
await rc.close_dht_record(key)
await api_connection.debug("record purge local")
await api_connection.debug("record purge remote")
await rc.close_dht_record(desc.key)
# read dht records on server 0
print(f'reading {count} subkeys')
desc1 = await rc.open_dht_record(key)
desc1 = await rc.open_dht_record(desc.key)
for n in range(count):
vd0 = await rc.get_dht_value(key, ValueSubkey(n), force_refresh=True)
vd0 = await rc.get_dht_value(desc1.key, ValueSubkey(n))
assert vd0.data == test_data
print(f' {n}')
@ -433,10 +419,10 @@ async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record:
@pytest.mark.asyncio
async def test_schema_limit_dflt(api_connection: veilid.VeilidAPI):
async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]:
async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]:
schema = veilid.DHTSchema.dflt(count)
desc = await rc.create_dht_record(schema)
return (desc.key, desc.owner, desc.owner_secret)
return (desc, desc.owner_key_pair())
print("Test with maximum number of subkeys before lower limit hit")
@ -474,7 +460,7 @@ async def test_schema_limit_smpl(api_connection: veilid.VeilidAPI):
desc = await rc.create_dht_record(schema)
await rc.open_dht_record(desc.key, writer_keypair)
return (desc.key, writer_keypair.key(), writer_keypair.secret())
return (desc, writer_keypair)
print("Test with maximum number of subkeys before lower limit hit")
TEST_DATA = b"A" * 32768
@ -545,18 +531,7 @@ async def test_dht_integration_writer_reader():
await rc0.set_dht_value(desc.key, ValueSubkey(0), TEST_DATA)
print('syncing records to the network')
recleft = len(records)
for desc in records:
while True:
rr = await rc0.inspect_dht_record(desc.key, [])
left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys]
if left == 0:
await rc0.close_dht_record(desc.key)
break
print(f' {recleft} records {left} subkeys left')
time.sleep(0.1)
recleft-=1
await sync(rc0, records)
# read dht records on server 1
print(f'reading {COUNT} records')
@ -636,6 +611,96 @@ async def test_dht_write_read_local():
print(f' {n}')
n += 1
@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time")
@pytest.mark.asyncio
async def test_dht_write_read_full_subkeys_local():
async def null_update_callback(update: veilid.VeilidUpdate):
pass
try:
api0 = await veilid.api_connector(null_update_callback, 0)
except veilid.VeilidConnectionError:
pytest.skip("Unable to connect to veilid-server 0.")
async with api0:
# purge local and remote record stores to ensure we start fresh
await api0.debug("record purge local")
await api0.debug("record purge remote")
# make routing contexts
rc0 = await api0.new_routing_context()
async with rc0:
# Number of records
COUNT = 8
# Number of subkeys per record
SUBKEY_COUNT = 32
# Nonce to encrypt test data
NONCE = veilid.Nonce.from_bytes(b"A"*24)
# Secret to encrypt test data
SECRET = veilid.SharedSecret.from_bytes(b"A"*32)
# Max subkey size
MAX_SUBKEY_SIZE = min(32768, 1024*1024/SUBKEY_COUNT)
# MAX_SUBKEY_SIZE = 256
# write dht records on server 0
records = []
subkey_data_list = []
schema = veilid.DHTSchema.dflt(SUBKEY_COUNT)
print(f'writing {COUNT} records with full subkeys')
init_futures = set()
for n in range(COUNT):
# Make encrypted data that is consistent and hard to compress
subkey_data = bytes(chr(ord("A")+n)*MAX_SUBKEY_SIZE, 'ascii')
print(f"subkey_data({n}):len={len(subkey_data)}")
cs = await api0.best_crypto_system()
async with cs:
subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET)
subkey_data_list.append(subkey_data)
desc = await rc0.create_dht_record(schema)
records.append(desc)
for i in range(SUBKEY_COUNT):
init_futures.add(rc0.set_dht_value(desc.key, ValueSubkey(i), subkey_data))
print(f' {n}: {desc.key} {desc.owner}:{desc.owner_secret}')
# Wait for all records to synchronize, with progress bars
await sync_win(rc0, records, SUBKEY_COUNT, init_futures)
for desc0 in records:
await rc0.close_dht_record(desc0.key)
await api0.debug("record purge local")
await api0.debug("record purge remote")
# read dht records on server 0
print(f'reading {COUNT} records')
for n, desc0 in enumerate(records):
desc1 = await rc0.open_dht_record(desc0.key)
for i in range(SUBKEY_COUNT):
vd0 = None
while vd0 == None:
vd0 = await rc0.get_dht_value(desc1.key, ValueSubkey(i), force_refresh=True)
if vd0 != None:
assert vd0.data == subkey_data_list[n]
break
time.sleep(1)
print(f"retrying record {n} subkey {i}")
await rc0.close_dht_record(desc1.key)
print(f' {n}')
async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescriptor]):
print('syncing records to the network')
syncrecords = records.copy()
@ -646,9 +711,119 @@ async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescript
rr = await rc.inspect_dht_record(desc.key, [])
left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys]
if left == 0:
donerecords.add(desc)
if veilid.ValueSeqNum.NONE not in rr.local_seqs:
donerecords.add(desc)
else:
subkeysleft += left
syncrecords = [x for x in syncrecords if x not in donerecords]
print(f' {len(syncrecords)} records {subkeysleft} subkeys left')
time.sleep(1)
async def sync_win(
rc: veilid.RoutingContext,
records: list[veilid.DHTRecordDescriptor],
subkey_count: int,
init_futures: set[Awaitable[Any]]
):
import curses
screen = curses.initscr()
curses.start_color()
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_BLUE)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_YELLOW)
curses.init_pair(4, curses.COLOR_BLACK, curses.COLOR_GREEN)
HEIGHT=len(records) + 3
GRAPHWIDTH = subkey_count
WIDTH=GRAPHWIDTH + 4 + 1 + 43 + 2
cur_lines = curses.LINES
cur_cols = curses.COLS
win = curses.newwin(HEIGHT, WIDTH,
max(0, int(cur_lines/2) - int(HEIGHT/2)),
max(0, int(cur_cols/2) - int(WIDTH/2)))
win.clear()
win.border(0,0,0,0)
win.nodelay(True)
# Record inspection and completion state
# Records we are done inspecting and have finished sync
donerecords: set[veilid.TypedKey] = set()
# Records we are currently inspecting that are in the futures set
futurerecords: set[veilid.TypedKey] = set()
# All the futures we are waiting for
futures = set()
# The record report state
recordreports: dict[veilid.TypedKey, veilid.DHTRecordReport] = dict()
# Add initial futures with None key
for fut in init_futures:
async def _do_init_fut(fut):
return (None, await fut)
futures.add(asyncio.create_task(_do_init_fut(fut)))
# Loop until all records are completed
while len(donerecords) != len(records):
# Update the futures with inspects for unfinished records
for n, desc in enumerate(records):
if desc.key in donerecords or desc.key in futurerecords:
continue
async def _do_inspect(key: veilid.TypedKey):
return (key, await rc.inspect_dht_record(key, []))
futures.add(asyncio.create_task(_do_inspect(desc.key)))
futurerecords.add(desc.key)
# Wait for some futures to finish
done, futures = await asyncio.wait(futures, return_when = asyncio.FIRST_COMPLETED)
# Process finished futures into the state
for rr_fut in done:
key: veilid.TypedKey
rr: veilid.DHTRecordReport
key, rr = await rr_fut
if key is not None:
futurerecords.remove(key)
if len(rr.subkeys) == 1 and rr.subkeys[0] == (0, subkey_count-1) and veilid.ValueSeqNum.NONE not in rr.local_seqs and len(rr.offline_subkeys) == 0:
if key in recordreports:
del recordreports[key]
donerecords.add(key)
else:
recordreports[key] = rr
# Re-render the state
if cur_lines != curses.LINES or cur_cols != curses.COLS:
cur_lines = curses.LINES
cur_cols = curses.COLS
win.move(
max(0, int(cur_lines/2) - int(HEIGHT/2)),
max(0, int(cur_cols/2) - int(WIDTH/2)))
win.border(0,0,0,0)
win.addstr(1, 1, "syncing records to the network", curses.color_pair(0))
for n, rr in enumerate(records):
key = rr.key
win.addstr(n+2, GRAPHWIDTH+1, key, curses.color_pair(0))
if key in donerecords:
win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(4))
elif key in recordreports:
rr = recordreports[key]
win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1))
for (a,b) in rr.subkeys:
for m in range(a, b+1):
if rr.local_seqs[m] != veilid.ValueSeqNum.NONE:
win.addstr(n+2, m+1, " ", curses.color_pair(2))
for (a,b) in rr.offline_subkeys:
win.addstr(n+2, a+1, " " * (b-a+1), curses.color_pair(3))
else:
win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1))
win.refresh()
curses.endwin()