[ci skip] fix watch value integration test and type assertion for watch_dht_values

This commit is contained in:
Christien Rioux 2025-03-23 13:24:09 -04:00
parent 72b1434abc
commit 2f70f8382f
3 changed files with 106 additions and 91 deletions
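In short: the watch integration test now drives two veilid-server subnodes instead of simulating a second node through an unsafe routing context on the same node, and watch_dht_values gained a properly typed expiration default. A minimal before/after sketch of the typing fix, using placeholder rc and key names that are not part of the diff:

    # Before: a bare int for expiration.
    ts = await rc.watch_dht_values(key, [], 0, 0xFFFFFFFF)

    # After: a real Timestamp, matching the declared parameter type.
    ts = await rc.watch_dht_values(key, [], Timestamp(0), 0xFFFFFFFF)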


@@ -7,7 +7,7 @@ import time
 import os
 import veilid
-from veilid import ValueSubkey
+from veilid import ValueSubkey, Timestamp
 
 ##################################################################
 BOGUS_KEY = veilid.TypedKey.from_value(
@@ -245,8 +245,7 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI):
     await rc.delete_dht_record(key)
 
 
-# @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
-@pytest.mark.skip(reason = "don't work yet")
+@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
 @pytest.mark.asyncio
 async def test_watch_dht_values():
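With the unconditional @pytest.mark.skip removed, the test runs whenever the INTEGRATION gate shared by the other two-server tests is set, presumably something like (invocation assumed, not shown in this diff):

    INTEGRATION=1 pytest -k test_watch_dht_values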
@@ -256,111 +255,129 @@ async def test_watch_dht_values():
         if update.kind == veilid.VeilidUpdateKind.VALUE_CHANGE:
             await value_change_queue.put(update)
 
+    async def null_update_callback(update: veilid.VeilidUpdate):
+        pass
+
     try:
-        api = await veilid.api_connector(value_change_update_callback)
+        api0 = await veilid.api_connector(value_change_update_callback, 0)
     except veilid.VeilidConnectionError:
-        pytest.skip("Unable to connect to veilid-server.")
+        pytest.skip("Unable to connect to veilid-server 0.")
 
-    # Make two routing contexts, one with and one without safety
-    # So we can pretend to be a different node and get the watch updates
-    # Normally they would not get sent if the set comes from the same target
-    # as the watch's target
-    # XXX: this logic doesn't work because our node still suppresses updates
-    # XXX: if the value hasn't changed in the local record store
-    rcWatch = await api.new_routing_context()
-    rcSet = await (await api.new_routing_context()).with_safety(veilid.SafetySelection.unsafe())
-    async with rcWatch, rcSet:
-        # Make a DHT record
-        rec = await rcWatch.create_dht_record(veilid.DHTSchema.dflt(10))
-        # Set some subkey we care about
-        vd = await rcWatch.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH BLAH")
-        assert vd is None
-        # Make a watch on that subkey
-        ts = await rcWatch.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF)
-        assert ts != 0
-        # Reopen without closing to change routing context and not lose watch
-        rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
-        # Now set the subkey and trigger an update
-        vd = await rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH")
-        assert vd is None
-        # Now we should NOT get an update because the update is the same as our local copy
-        update = None
-        try:
-            update = await asyncio.wait_for(value_change_queue.get(), timeout=5)
-        except asyncio.TimeoutError:
-            pass
-        assert update is None
-        # Now set multiple subkeys and trigger an update
-        vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH"), rcSet.set_dht_value(rec.key, ValueSubkey(4), b"BZORT")])
-        assert vd == [None, None]
-        # Wait for the update
-        upd = await asyncio.wait_for(value_change_queue.get(), timeout=5)
-        # Verify the update came back but we don't get a new value because the sequence number is the same
-        assert upd.detail.key == rec.key
-        assert upd.detail.count == 0xFFFFFFFD
-        assert upd.detail.subkeys == [(3, 4)]
-        assert upd.detail.value is None
-        # Reopen without closing to change routing context and not lose watch
-        rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair())
-        # Cancel some subkeys we don't care about
-        still_active = await rcWatch.cancel_dht_watch(rec.key, [(ValueSubkey(0), ValueSubkey(2))])
-        assert still_active
-        # Reopen without closing to change routing context and not lose watch
-        rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
-        # Now set multiple subkeys and trigger an update
-        vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, ValueSubkey(5), b"BZORT BZORT")])
-        assert vd == [None, None]
-        # Wait for the update, this longer timeout seems to help the flaky check below
-        upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
-        # Verify the update came back but we don't get a new value because the sequence number is the same
-        assert upd.detail.key == rec.key
-        # This check is flaky on slow connections and often fails with different counts
-        assert upd.detail.count == 0xFFFFFFFC
-        assert upd.detail.subkeys == [(3, 3), (5, 5)]
-        assert upd.detail.value is None
-        # Reopen without closing to change routing context and not lose watch
-        rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair())
-        # Now cancel the update
-        still_active = await rcWatch.cancel_dht_watch(rec.key, [(ValueSubkey(3), ValueSubkey(9))])
-        assert not still_active
-        # Reopen without closing to change routing context and not lose watch
-        rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
-        # Now set multiple subkeys
-        vd = await asyncio.gather(*[rcSet.set_dht_value(rec.key, ValueSubkey(3), b"BLAH BLAH BLAH BLAH"), rcSet.set_dht_value(rec.key, ValueSubkey(5), b"BZORT BZORT BZORT")])
-        assert vd == [None, None]
-        # Now we should NOT get an update
-        update = None
-        try:
-            update = await asyncio.wait_for(value_change_queue.get(), timeout=5)
-        except asyncio.TimeoutError:
-            pass
-        assert update is None
-        # Clean up
-        await rcSet.close_dht_record(rec.key)
-        await rcSet.delete_dht_record(rec.key)
+    try:
+        api1 = await veilid.api_connector(null_update_callback, 1)
+    except veilid.VeilidConnectionError:
+        pytest.skip("Unable to connect to veilid-server 1.")
+
+    async with api0, api1:
+        # purge local and remote record stores to ensure we start fresh
+        await api0.debug("record purge local")
+        await api0.debug("record purge remote")
+        await api1.debug("record purge local")
+        await api1.debug("record purge remote")
+
+        # make routing contexts
+        rc0 = await api0.new_routing_context()
+        rc1 = await api1.new_routing_context()
+        async with rc0, rc1:
+            # Server 0: Make a DHT record
+            rec0 = await rc0.create_dht_record(veilid.DHTSchema.dflt(10))
+            # Server 0: Set some subkey we care about
+            vd = await rc0.set_dht_value(rec0.key, ValueSubkey(3), b"BLAH")
+            assert vd is None
+            # Server 0: Make a watch on all the subkeys
+            ts = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF)
+            assert ts != 0
+            # Server 1: Open the subkey
+            rec1 = await rc1.open_dht_record(rec0.key, rec0.owner_key_pair())
+            # Server 1: Now set the subkey and trigger an update
+            vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH")
+            assert vd is None
+            # Server 0: Now we should NOT get an update because the update is the same as our local copy
+            update = None
+            try:
+                update = await asyncio.wait_for(value_change_queue.get(), timeout=10)
+            except asyncio.TimeoutError:
+                pass
+            assert update is None
+            # Server 1: Now set subkey and trigger an update
+            vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH")
+            assert vd is None
+            # Server 0: Wait for the update
+            upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
+            # Server 0: Verify the update came back with the first changed subkey's data
+            assert upd.detail.key == rec0.key
+            assert upd.detail.count == 0xFFFFFFFE
+            assert upd.detail.subkeys == [(3, 3)]
+            assert upd.detail.value.data == b"BLAH BLAH"
+            # Server 1: Now set subkey and trigger an update
+            vd = await rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT")
+            assert vd is None
+            # Server 0: Wait for the update
+            upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
+            # Server 0: Verify the update came back with the first changed subkey's data
+            assert upd.detail.key == rec0.key
+            assert upd.detail.count == 0xFFFFFFFD
+            assert upd.detail.subkeys == [(4, 4)]
+            assert upd.detail.value.data == b"BZORT"
+            # Server 0: Cancel some subkeys we don't care about
+            still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(0), ValueSubkey(3))])
+            assert still_active
+            # Server 1: Now set multiple subkeys and trigger an update
+            vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT BZORT")])
+            assert vd == [None, None]
+            # Server 0: Wait for the update
+            upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
+            # Server 0: Verify only one update came back
+            assert upd.detail.key == rec0.key
+            assert upd.detail.count == 0xFFFFFFFC
+            assert upd.detail.subkeys == [(4, 4)]
+            assert upd.detail.value.data == b"BZORT BZORT"
+            # Server 0: Now we should NOT get any other update
+            update = None
+            try:
+                update = await asyncio.wait_for(value_change_queue.get(), timeout=10)
+            except asyncio.TimeoutError:
+                pass
+            assert update is None
+            # Now cancel the update
+            still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(3), ValueSubkey(9))])
+            assert not still_active
+            # Now set multiple subkeys
+            vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(5), b"BZORT BZORT BZORT")])
+            assert vd == [None, None]
+            # Now we should NOT get an update
+            update = None
+            try:
+                update = await asyncio.wait_for(value_change_queue.get(), timeout=10)
+            except asyncio.TimeoutError:
+                pass
+            assert update is None
+            # Clean up
+            await rc1.close_dht_record(rec1.key)
+            await rc1.delete_dht_record(rec1.key)
+            await rc0.close_dht_record(rec0.key)
+            await rc0.delete_dht_record(rec0.key)
 
 
 @pytest.mark.asyncio
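The count asserts in the rewritten test follow from the watch's remaining-change counter: the watch is created asking for 0xFFFFFFFF changes, and each VALUE_CHANGE actually delivered decrements the count reported in the update. Assuming that decrement-per-delivery behavior, the expected sequence is:

    count = 0xFFFFFFFF    # requested when the watch is created
    count -= 1            # b"BLAH BLAH" delivered   -> 0xFFFFFFFE
    count -= 1            # b"BZORT" delivered       -> 0xFFFFFFFD
    count -= 1            # b"BZORT BZORT" delivered -> 0xFFFFFFFC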
@@ -486,8 +503,6 @@ async def test_schema_limit_smpl(api_connection: veilid.VeilidAPI):
 
-
-
 @pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
 @pytest.mark.asyncio
 async def test_dht_integration_writer_reader():


@@ -93,7 +93,7 @@ class RoutingContext(ABC):
         self,
         key: types.TypedKey,
         subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
-        expiration: types.Timestamp = 0,
+        expiration: types.Timestamp = types.Timestamp(0),
         count: int = 0xFFFFFFFF,
     ) -> types.Timestamp:
         pass
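With the corrected default, a caller that omits expiration gets a value of the declared Timestamp type instead of a bare int. A minimal sketch, reusing the rc0/rec0 names from the test above:

    # Relying on the new defaults: no expiration, effectively unlimited count.
    ts = await rc0.watch_dht_values(rec0.key, [])

    # Equivalent explicit call, as used in the updated test.
    ts = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF)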


@@ -741,7 +741,7 @@ class _JsonRoutingContext(RoutingContext):
         self,
         key: TypedKey,
         subkeys: list[tuple[ValueSubkey, ValueSubkey]],
-        expiration: Timestamp = 0,
+        expiration: Timestamp = Timestamp(0),
         count: int = 0xFFFFFFFF,
     ) -> Timestamp:
         assert isinstance(key, TypedKey)
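_JsonRoutingContext validates its arguments with isinstance asserts (the key check is visible above). Assuming the same pattern guards expiration, the old bare-int default could never satisfy it, while the new default does. A hypothetical illustration, not taken from the diff:

    from veilid import Timestamp

    assert isinstance(Timestamp(0), Timestamp)  # new default: passes
    assert isinstance(0, Timestamp)             # old default: a plain int fails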