more python test cleanup and fixes

This commit is contained in:
Christien Rioux 2024-03-18 14:16:03 -04:00
parent 800348451e
commit c468d9c850
5 changed files with 132 additions and 27 deletions

View File

@ -335,8 +335,10 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
// Reopen without closing to change routing context and not lose watch // Reopen without closing to change routing context and not lose watch
rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair()); rec = await rcSet.openDHTRecord(rec.key, writer: rec.ownerKeyPair());
// Set the value without a watch
expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull); expect(await rcSet.setDHTValue(rec.key, 3, utf8.encode("BLAH")), isNull);
// Now we should NOT get an update
if (await valueChangeQueueIterator if (await valueChangeQueueIterator
.moveNext() .moveNext()
.timeout(const Duration(seconds: 5), onTimeout: () { .timeout(const Duration(seconds: 5), onTimeout: () {
@ -345,6 +347,7 @@ Future<void> testWatchDHTValues(Stream<VeilidUpdate> updateStream) async {
fail("should not have a change"); fail("should not have a change");
} }
// Clean up
await rcSet.closeDHTRecord(rec.key); await rcSet.closeDHTRecord(rec.key);
await rcSet.deleteDHTRecord(rec.key); await rcSet.deleteDHTRecord(rec.key);
} finally { } finally {

View File

@ -5,6 +5,7 @@ import pytest
import asyncio import asyncio
import json import json
from . import * from . import *
from .api import VeilidTestConnectionError, api_connector
################################################################## ##################################################################
BOGUS_KEY = veilid.TypedKey.from_value( BOGUS_KEY = veilid.TypedKey.from_value(
@ -13,7 +14,9 @@ BOGUS_KEY = veilid.TypedKey.from_value(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI): async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
with pytest.raises(veilid.VeilidAPIError): with pytest.raises(veilid.VeilidAPIError):
out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False) out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
@ -21,7 +24,9 @@ async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI): async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
with pytest.raises(veilid.VeilidAPIError): with pytest.raises(veilid.VeilidAPIError):
out = await rc.open_dht_record(BOGUS_KEY, None) out = await rc.open_dht_record(BOGUS_KEY, None)
@ -29,7 +34,9 @@ async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.Veil
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI): async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
with pytest.raises(veilid.VeilidAPIError): with pytest.raises(veilid.VeilidAPIError):
await rc.close_dht_record(BOGUS_KEY) await rc.close_dht_record(BOGUS_KEY)
@ -37,7 +44,9 @@ async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI): async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
with pytest.raises(veilid.VeilidAPIError): with pytest.raises(veilid.VeilidAPIError):
await rc.delete_dht_record(BOGUS_KEY) await rc.delete_dht_record(BOGUS_KEY)
@ -45,7 +54,9 @@ async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI): async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
rec = await rc.create_dht_record( rec = await rc.create_dht_record(
veilid.DHTSchema.dflt(1), veilid.CryptoKind.CRYPTO_KIND_VLD0 veilid.DHTSchema.dflt(1), veilid.CryptoKind.CRYPTO_KIND_VLD0
@ -56,7 +67,9 @@ async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI): async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(1)) rec = await rc.create_dht_record(veilid.DHTSchema.dflt(1))
assert await rc.get_dht_value(rec.key, 0, False) == None assert await rc.get_dht_value(rec.key, 0, False) == None
@ -66,7 +79,9 @@ async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_set_get_dht_value(api_connection: veilid.VeilidAPI): async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2))
@ -93,7 +108,9 @@ async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI): async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2))
key = rec.key key = rec.key
@ -204,38 +221,116 @@ async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI):
await rc.delete_dht_record(key) await rc.delete_dht_record(key)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_watch_dht_values(api_connection: veilid.VeilidAPI): async def test_watch_dht_values():
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(10))
vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") value_change_queue: asyncio.Queue[veilid.VeilidUpdate] = asyncio.Queue()
async def value_change_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.VALUE_CHANGE:
await value_change_queue.put(update)
try:
api = await api_connector(value_change_update_callback)
except VeilidTestConnectionError:
pytest.skip("Unable to connect to veilid-server.")
return
# Make two routing contexts, one with and one without safety
# So we can pretend to be a different node and get the watch updates
# Normally they would not get sent if the set comes from the same target
# as the watch's target
rcWatch = await (await api.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
rcSet = await (await api.new_routing_context()).with_safety(
veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED)
)
async with rcWatch, rcSet:
# Make a DHT record
rec = await rcWatch.create_dht_record(veilid.DHTSchema.dflt(10))
# Set some subkey we care about
vd = await rcWatch.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH")
assert vd == None assert vd == None
ts = await rc.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF) # Make a watch on that subkey
ts = await rcWatch.watch_dht_values(rec.key, [], 0, 0xFFFFFFFF)
assert ts != 0 assert ts != 0
vd = await rc.set_dht_value(rec.key, 3, b"BLAH") # Reopen without closing to change routing context and not lose watch
rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
# Now set the subkey and trigger an update
vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH")
assert vd == None assert vd == None
still_active = await rc.cancel_dht_watch(rec.key, [(0, 2)]) # Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=5)
# Verify the update
assert upd.detail.key == rec.key
assert upd.detail.count == 0xFFFFFFFE
assert upd.detail.subkeys == [(3,3)]
assert upd.detail.value.seq == 1
assert upd.detail.value.data == b"BLAH"
assert upd.detail.value.writer == rec.owner
# Reopen without closing to change routing context and not lose watch
rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair())
# Cancel some subkeys we don't care about
still_active = await rcWatch.cancel_dht_watch(rec.key, [(0, 2)])
assert still_active == True assert still_active == True
vd = await rc.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH") # Reopen without closing to change routing context and not lose watch
rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
# Change our subkey
vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH BLAH BLAH")
assert vd == None assert vd == None
still_active = await rc.cancel_dht_watch(rec.key, [(3, 9)]) # Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=5)
# Verify the update
assert upd.detail.key == rec.key
assert upd.detail.count == 0xFFFFFFFD
assert upd.detail.subkeys == [(3,3)]
assert upd.detail.value.seq == 2
assert upd.detail.value.data == b"BLAH BLAH BLAH"
assert upd.detail.value.writer == rec.owner
# Reopen without closing to change routing context and not lose watch
rec = await rcWatch.open_dht_record(rec.key, rec.owner_key_pair())
# Now cancel the update
still_active = await rcWatch.cancel_dht_watch(rec.key, [(3, 9)])
assert still_active == False assert still_active == False
vd = await rc.set_dht_value(rec.key, 3, b"BLAH") # Reopen without closing to change routing context and not lose watch
rec = await rcSet.open_dht_record(rec.key, rec.owner_key_pair())
# Set the value without a watch
vd = await rcSet.set_dht_value(rec.key, 3, b"BLAH")
assert vd == None assert vd == None
await rc.close_dht_record(rec.key) # Now we should NOT get an update
await rc.delete_dht_record(rec.key) update = None
try:
update = await asyncio.wait_for(value_change_queue.get(), timeout=5)
except asyncio.TimeoutError:
pass
assert update == None
# Clean up
await rcSet.close_dht_record(rec.key)
await rcSet.delete_dht_record(rec.key)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context() rc = await (await api_connection.new_routing_context()).with_sequencing(
veilid.Sequencing.ENSURE_ORDERED
)
async with rc: async with rc:
rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2)) rec = await rc.create_dht_record(veilid.DHTSchema.dflt(2))

