[ci skip] fanout revamp

This commit is contained in:
Christien Rioux 2025-03-26 21:42:36 -04:00
parent 2f70f8382f
commit 3e38e48d0e
139 changed files with 1908020 additions and 615 deletions

View file

@ -7,7 +7,7 @@ import time
import os
import veilid
from veilid import ValueSubkey, Timestamp
from veilid import ValueSubkey, Timestamp, SafetySelection
##################################################################
BOGUS_KEY = veilid.TypedKey.from_value(
@ -380,6 +380,106 @@ async def test_watch_dht_values():
await rc0.delete_dht_record(rec0.key)
@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time")
@pytest.mark.asyncio
async def test_watch_many_dht_values():
value_change_queue: asyncio.Queue[veilid.VeilidUpdate] = asyncio.Queue()
async def value_change_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.VALUE_CHANGE:
await value_change_queue.put(update)
async def null_update_callback(update: veilid.VeilidUpdate):
pass
try:
api0 = await veilid.api_connector(value_change_update_callback, 0)
except veilid.VeilidConnectionError:
pytest.skip("Unable to connect to veilid-server 0.")
try:
api1 = await veilid.api_connector(null_update_callback, 1)
except veilid.VeilidConnectionError:
pytest.skip("Unable to connect to veilid-server 1.")
async with api0, api1:
# purge local and remote record stores to ensure we start fresh
await api0.debug("record purge local")
await api0.debug("record purge remote")
await api1.debug("record purge local")
await api1.debug("record purge remote")
# make routing contexts
# unsafe version for debugging
rc0 = await (await api0.new_routing_context()).with_safety(SafetySelection.unsafe())
rc1 = await (await api1.new_routing_context()).with_safety(SafetySelection.unsafe())
# safe default version
# rc0 = await api0.new_routing_context()
# rc1 = await api1.new_routing_context()
async with rc0, rc1:
COUNT = 10
records = []
# Make and watch all records
for n in range(COUNT):
print(f"making record {n}")
# Server 0: Make a DHT record
records.append(await rc0.create_dht_record(veilid.DHTSchema.dflt(1)))
# Server 0: Set some subkey we care about
vd = await rc0.set_dht_value(records[n].key, ValueSubkey(0), b"BLAH")
assert vd is None
# Server 0: Make a watch on all the subkeys
ts = await rc0.watch_dht_values(records[n].key, [], Timestamp(0), 0xFFFFFFFF)
assert ts != 0
# Open and set all records
missing_records = set()
for (n, record) in enumerate(records):
print(f"setting record {n}")
# Server 1: Open the subkey
_ignore = await rc1.open_dht_record(record.key, record.owner_key_pair())
# Server 1: Now set the subkey and trigger an update
vd = await rc1.set_dht_value(record.key, ValueSubkey(0), b"BLAH BLAH")
assert vd is None
missing_records.add(record.key)
# Server 0: Now we should get an update for every change
for n in range(len(records)):
print(f"waiting for change {n}")
# Server 0: Wait for the update
try:
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
missing_records.remove(upd.detail.key)
except:
# Dump which records didn't get updates
for (m, record) in enumerate(records):
if record.key not in missing_records:
continue
print(f"missing update for record {m}: {record}")
info0 = await api0.debug(f"record info {record.key}")
info1 = await api1.debug(f"record info {record.key}")
print(f"from rc0: {info0}")
print(f"from rc1: {info1}")
raise
# Clean up
for record in records:
await rc1.close_dht_record(record.key)
await rc1.delete_dht_record(record.key)
await rc0.close_dht_record(record.key)
await rc0.delete_dht_record(record.key)
@pytest.mark.asyncio
async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()