diff --git a/veilid-core/src/veilid_api/types/dht/schema/dflt.rs b/veilid-core/src/veilid_api/types/dht/schema/dflt.rs index 500dfb45..efb5bce2 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/dflt.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/dflt.rs @@ -74,7 +74,7 @@ impl DHTSchemaDFLT { } // Value too big - return true; + return false; } // Wrong writer diff --git a/veilid-python/tests/conftest.py b/veilid-python/tests/conftest.py index 875c2df4..c139abb0 100644 --- a/veilid-python/tests/conftest.py +++ b/veilid-python/tests/conftest.py @@ -8,12 +8,10 @@ import pytest_asyncio import veilid from veilid.json_api import _JsonVeilidAPI - pytest_plugins = ("pytest_asyncio",) - async def simple_update_callback(update: veilid.VeilidUpdate): - print(f"VeilidUpdate: {update}") + pass @pytest_asyncio.fixture diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index f7dc42eb..414967fe 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -1,5 +1,6 @@ # Routing context veilid tests +from typing import Awaitable, Callable import pytest import asyncio import time @@ -350,6 +351,120 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): await rc.delete_dht_record(rec.key) + + +async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]]], count: int, test_data: bytes, ): + rc = await api_connection.new_routing_context() + async with rc: + (key, owner, secret) = await open_record(rc, count) + print(f'{key} {owner}:{secret}') + + # write dht records on server 0 + records = [] + print(f'writing {count} subkeys') + for n in range(count): + await rc.set_dht_value(key, ValueSubkey(n), test_data) + print(f' {n}') + + print('syncing records to the network') + + while True: + donerecords = set() + subkeysleft = 0 + + rr = await rc.inspect_dht_record(key, []) + left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] + if left == 0: + break + print(f' {left} subkeys left') + time.sleep(1) + + await rc.close_dht_record(key) + + await api_connection.debug("record purge local") + await api_connection.debug("record purge remote") + + # read dht records on server 0 + print(f'reading {count} subkeys') + desc1 = await rc.open_dht_record(key) + for n in range(count): + vd0 = await rc.get_dht_value(key, ValueSubkey(n), force_refresh=True) + assert vd0.data == test_data + print(f' {n}') + + +@pytest.mark.asyncio +async def test_schema_limit_dflt(api_connection: veilid.VeilidAPI): + + async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]: + schema = veilid.DHTSchema.dflt(count) + desc = await rc.create_dht_record(schema) + return (desc.key, desc.owner, desc.owner_secret) + + + print("Test with maximum number of subkeys before lower limit hit") + TEST_DATA = b"A" * 32768 + COUNT = 32 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + print("Test with maximum number of subkeys before lower limit hit + 1 extra byte") + with pytest.raises(Exception): + TEST_DATA = b"A" * 32769 + COUNT = 32 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + print("Test with minimum number of subkeys with lower limit exceeded") + TEST_DATA = b"A" * 31775 + COUNT = 33 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + print("Test with minimum number of subkeys with lower limit exceeded + 1 extra byte") + with pytest.raises(Exception): + TEST_DATA = b"A" * 31776 + COUNT = 33 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + +@pytest.mark.asyncio +async def test_schema_limit_smpl(api_connection: veilid.VeilidAPI): + + async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]: + cs = await api_connection.best_crypto_system() + async with cs: + writer_keypair = await cs.generate_key_pair() + + schema = veilid.DHTSchema.smpl(0, [veilid.DHTSchemaSMPLMember(writer_keypair.key(), count)]) + desc = await rc.create_dht_record(schema) + await rc.open_dht_record(desc.key, writer_keypair) + + return (desc.key, writer_keypair.key(), writer_keypair.secret()) + + print("Test with maximum number of subkeys before lower limit hit") + TEST_DATA = b"A" * 32768 + COUNT = 32 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + print("Test with maximum number of subkeys before lower limit hit + 1 extra byte") + with pytest.raises(Exception): + TEST_DATA = b"A" * 32769 + COUNT = 32 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + print("Test with minimum number of subkeys with lower limit exceeded") + TEST_DATA = b"A" * 31775 + COUNT = 33 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + print("Test with minimum number of subkeys with lower limit exceeded + 1 extra byte") + with pytest.raises(Exception): + TEST_DATA = b"A" * 31776 + COUNT = 33 + await _run_test_schema_limit(api_connection, open_record, COUNT, TEST_DATA) + + + + + @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") @pytest.mark.asyncio async def test_dht_integration_writer_reader(): @@ -420,6 +535,7 @@ async def test_dht_integration_writer_reader(): n += 1 +@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time") @pytest.mark.asyncio async def test_dht_write_read_local(): diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 50ab0328..ecdf3dcb 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -197,7 +197,7 @@ class _JsonVeilidAPI(VeilidAPI): if reqfuture is not None: reqfuture.set_result(j) else: - print("Missing id: {}", id) + print(f"Missing id: {id}, you may be missing a '.release()' or 'async with'") async def handle_recv_messages(self): # Read lines until we're done @@ -211,6 +211,8 @@ class _JsonVeilidAPI(VeilidAPI): # Parse line as ndjson j = json.loads(linebytes.strip()) + # print(f"j: {j}") + if self.validate_schema: _schema_validate(_VALIDATOR_RECV_MESSAGE, j) # Process the message