View File

@ -111,8 +111,12 @@ class _JsonVeilidAPI(VeilidAPI):
try: try:
self.reader = None self.reader = None
assert self.writer is not None assert self.writer is not None
try:
self.writer.close() self.writer.close()
await self.writer.wait_closed() await self.writer.wait_closed()
except:
# Already closed
pass
self.writer = None self.writer = None
for reqid, reqfuture in self.in_flight_requests.items(): for reqid, reqfuture in self.in_flight_requests.items():

View File

@ -356,11 +356,11 @@ class VeilidRouteChange:
class VeilidValueChange: class VeilidValueChange:
key: TypedKey key: TypedKey
subkeys: list[ValueSubkey] subkeys: list[tuple[ValueSubkey, ValueSubkey]]
count: int count: int
value: ValueData value: ValueData
def __init__(self, key: TypedKey, subkeys: list[ValueSubkey], count: int, value: ValueData): def __init__(self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], count: int, value: ValueData):
self.key = key self.key = key
self.subkeys = subkeys self.subkeys = subkeys
self.count = count self.count = count
@ -371,7 +371,7 @@ class VeilidValueChange:
"""JSON object hook""" """JSON object hook"""
return cls( return cls(
TypedKey(j["key"]), TypedKey(j["key"]),
[ValueSubkey(key) for key in j["subkeys"]], [(p[0], p[1]) for p in j["subkeys"]],
j["count"], j["count"],
ValueData.from_json(j["value"]), ValueData.from_json(j["value"]),
) )

View File

@ -363,6 +363,9 @@ class DHTRecordDescriptor:
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<{self.__class__.__name__}(key={self.key!r}, owner={self.owner!r}, owner_secret={self.owner_secret!r}, schema={self.schema!r})>" return f"<{self.__class__.__name__}(key={self.key!r}, owner={self.owner!r}, owner_secret={self.owner_secret!r}, schema={self.schema!r})>"
def owner_key_pair(self) -> Optional[KeyPair]:
return KeyPair.from_parts(self.owner, self.owner_secret)
@classmethod @classmethod
def from_json(cls, j: dict) -> Self: def from_json(cls, j: dict) -> Self:
return cls( return cls(