mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-12-13 01:34:27 -05:00
207 lines
7.3 KiB
Python
207 lines
7.3 KiB
Python
# Routing context veilid tests
|
|
|
|
import veilid
|
|
import pytest
|
|
import asyncio
|
|
import json
|
|
from . import *
|
|
|
|
##################################################################
|
|
BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
with pytest.raises(veilid.VeilidAPIError):
|
|
out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
with pytest.raises(veilid.VeilidAPIError):
|
|
out = await rc.open_dht_record(BOGUS_KEY, None)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
with pytest.raises(veilid.VeilidAPIError):
|
|
await rc.close_dht_record(BOGUS_KEY)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
with pytest.raises(veilid.VeilidAPIError):
|
|
await rc.delete_dht_record(BOGUS_KEY)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
|
|
await rc.close_dht_record(rec.key)
|
|
await rc.delete_dht_record(rec.key)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
|
|
assert await rc.get_dht_value(rec.key, 0, False) == None
|
|
await rc.close_dht_record(rec.key)
|
|
await rc.delete_dht_record(rec.key)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(2))
|
|
|
|
vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
|
|
assert vd != None
|
|
|
|
vd2 = await rc.get_dht_value(rec.key, 0, False)
|
|
assert vd2 != None
|
|
|
|
vd3 = await rc.get_dht_value(rec.key, 0, True)
|
|
assert vd3 != None
|
|
|
|
vd4 = await rc.get_dht_value(rec.key, 1, False)
|
|
assert vd4 == None
|
|
|
|
print("vd: {}", vd.__dict__)
|
|
print("vd2: {}", vd2.__dict__)
|
|
print("vd3: {}", vd3.__dict__)
|
|
|
|
assert vd == vd2
|
|
assert vd2 == vd3
|
|
|
|
await rc.close_dht_record(rec.key)
|
|
await rc.delete_dht_record(rec.key)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI):
|
|
rc = await api_connection.new_routing_context()
|
|
async with rc:
|
|
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(2))
|
|
key = rec.key
|
|
owner = rec.owner
|
|
secret = rec.owner_secret
|
|
print(f"key:{key}")
|
|
|
|
cs = await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_VLD0)
|
|
async with cs:
|
|
assert await cs.validate_key_pair(owner, secret)
|
|
other_keypair = await cs.generate_key_pair()
|
|
|
|
va = b"Qwertyuiop Asdfghjkl Zxcvbnm"
|
|
vb = b"1234567890"
|
|
vc = b"!@#$%^&*()"
|
|
|
|
# Test subkey writes
|
|
vdtemp = await rc.set_dht_value(key, 1, va)
|
|
assert vdtemp != None
|
|
assert vdtemp.data == va
|
|
assert vdtemp.seq == 0
|
|
assert vdtemp.writer == owner
|
|
|
|
vdtemp = await rc.get_dht_value(key, 1, False)
|
|
assert vdtemp.data == va
|
|
assert vdtemp.seq == 0
|
|
assert vdtemp.writer == owner
|
|
|
|
vdtemp = await rc.get_dht_value(key, 0, False)
|
|
assert vdtemp == None
|
|
|
|
vdtemp = await rc.set_dht_value(key, 0, vb)
|
|
assert vdtemp.data == vb
|
|
assert vdtemp.seq == 0
|
|
|
|
vdtemp = await rc.get_dht_value(key, 0, True)
|
|
assert vdtemp.data == vb
|
|
|
|
vdtemp = await rc.get_dht_value(key, 1, True)
|
|
assert vdtemp.data == va
|
|
|
|
# Equal value should not trigger sequence number update
|
|
vdtemp = await rc.set_dht_value(key, 1, va)
|
|
assert vdtemp != None
|
|
assert vdtemp.data == va
|
|
assert vdtemp.seq == 0
|
|
assert vdtemp.writer == owner
|
|
|
|
# Different value should trigger sequence number update
|
|
vdtemp = await rc.set_dht_value(key, 1, vb)
|
|
assert vdtemp != None
|
|
assert vdtemp.data == vb
|
|
assert vdtemp.seq == 1
|
|
assert vdtemp.writer == owner
|
|
|
|
# Now that we initialized some subkeys
|
|
# and verified they stored correctly
|
|
# Delete things locally and reopen and see if we can write
|
|
# with the same writer key
|
|
|
|
await rc.close_dht_record(key)
|
|
await rc.delete_dht_record(key)
|
|
|
|
rec = await rc.open_dht_record(key, veilid.KeyPair.from_parts(owner, secret))
|
|
assert rec != None
|
|
assert rec.key == key
|
|
assert rec.owner == owner
|
|
assert rec.owner_secret == secret
|
|
assert rec.schema.kind == veilid.DHTSchemaKind.DFLT
|
|
assert rec.schema.o_cnt == 2
|
|
|
|
# Verify subkey 1 can be set before it is get but newer is available online
|
|
vdtemp = await rc.set_dht_value(key, 1, vc)
|
|
assert vdtemp != None
|
|
assert vdtemp.data == vb
|
|
assert vdtemp.seq == 1
|
|
assert vdtemp.writer == owner
|
|
|
|
# Verify subkey 1 can be set a second time and it updates because seq is newer
|
|
vdtemp = await rc.set_dht_value(key, 1, vc)
|
|
assert vdtemp != None
|
|
assert vdtemp.data == vc
|
|
assert vdtemp.seq == 2
|
|
assert vdtemp.writer == owner
|
|
|
|
# Verify the network got the subkey update with a refresh check
|
|
vdtemp = await rc.get_dht_value(key, 1, True)
|
|
assert vdtemp != None
|
|
assert vdtemp.data == vc
|
|
assert vdtemp.seq == 2
|
|
assert vdtemp.writer == owner
|
|
|
|
# Delete things locally and reopen and see if we can write
|
|
# with a different writer key (should fail)
|
|
|
|
await rc.close_dht_record(key)
|
|
await rc.delete_dht_record(key)
|
|
|
|
rec = await rc.open_dht_record(key, other_keypair)
|
|
assert rec != None
|
|
assert rec.key == key
|
|
assert rec.owner == owner
|
|
assert rec.owner_secret == None
|
|
assert rec.schema.kind == veilid.DHTSchemaKind.DFLT
|
|
assert rec.schema.o_cnt == 2
|
|
|
|
# Verify subkey 1 can NOT be set because we have the wrong writer
|
|
with pytest.raises(veilid.VeilidAPIError):
|
|
vdtemp = await rc.set_dht_value(key, 1, va)
|
|
|
|
# Verify subkey 0 can NOT be set because we have the wrong writer
|
|
with pytest.raises(veilid.VeilidAPIError):
|
|
vdtemp = await rc.set_dht_value(key, 0, va)
|
|
|
|
# Clean up
|
|
await rc.close_dht_record(key)
|
|
await rc.delete_dht_record(key)
|
|
|