From 20bd0aa5bf2ac0b370088a95e2bfcffbee2062d0 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 23 Mar 2025 13:24:09 -0400 Subject: [PATCH] [ci skip] fix watch value integration test and type assertion fix for watch_dht_values --- veilid-python/tests/test_dht.py | 193 +++++++++++++++++-------------- veilid-python/veilid/api.py | 2 +- veilid-python/veilid/json_api.py | 2 +- 3 files changed, 106 insertions(+), 91 deletions(-) diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 7d9a2eae..6a704a0c 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -7,7 +7,7 @@ import time import os import veilid -from veilid import ValueSubkey +from veilid import ValueSubkey, Timestamp ################################################################## BOGUS_KEY = veilid.TypedKey.from_value( @@ -245,8 +245,7 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): await rc.delete_dht_record(key) -# @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") -@pytest.mark.skip(reason = "don't work yet") +@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") @pytest.mark.asyncio async def test_watch_dht_values(): @@ -256,111 +255,129 @@ async def test_watch_dht_values(): if update.kind == veilid.VeilidUpdateKind.VALUE_CHANGE: await value_change_queue.put(update) + async def null_update_callback(update: veilid.VeilidUpdate): + pass + try: - api = await veilid.api_connector(value_change_update_callback) + api0 = await veilid.api_connector(value_change_update_callback, 0) except veilid.VeilidConnectionError: - pytest.skip("Unable to connect to veilid-server.") + pytest.skip("Unable to connect to veilid-server 0.") - # Make two routing contexts, one with and one without safety - # So we can pretend to be a different node and get the watch updates - # Normally they would not get sent if the set 
comes from the same target - # as the watch's target - - # XXX: this logic doesn't work because our node still suppresses updates - # XXX: if the value hasn't changed in the local record store - rcWatch = await api.new_routing_context() - - rcSet = await (await api.new_routing_context()).with_safety(veilid.SafetySelection.unsafe()) - async with rcWatch, rcSet: - # Make a DHT record - rec = await rcWatch.create_dht_record(veilid.DHTSchema.dflt(10)) + try: + api1 = await veilid.api_connector(null_update_callback, 1) + except veilid.VeilidConnectionError: + pytest.skip("Unable to connect to veilid-server 1.") - # Set some subkey we care about - vd = await rcWatch.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH BLAH") - assert vd is None + async with api0, api1: + # purge local and remote record stores to ensure we start fresh + await api0.debug("record purge local") + await api0.debug("record purge remote") + await api1.debug("record purge local") + await api1.debug("record purge remote") - # Make a watch on that subkey - ts = await rcWatch.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF) - assert ts != 0 + # make routing contexts + rc0 = await api0.new_routing_context() + rc1 = await api1.new_routing_context() + async with rc0, rc1: - # Reopen without closing to change routing context and not lose watch - rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) - - # Now set the subkey and trigger an update - vd = await rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH") - assert vd is None - - # Now we should NOT get an update because the update is the same as our local copy - update = None - try: - update = await asyncio.wait_for(value_change_queue.get(), timeout=5) - except asyncio.TimeoutError: - pass - assert update is None + # Server 0: Make a DHT record + rec0 = await rc0.create_dht_record(veilid.DHTSchema.dflt(10)) - # Now set multiple subkeys and trigger an update - vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH 
BLAH"), rcSet.set_dht_value(rec.key, ValueSubkey(4), b"BZORT")]) - assert vd == [None, None] + # Server 0: Set some subkey we care about + vd = await rc0.set_dht_value(rec0.key, ValueSubkey(3), b"BLAH") + assert vd is None - # Wait for the update - upd = await asyncio.wait_for(value_change_queue.get(), timeout=5) + # Server 0: Make a watch on all the subkeys + ts = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF) + assert ts != 0 - # Verify the update came back but we don't get a new value because the sequence number is the same - assert upd.detail.key == rec.key - assert upd.detail.count == 0xFFFFFFFD - assert upd.detail.subkeys == [(3, 4)] - assert upd.detail.value is None + # Server 1: Open the subkey + rec1 = await rc1.open_dht_record(rec0.key, rec0.owner_key_pair()) - # Reopen without closing to change routing context and not lose watch - rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) + # Server 1: Now set the subkey and trigger an update + vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH") + assert vd is None - # Cancel some subkeys we don't care about - still_active = await rcWatch.cancel_dht_watch(rec.key, [(ValueSubkey(0), ValueSubkey(2))]) - assert still_active + # Server 0: Now we should NOT get an update because the update is the same as our local copy + update = None + try: + update = await asyncio.wait_for(value_change_queue.get(), timeout=10) + except asyncio.TimeoutError: + pass + assert update is None - # Reopen without closing to change routing context and not lose watch - rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) + # Server 1: Now set subkey and trigger an update + vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH") + assert vd is None - # Now set multiple subkeys and trigger an update - vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, ValueSubkey(5), b"BZORT BZORT")]) - assert vd == 
[None, None] + # Server 0: Wait for the update + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) - # Wait for the update, this longer timeout seems to help the flaky check below - upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) + # Server 0: Verify the update came back with the first changed subkey's data + assert upd.detail.key == rec0.key + assert upd.detail.count == 0xFFFFFFFE + assert upd.detail.subkeys == [(3, 3)] + assert upd.detail.value.data == b"BLAH BLAH" - # Verify the update came back but we don't get a new value because the sequence number is the same - assert upd.detail.key == rec.key + # Server 1: Now set subkey and trigger an update + vd = await rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT") + assert vd is None - # This check is flaky on slow connections and often fails with different counts - assert upd.detail.count == 0xFFFFFFFC - assert upd.detail.subkeys == [(3, 3), (5, 5)] - assert upd.detail.value is None + # Server 0: Wait for the update + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) - # Reopen without closing to change routing context and not lose watch - rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair()) + # Server 0: Verify the update came back with the first changed subkey's data + assert upd.detail.key == rec0.key + assert upd.detail.count == 0xFFFFFFFD + assert upd.detail.subkeys == [(4, 4)] + assert upd.detail.value.data == b"BZORT" - # Now cancel the update - still_active = await rcWatch.cancel_dht_watch(rec.key, [(ValueSubkey(3), ValueSubkey(9))]) - assert not still_active + # Server 0: Cancel some subkeys we don't care about + still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(0), ValueSubkey(3))]) + assert still_active - # Reopen without closing to change routing context and not lose watch - rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair()) + # Server 1: Now set multiple subkeys and trigger an update + vd = await 
asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT BZORT")]) + assert vd == [None, None] - # Now set multiple subkeys - vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, ValueSubkey(5), b"BZORT BZORT BZORT")]) - assert vd == [None, None] - - # Now we should NOT get an update - update = None - try: - update = await asyncio.wait_for(value_change_queue.get(), timeout=5) - except asyncio.TimeoutError: - pass - assert update is None + # Server 0: Wait for the update + upd = await asyncio.wait_for(value_change_queue.get(), timeout=10) - # Clean up - await rcSet.close_dht_record(rec.key) - await rcSet.delete_dht_record(rec.key) + # Server 0: Verify only one update came back + assert upd.detail.key == rec0.key + assert upd.detail.count == 0xFFFFFFFC + assert upd.detail.subkeys == [(4, 4)] + assert upd.detail.value.data == b"BZORT BZORT" + + # Server 0: Now we should NOT get any other update + update = None + try: + update = await asyncio.wait_for(value_change_queue.get(), timeout=10) + except asyncio.TimeoutError: + pass + assert update is None + + # Now cancel the update + still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(3), ValueSubkey(9))]) + assert not still_active + + # Now set multiple subkeys + vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(5), b"BZORT BZORT BZORT")]) + assert vd == [None, None] + + # Now we should NOT get an update + update = None + try: + update = await asyncio.wait_for(value_change_queue.get(), timeout=10) + except asyncio.TimeoutError: + pass + assert update is None + + # Clean up + await rc1.close_dht_record(rec1.key) + await rc1.delete_dht_record(rec1.key) + await rc0.close_dht_record(rec0.key) + await rc0.delete_dht_record(rec0.key) @pytest.mark.asyncio @@ -486,8 +503,6 @@ async 
def test_schema_limit_smpl(api_connection: veilid.VeilidAPI): - - @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") @pytest.mark.asyncio async def test_dht_integration_writer_reader(): diff --git a/veilid-python/veilid/api.py b/veilid-python/veilid/api.py index b54786de..f21d0c05 100644 --- a/veilid-python/veilid/api.py +++ b/veilid-python/veilid/api.py @@ -93,7 +93,7 @@ class RoutingContext(ABC): self, key: types.TypedKey, subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]], - expiration: types.Timestamp = 0, + expiration: types.Timestamp = types.Timestamp(0), count: int = 0xFFFFFFFF, ) -> types.Timestamp: pass diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 0a88051c..a403b6a4 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -741,7 +741,7 @@ class _JsonRoutingContext(RoutingContext): self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], - expiration: Timestamp = 0, + expiration: Timestamp = Timestamp(0), count: int = 0xFFFFFFFF, ) -> Timestamp: assert isinstance(key, TypedKey)