From c468d9c85076387144116089d970990648e2afe1 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 18 Mar 2024 14:16:03 -0400 Subject: [PATCH] more python test cleanup and fixes --- .../example/integration_test/test_dht.dart | 3 + veilid-python/tests/test_dht.py | 139 +++++++++++++++--- veilid-python/veilid/json_api.py | 8 +- veilid-python/veilid/state.py | 6 +- veilid-python/veilid/types.py | 3 + 5 files changed, 132 insertions(+), 27 deletions(-) diff --git a/veilid-flutter/example/integration_test/test_dht.dart b/veilid-flutter/example/integration_test/test_dht.dart index 5af537c0..a6bc3b1c 100644 --- a/veilid-flutter/example/integration_test/test_dht.dart +++ b/veilid-flutter/example/integration_test/test_dht.dart @@ -335,8 +335,10 @@ Future testWatchDHTValues(Stream updateStream) async { // Reopen without closing to change routing context and not lose watch rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); + // Set the value without a watch expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); + // Now we should NOT get an update if (await valueChangeQueueIterator .moveNext() .timeout(const Duration(seconds: 5), onTimeout: () { @@ -345,6 +347,7 @@ Future testWatchDHTValues(Stream updateStream) async { fail("should not have a change"); } + // Clean up await rcSet.closeDHTRecord(rec.key); await rcSet.deleteDHTRecord(rec.key); } finally { diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index ec3015a9..bdd99ace 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -5,6 +5,7 @@ import pytest import asyncio import json from . import * +from .api import VeilidTestConnectionError, api_connector ################################################################## BOGUS_KEY = veilid.TypedKey.from_value( @@ -13,7 +14,9 @@ BOGUS_KEY = veilid.TypedKey.from_value( @pytest.mark.asyncio async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: with pytest.raises(veilid.VeilidAPIError): out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False) @@ -21,7 +24,9 @@ async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI): @pytest.mark.asyncio async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: with pytest.raises(veilid.VeilidAPIError): out = await rc.open_dht_record(BOGUS_KEY, None) @@ -29,7 +34,9 @@ async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.Veil @pytest.mark.asyncio async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: with pytest.raises(veilid.VeilidAPIError): await rc.close_dht_record(BOGUS_KEY) @@ -37,7 +44,9 @@ async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI): @pytest.mark.asyncio async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: with pytest.raises(veilid.VeilidAPIError): await rc.delete_dht_record(BOGUS_KEY) @@ -45,7 +54,9 @@ async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI): @pytest.mark.asyncio async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: rec = await rc.create_dht_record( veilid.DHTSchema.dflt(1), veilid.CryptoKind.CRYPTO_KIND_VLD0 @@ -56,7 +67,9 @@ async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI) @pytest.mark.asyncio async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: rec = await rc.create_dht_record(veilid.DHTSchema.dflt(1)) assert await rc.get_dht_value(rec.key, 0, False) == None @@ -66,7 +79,9 @@ async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI): @pytest.mark.asyncio async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) @@ -93,7 +108,9 @@ async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): @pytest.mark.asyncio async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) key = rec.key @@ -204,38 +221,116 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): await rc.delete_dht_record(key) @pytest.mark.asyncio -async def test_watch_dht_values(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() - async with rc: - rec = await rc.create_dht_record(veilid.DHTSchema.dflt(10)) +async def test_watch_dht_values(): - vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") + value_change_queue: asyncio.Queue[veilid.VeilidUpdate] = asyncio.Queue() + + async def value_change_update_callback(update: veilid.VeilidUpdate): + if update.kind == veilid.VeilidUpdateKind.VALUE_CHANGE: + await value_change_queue.put(update) + + try: + api = await api_connector(value_change_update_callback) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server.") + return + + # Make two routing contexts, one with and one without safety + # So we can pretend to be a different node and get the watch updates + # Normally they would not get sent if the set comes from the same target + # as the watch's target + rcWatch = await (await api.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) + rcSet = await (await api.new_routing_context()).with_safety( + veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED) + ) + async with rcWatch, rcSet: + # Make a DHT record + rec = await rcWatch.create_dht_record(veilid.DHTSchema.dflt(10)) + + # Set some subkey we care about + vd = await rcWatch.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") assert vd == None - ts = await rc.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF) + # Make a watch on that subkey + ts = await rcWatch.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF) assert ts != 0 - vd = await rc.set_dht_value(rec.key, 3, b"BLAH") + # Reopen without closing to change routing context and not lose watch + rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) + + # Now set the subkey and trigger an update + vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH") assert vd == None - still_active = await rc.cancel_dht_watch(rec.key, [(0, 2)]) + # Wait for the update + upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) + + # Verify the update + assert upd.detail.key == rec.key + assert upd.detail.count == 0xFFFFFFFE + assert upd.detail.subkeys == [(3,3)] + assert upd.detail.value.seq == 1 + assert upd.detail.value.data == b"BLAH" + assert upd.detail.value.writer == rec.owner + + # Reopen without closing to change routing context and not lose watch + rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) + + # Cancel some subkeys we don't care about + still_active = await rcWatch.cancel_dht_watch(rec.key, [(0, 2)]) assert still_active == True - vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") + # Reopen without closing to change routing context and not lose watch + rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) + + # Change our subkey + vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") assert vd == None - still_active = await rc.cancel_dht_watch(rec.key, [(3, 9)]) + # Wait for the update + upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) + + # Verify the update + assert upd.detail.key == rec.key + assert upd.detail.count == 0xFFFFFFFD + assert upd.detail.subkeys == [(3,3)] + assert upd.detail.value.seq == 2 + assert upd.detail.value.data == b"BLAH BLAH BLAH" + assert upd.detail.value.writer == rec.owner + + # Reopen without closing to change routing context and not lose watch + rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) + + # Now cancel the update + still_active = await rcWatch.cancel_dht_watch(rec.key, [(3, 9)]) assert still_active == False - vd = await rc.set_dht_value(rec.key, 3, b"BLAH") + # Reopen without closing to change routing context and not lose watch + rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) + + # Set the value without a watch + vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH") assert vd == None - await rc.close_dht_record(rec.key) - await rc.delete_dht_record(rec.key) + # Now we should NOT get an update + update = None + try: + update = await asyncio.wait_for(value_change_queue.get(), timeout=5) + except asyncio.TimeoutError: + pass + assert update == None + + # Clean up + await rcSet.close_dht_record(rec.key) + await rcSet.delete_dht_record(rec.key) @pytest.mark.asyncio async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): - rc = await api_connection.new_routing_context() + rc = await (await api_connection.new_routing_context()).with_sequencing( + veilid.Sequencing.ENSURE_ORDERED + ) async with rc: rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 3a0a9adb..0ff6ee96 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -111,8 +111,12 @@ class _JsonVeilidAPI(VeilidAPI): try: self.reader = None assert self.writer is not None - self.writer.close() - await self.writer.wait_closed() + try: + self.writer.close() + await self.writer.wait_closed() + except: + # Already closed + pass self.writer = None for reqid, reqfuture in self.in_flight_requests.items(): diff --git a/veilid-python/veilid/state.py b/veilid-python/veilid/state.py index 373052e3..df273647 100644 --- a/veilid-python/veilid/state.py +++ b/veilid-python/veilid/state.py @@ -356,11 +356,11 @@ class VeilidRouteChange: class VeilidValueChange: key: TypedKey - subkeys: list[ValueSubkey] + subkeys: list[tuple[ValueSubkey, ValueSubkey]] count: int value: ValueData - def __init__(self, key: TypedKey, subkeys: list[ValueSubkey], count: int, value: ValueData): + def __init__(self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], count: int, value: ValueData): self.key = key self.subkeys = subkeys self.count = count @@ -371,7 +371,7 @@ class VeilidValueChange: """JSON object hook""" return cls( TypedKey(j["key"]), - [ValueSubkey(key) for key in j["subkeys"]], + [(p[0], p[1]) for p in j["subkeys"]], j["count"], ValueData.from_json(j["value"]), ) diff --git a/veilid-python/veilid/types.py b/veilid-python/veilid/types.py index c2c2313e..1ba74dff 100644 --- a/veilid-python/veilid/types.py +++ b/veilid-python/veilid/types.py @@ -363,6 +363,9 @@ class DHTRecordDescriptor: def __repr__(self) -> str: return f"<{self.__class__.__name__}(key={self.key!r}, owner={self.owner!r}, owner_secret={self.owner_secret!r}, schema={self.schema!r})>" + def owner_key_pair(self) -> Optional[KeyPair]: + return KeyPair.from_parts(self.owner, self.owner_secret) + @classmethod def from_json(cls, j: dict) -> Self: return cls(