Merge branch 'tek/pythonification' into 'main'

Mold the new Python code into more idiomatic shape

See merge request veilid/veilid!26
This commit is contained in:
John Smith 2023-06-17 22:57:20 +00:00
commit b8e5039251
12 changed files with 1738 additions and 1331 deletions

View File

@ -1,34 +0,0 @@
from typing import Callable, Awaitable
import os
import pytest
pytest_plugins = ('pytest_asyncio',)
import veilid
##################################################################
VEILID_SERVER = os.getenv("VEILID_SERVER")
if VEILID_SERVER is not None:
vsparts = VEILID_SERVER.split(":")
VEILID_SERVER = vsparts[0]
if len(vsparts) == 2:
VEILID_SERVER_PORT = int(vsparts[1])
else:
VEILID_SERVER_PORT = 5959
else:
VEILID_SERVER = "localhost"
VEILID_SERVER_PORT = 5959
##################################################################
async def simple_connect_and_run(func: Callable[[veilid.VeilidAPI], Awaitable]):
api = await veilid.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, simple_update_callback)
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
await func(api)
async def simple_update_callback(update: veilid.VeilidUpdate):
print("VeilidUpdate: {}".format(update))

View File

@ -0,0 +1,37 @@
import os
from functools import cache
from typing import AsyncGenerator
import pytest_asyncio
import veilid
from veilid.json_api import _JsonVeilidAPI
pytest_plugins = ("pytest_asyncio",)
@cache
def server_info() -> tuple[str, int]:
"""Return the hostname and port of the test server."""
VEILID_SERVER = os.getenv("VEILID_SERVER")
if VEILID_SERVER is None:
return "localhost", 5959
hostname, *rest = VEILID_SERVER.split(":")
if rest:
return hostname, int(rest[0])
return hostname, 5959
async def simple_update_callback(update: veilid.VeilidUpdate):
print(f"VeilidUpdate: {update}")
@pytest_asyncio.fixture
async def api_connection() -> AsyncGenerator[_JsonVeilidAPI, None]:
hostname, port = server_info()
api = await veilid.json_api_connect(hostname, port, simple_update_callback)
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
yield api

View File

@ -1,39 +1,44 @@
# Basic veilid tests
import veilid
import socket
import pytest
from . import *
import veilid
##################################################################
@pytest.mark.asyncio
async def test_connect():
async def func(api: veilid.VeilidAPI):
pass
await simple_connect_and_run(func)
from .conftest import simple_update_callback
@pytest.mark.asyncio
async def test_get_node_id():
async def func(api: veilid.VeilidAPI):
# get our own node id
state = await api.get_state()
node_id = state.config.config.network.routing_table.node_id.pop()
await simple_connect_and_run(func)
async def test_connect(api_connection):
pass
@pytest.mark.asyncio
async def test_get_node_id(api_connection):
state = await api_connection.get_state()
node_ids = state.config.config.network.routing_table.node_id
assert len(node_ids) >= 1
for node_id in node_ids:
assert node_id[4] == ":"
@pytest.mark.asyncio
async def test_fail_connect():
with pytest.raises(Exception):
api = await veilid.json_api_connect("fuahwelifuh32luhwafluehawea", 1, simple_update_callback)
async with api:
pass
with pytest.raises(socket.gaierror) as exc:
await veilid.json_api_connect(
"fuahwelifuh32luhwafluehawea", 1, simple_update_callback
)
assert exc.value.errno == socket.EAI_NONAME
@pytest.mark.asyncio
async def test_version():
async def func(api: veilid.VeilidAPI):
v = await api.veilid_version()
print("veilid_version: {}".format(v.__dict__))
vstr = await api.veilid_version_string()
print("veilid_version_string: {}".format(vstr))
await simple_connect_and_run(func)
async def test_version(api_connection):
v = await api_connection.veilid_version()
print(f"veilid_version: {v.__dict__}")
assert v.__dict__.keys() >= {"_major", "_minor", "_patch"}
vstr = await api_connection.veilid_version_string()
print(f"veilid_version_string: {vstr}")

View File

@ -1,42 +1,49 @@
# Crypto veilid tests
import veilid
import pytest
from . import *
import veilid
from veilid.api import CryptoSystem
##################################################################
@pytest.mark.asyncio
async def test_best_crypto_system():
async def func(api: veilid.VeilidAPI):
bcs = await api.best_crypto_system()
await simple_connect_and_run(func)
@pytest.mark.asyncio
async def test_get_crypto_system():
async def func(api: veilid.VeilidAPI):
cs = await api.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_VLD0)
# clean up handle early
del cs
await simple_connect_and_run(func)
@pytest.mark.asyncio
async def test_get_crypto_system_invalid():
async def func(api: veilid.VeilidAPI):
with pytest.raises(veilid.VeilidAPIError):
cs = await api.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_NONE)
await simple_connect_and_run(func)
async def test_best_crypto_system(api_connection):
bcs: CryptoSystem = await api_connection.best_crypto_system()
assert await bcs.default_salt_length() == 16
@pytest.mark.asyncio
async def test_hash_and_verify_password():
async def func(api: veilid.VeilidAPI):
bcs = await api.best_crypto_system()
nonce = await bcs.random_nonce()
salt = nonce.to_bytes()
# Password match
phash = await bcs.hash_password(b"abc123", salt)
assert await bcs.verify_password(b"abc123", phash)
# Password mismatch
phash2 = await bcs.hash_password(b"abc1234", salt)
assert not await bcs.verify_password(b"abc12345", phash)
await simple_connect_and_run(func)
async def test_get_crypto_system(api_connection):
cs: CryptoSystem = await api_connection.get_crypto_system(
veilid.CryptoKind.CRYPTO_KIND_VLD0
)
assert await cs.default_salt_length() == 16
# clean up handle early
del cs
@pytest.mark.asyncio
async def test_get_crypto_system_invalid(api_connection):
with pytest.raises(veilid.VeilidAPIErrorInvalidArgument) as exc:
await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_NONE)
assert exc.value.context == "unsupported cryptosystem"
assert exc.value.argument == "kind"
assert exc.value.value == "NONE"
@pytest.mark.asyncio
async def test_hash_and_verify_password(api_connection):
bcs = await api_connection.best_crypto_system()
nonce = await bcs.random_nonce()
salt = nonce.to_bytes()
# Password match
phash = await bcs.hash_password(b"abc123", salt)
assert await bcs.verify_password(b"abc123", phash)
# Password mismatch
phash2 = await bcs.hash_password(b"abc1234", salt)
assert not await bcs.verify_password(b"abc12345", phash)

View File

@ -1,37 +1,41 @@
# Routing context veilid tests
import veilid
import pytest
import asyncio
import json
from . import *
import pytest
import veilid
from veilid.types import OperationId
from .conftest import server_info
##################################################################
@pytest.mark.asyncio
async def test_routing_contexts():
async def func(api: veilid.VeilidAPI):
rc = await api.new_routing_context()
rcp = await rc.with_privacy()
rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
rcpsr = await rcps.with_custom_privacy(veilid.Stability.RELIABLE)
await simple_connect_and_run(func)
async def test_routing_contexts(api_connection):
rc = await api_connection.new_routing_context()
rcp = await rc.with_privacy()
rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
await rcps.with_custom_privacy(veilid.Stability.RELIABLE)
@pytest.mark.asyncio
async def test_routing_context_app_message_loopback():
app_message_queue = asyncio.Queue()
# Seriously, mypy?
app_message_queue: asyncio.Queue = asyncio.Queue()
async def app_message_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_MESSAGE:
await app_message_queue.put(update)
api = await veilid.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, app_message_queue_update_callback)
hostname, port = server_info()
api = await veilid.json_api_connect(
hostname, port, app_message_queue_update_callback
)
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
# make a routing context that uses a safety route
rc = await (await api.new_routing_context()).with_privacy()
@ -40,29 +44,31 @@ async def test_routing_context_app_message_loopback():
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
# send an app message to our own private route
message = b"abcd1234"
await rc.app_message(prr, message)
# we should get the same message back
update: veilid.VeilidUpdate = await asyncio.wait_for(app_message_queue.get(), timeout=10)
appmsg: veilid.VeilidAppMessage = update.detail
assert appmsg.message == message
update: veilid.VeilidUpdate = await asyncio.wait_for(
app_message_queue.get(), timeout=10
)
assert isinstance(update.detail, veilid.VeilidAppMessage)
assert update.detail.message == message
@pytest.mark.asyncio
async def test_routing_context_app_call_loopback():
app_call_queue = asyncio.Queue()
app_call_queue: asyncio.Queue = asyncio.Queue()
async def app_call_queue_update_callback(update: veilid.VeilidUpdate):
if update.kind == veilid.VeilidUpdateKind.APP_CALL:
await app_call_queue.put(update)
api = await veilid.json_api_connect(VEILID_SERVER, VEILID_SERVER_PORT, app_call_queue_update_callback)
hostname, port = server_info()
api = await veilid.json_api_connect(hostname, port, app_call_queue_update_callback)
async with api:
# purge routes to ensure we start fresh
await api.debug("purge routes")
@ -74,19 +80,30 @@ async def test_routing_context_app_call_loopback():
# import it as a remote route as well so we can send to it
prr = await api.import_remote_private_route(blob)
# send an app message to our own private route
request = b"abcd1234"
app_call_task = asyncio.create_task(rc.app_call(prr, request), name = "app call task")
app_call_task = asyncio.create_task(
rc.app_call(prr, request), name="app call task"
)
# we should get the same request back
update: veilid.VeilidUpdate = await asyncio.wait_for(app_call_queue.get(), timeout=10)
appcall: veilid.VeilidAppCall = update.detail
update: veilid.VeilidUpdate = await asyncio.wait_for(
app_call_queue.get(), timeout=10
)
appcall = update.detail
assert isinstance(appcall, veilid.VeilidAppCall)
assert appcall.message == request
# now we reply to the request
reply = b"qwer5678"
await api.app_call_reply(appcall.call_id, reply)
# TK: OperationId use to be a subclass of `int`. When I wrapped `appcall.call_id` in int(),
# this failed JSON schema validation, which defines `call_id` as a string. Maybe that was a
# typo, and OperationId is really *supposed* to be a str? Alternatively, perhaps the
# signature of `app_call_reply` is wrong and it's supposed to take a type other than
# OperationId?
await api.app_call_reply(OperationId(appcall.call_id), reply)
# now we should get the reply from the call
result = await app_call_task

View File

@ -2,5 +2,5 @@ from .api import *
from .config import *
from .error import *
from .json_api import *
from .error import *
from .state import *
from .types import *

View File

@ -1,211 +1,331 @@
from abc import ABC, abstractmethod
from typing import Self
from typing import Optional, Self
from . import types
from .state import VeilidState
from .state import *
from .config import *
from .error import *
from .types import *
class RoutingContext(ABC):
@abstractmethod
@abstractmethod
async def with_privacy(self) -> Self:
pass
@abstractmethod
async def with_custom_privacy(self, stability: Stability) -> Self:
@abstractmethod
async def with_custom_privacy(self, stability: types.Stability) -> Self:
pass
@abstractmethod
async def with_sequencing(self, sequencing: Sequencing) -> Self:
@abstractmethod
async def with_sequencing(self, sequencing: types.Sequencing) -> Self:
pass
@abstractmethod
async def app_call(self, target: TypedKey | RouteId, request: bytes) -> bytes:
@abstractmethod
async def app_call(
self, target: types.TypedKey | types.RouteId, request: bytes
) -> bytes:
pass
@abstractmethod
async def app_message(self, target: TypedKey | RouteId, message: bytes):
@abstractmethod
async def app_message(self, target: types.TypedKey | types.RouteId, message: bytes):
pass
@abstractmethod
async def create_dht_record(self, kind: CryptoKind, schema: DHTSchema) -> DHTRecordDescriptor:
@abstractmethod
async def create_dht_record(
self, kind: types.CryptoKind, schema: types.DHTSchema
) -> types.DHTRecordDescriptor:
pass
@abstractmethod
async def open_dht_record(self, key: TypedKey, writer: Optional[KeyPair]) -> DHTRecordDescriptor:
@abstractmethod
async def open_dht_record(
self, key: types.TypedKey, writer: Optional[types.KeyPair]
) -> types.DHTRecordDescriptor:
pass
@abstractmethod
async def close_dht_record(self, key: TypedKey):
@abstractmethod
async def close_dht_record(self, key: types.TypedKey):
pass
@abstractmethod
async def delete_dht_record(self, key: TypedKey):
@abstractmethod
async def delete_dht_record(self, key: types.TypedKey):
pass
@abstractmethod
async def get_dht_value(self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool) -> Optional[ValueData]:
@abstractmethod
async def get_dht_value(
self, key: types.TypedKey, subkey: types.ValueSubkey, force_refresh: bool
) -> Optional[types.ValueData]:
pass
@abstractmethod
async def set_dht_value(self, key: TypedKey, subkey: ValueSubkey, data: bytes) -> Optional[ValueData]:
@abstractmethod
async def set_dht_value(
self, key: types.TypedKey, subkey: types.ValueSubkey, data: bytes
) -> Optional[types.ValueData]:
pass
@abstractmethod
async def watch_dht_values(self, key: TypedKey, subkeys: list[(ValueSubkey, ValueSubkey)], expiration: Timestamp, count: int) -> Timestamp:
@abstractmethod
async def watch_dht_values(
self,
key: types.TypedKey,
subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
expiration: types.Timestamp,
count: int,
) -> types.Timestamp:
pass
@abstractmethod
async def cancel_dht_watch(self, key: TypedKey, subkeys: list[(ValueSubkey, ValueSubkey)]) -> bool:
@abstractmethod
async def cancel_dht_watch(
self,
key: types.TypedKey,
subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]],
) -> bool:
pass
class TableDbTransaction(ABC):
@abstractmethod
@abstractmethod
async def commit(self):
pass
@abstractmethod
@abstractmethod
async def rollback(self):
pass
@abstractmethod
@abstractmethod
async def store(self, col: int, key: bytes, value: bytes):
pass
@abstractmethod
@abstractmethod
async def delete(self, col: int, key: bytes):
pass
class TableDb(ABC):
@abstractmethod
@abstractmethod
async def get_column_count(self) -> int:
pass
@abstractmethod
@abstractmethod
async def get_keys(self, col: int) -> list[bytes]:
pass
@abstractmethod
@abstractmethod
async def transact(self) -> TableDbTransaction:
pass
@abstractmethod
@abstractmethod
async def store(self, col: int, key: bytes, value: bytes):
pass
@abstractmethod
@abstractmethod
async def load(self, col: int, key: bytes) -> Optional[bytes]:
pass
@abstractmethod
@abstractmethod
async def delete(self, col: int, key: bytes) -> Optional[bytes]:
pass
class CryptoSystem(ABC):
@abstractmethod
async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret:
@abstractmethod
async def cached_dh(
self, key: types.PublicKey, secret: types.SecretKey
) -> types.SharedSecret:
pass
@abstractmethod
async def compute_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret:
@abstractmethod
async def compute_dh(
self, key: types.PublicKey, secret: types.SecretKey
) -> types.SharedSecret:
pass
@abstractmethod
@abstractmethod
async def random_bytes(self, len: int) -> bytes:
pass
@abstractmethod
@abstractmethod
async def default_salt_length(self) -> int:
pass
@abstractmethod
@abstractmethod
async def hash_password(self, password: bytes, salt: bytes) -> str:
pass
@abstractmethod
@abstractmethod
async def verify_password(self, password: bytes, password_hash: str) -> bool:
pass
@abstractmethod
async def derive_shared_secret(self, password: bytes, salt: bytes) -> SharedSecret:
@abstractmethod
async def derive_shared_secret(
self, password: bytes, salt: bytes
) -> types.SharedSecret:
pass
@abstractmethod
async def random_nonce(self) -> Nonce:
@abstractmethod
async def random_nonce(self) -> types.Nonce:
pass
@abstractmethod
async def random_shared_secret(self) -> SharedSecret:
@abstractmethod
async def random_shared_secret(self) -> types.SharedSecret:
pass
@abstractmethod
async def generate_key_pair(self) -> KeyPair:
@abstractmethod
async def generate_key_pair(self) -> types.KeyPair:
pass
@abstractmethod
async def generate_hash(self, data: bytes) -> HashDigest:
@abstractmethod
async def generate_hash(self, data: bytes) -> types.HashDigest:
pass
@abstractmethod
async def validate_key_pair(self, key: PublicKey, secret: SecretKey) -> bool:
@abstractmethod
async def validate_key_pair(
self, key: types.PublicKey, secret: types.SecretKey
) -> bool:
pass
@abstractmethod
async def validate_hash(self, data: bytes, hash_digest: HashDigest) -> bool:
@abstractmethod
async def validate_hash(self, data: bytes, hash_digest: types.HashDigest) -> bool:
pass
@abstractmethod
async def distance(self, key1: CryptoKey, key2: CryptoKey) -> CryptoKeyDistance:
@abstractmethod
async def distance(
self, key1: types.CryptoKey, key2: types.CryptoKey
) -> types.CryptoKeyDistance:
pass
@abstractmethod
async def sign(self, key: PublicKey, secret: SecretKey, data: bytes) -> Signature:
@abstractmethod
async def sign(
self, key: types.PublicKey, secret: types.SecretKey, data: bytes
) -> types.Signature:
pass
@abstractmethod
async def verify(self, key: PublicKey, data: bytes, signature: Signature):
@abstractmethod
async def verify(
self, key: types.PublicKey, data: bytes, signature: types.Signature
):
pass
@abstractmethod
@abstractmethod
async def aead_overhead(self) -> int:
pass
@abstractmethod
async def decrypt_aead(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret, associated_data: Optional[bytes]) -> bytes:
@abstractmethod
async def decrypt_aead(
self,
body: bytes,
nonce: types.Nonce,
shared_secret: types.SharedSecret,
associated_data: Optional[bytes],
) -> bytes:
pass
@abstractmethod
async def encrypt_aead(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret, associated_data: Optional[bytes]) -> bytes:
@abstractmethod
async def encrypt_aead(
self,
body: bytes,
nonce: types.Nonce,
shared_secret: types.SharedSecret,
associated_data: Optional[bytes],
) -> bytes:
pass
@abstractmethod
async def crypt_no_auth(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret) -> bytes:
@abstractmethod
async def crypt_no_auth(
self, body: bytes, nonce: types.Nonce, shared_secret: types.SharedSecret
) -> bytes:
pass
class VeilidAPI(ABC):
@abstractmethod
@abstractmethod
async def control(self, args: list[str]) -> str:
pass
@abstractmethod
@abstractmethod
async def get_state(self) -> VeilidState:
pass
@abstractmethod
@abstractmethod
async def attach(self):
pass
@abstractmethod
@abstractmethod
async def detach(self):
pass
@abstractmethod
async def new_private_route(self) -> Tuple[RouteId, bytes]:
@abstractmethod
async def new_private_route(self) -> tuple[types.RouteId, bytes]:
pass
@abstractmethod
async def new_custom_private_route(self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing) -> Tuple[RouteId, bytes]:
@abstractmethod
async def new_custom_private_route(
self,
kinds: list[types.CryptoKind],
stability: types.Stability,
sequencing: types.Sequencing,
) -> tuple[types.RouteId, bytes]:
pass
@abstractmethod
async def import_remote_private_route(self, blob: bytes) -> RouteId:
@abstractmethod
async def import_remote_private_route(self, blob: bytes) -> types.RouteId:
pass
@abstractmethod
async def release_private_route(self, route_id: RouteId):
@abstractmethod
async def release_private_route(self, route_id: types.RouteId):
pass
@abstractmethod
async def app_call_reply(self, call_id: OperationId, message: bytes):
@abstractmethod
async def app_call_reply(self, call_id: types.OperationId, message: bytes):
pass
@abstractmethod
@abstractmethod
async def new_routing_context(self) -> RoutingContext:
pass
@abstractmethod
@abstractmethod
async def open_table_db(self, name: str, column_count: int) -> TableDb:
pass
@abstractmethod
@abstractmethod
async def delete_table_db(self, name: str):
pass
@abstractmethod
async def get_crypto_system(self, kind: CryptoKind) -> CryptoSystem:
@abstractmethod
async def get_crypto_system(self, kind: types.CryptoKind) -> CryptoSystem:
pass
@abstractmethod
@abstractmethod
async def best_crypto_system(self) -> CryptoSystem:
pass
@abstractmethod
async def verify_signatures(self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature]) -> list[TypedKey]:
@abstractmethod
async def verify_signatures(
self,
node_ids: list[types.TypedKey],
data: bytes,
signatures: list[types.TypedSignature],
) -> list[types.TypedKey]:
pass
@abstractmethod
async def generate_signatures(self, data: bytes, key_pairs: list[TypedKeyPair]) -> list[TypedSignature]:
@abstractmethod
async def generate_signatures(
self, data: bytes, key_pairs: list[types.TypedKeyPair]
) -> list[types.TypedSignature]:
pass
@abstractmethod
async def generate_key_pair(self, kind: CryptoKind) -> list[TypedKeyPair]:
@abstractmethod
async def generate_key_pair(self, kind: types.CryptoKind) -> list[types.TypedKeyPair]:
pass
@abstractmethod
async def now(self) -> Timestamp:
@abstractmethod
async def now(self) -> types.Timestamp:
pass
@abstractmethod
@abstractmethod
async def debug(self, command: str) -> str:
pass
@abstractmethod
@abstractmethod
async def veilid_version_string(self) -> str:
pass
@abstractmethod
async def veilid_version(self) -> VeilidVersion:
@abstractmethod
async def veilid_version(self) -> types.VeilidVersion:
pass

View File

@ -1,18 +1,46 @@
from typing import Self, Optional
from dataclasses import dataclass, fields
from enum import StrEnum
from json import dumps
from typing import Optional, Self
from .types import TypedKey, TypedSecret
from .types import *
class VeilidConfigLogLevel(StrEnum):
OFF = 'Off'
ERROR = 'Error'
WARN = 'Warn'
INFO = 'Info'
DEBUG = 'Debug'
TRACE = 'Trace'
OFF = "Off"
ERROR = "Error"
WARN = "Warn"
INFO = "Info"
DEBUG = "Debug"
TRACE = "Trace"
class VeilidConfigCapabilities:
@dataclass
class ConfigBase:
@classmethod
def from_json(cls, json_data: dict) -> Self:
"""Return an instance of this type from the input data."""
args = {}
for field in fields(cls):
key = field.name
value = json_data[key]
try:
# See if this field's type knows how to load itself from JSON input.
loader = field.type.from_json
except AttributeError:
# No, it doesn't. Use the raw value.
args[key] = value
else:
# Yes, it does. Use the loading function's output.
args[key] = loader(value)
return cls(**args)
def to_json(self) -> dict:
return self.__dict__
@dataclass
class VeilidConfigCapabilities(ConfigBase):
protocol_udp: bool
protocol_connect_tcp: bool
protocol_accept_tcp: bool
@ -21,30 +49,9 @@ class VeilidConfigCapabilities:
protocol_connect_wss: bool
protocol_accept_wss: bool
def __init__(self, protocol_udp: bool, protocol_connect_tcp: bool, protocol_accept_tcp: bool,
protocol_connect_ws: bool, protocol_accept_ws: bool, protocol_connect_wss: bool, protocol_accept_wss: bool):
self.protocol_udp = protocol_udp
self.protocol_connect_tcp = protocol_connect_tcp
self.protocol_accept_tcp = protocol_accept_tcp
self.protocol_connect_ws = protocol_connect_ws
self.protocol_accept_ws = protocol_accept_ws
self.protocol_connect_wss = protocol_connect_wss
self.protocol_accept_wss = protocol_accept_wss
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigCapabilities(j['protocol_udp'],
j['protocol_connect_tcp'],
j['protocol_accept_tcp'],
j['protocol_connect_ws'],
j['protocol_accept_ws'],
j['protocol_connect_wss'],
j['protocol_accept_wss'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigProtectedStore:
@dataclass
class VeilidConfigProtectedStore(ConfigBase):
allow_insecure_fallback: bool
always_use_insecure_storage: bool
directory: str
@ -52,52 +59,21 @@ class VeilidConfigProtectedStore:
device_encryption_key_password: str
new_device_encryption_key_password: Optional[str]
def __init__(self, allow_insecure_fallback: bool, always_use_insecure_storage: bool,
directory: str, delete: bool, device_encryption_key_password: str, new_device_encryption_key_password: Optional[str]):
self.allow_insecure_fallback = allow_insecure_fallback
self.always_use_insecure_storage = always_use_insecure_storage
self.directory = directory
self.delete = delete
self.device_encryption_key_password = device_encryption_key_password
self.new_device_encryption_key_password = new_device_encryption_key_password
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigProtectedStore(j['allow_insecure_fallback'], j['always_use_insecure_storage'],
j['directory'], j['delete'], j['device_encryption_key_password'], j['new_device_encryption_key_password'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigTableStore:
@dataclass
class VeilidConfigTableStore(ConfigBase):
directory: str
delete: bool
def __init__(self, directory: str, delete: bool):
self.directory = directory
self.delete = delete
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigTableStore(j['directory'], j['delete'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigBlockStore:
@dataclass
class VeilidConfigBlockStore(ConfigBase):
directory: str
delete: bool
def __init__(self, directory: str, delete: bool):
self.directory = directory
self.delete = delete
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigBlockStore(j['directory'], j['delete'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigRoutingTable:
@dataclass
class VeilidConfigRoutingTable(ConfigBase):
node_id: list[TypedKey]
node_id_secret: list[TypedSecret]
bootstrap: list[str]
@ -107,34 +83,9 @@ class VeilidConfigRoutingTable:
limit_attached_good: int
limit_attached_weak: int
def __init__(self, node_id: list[TypedKey], node_id_secret: list[TypedSecret], bootstrap: list[str], limit_over_attached: int,
limit_fully_attached: int, limit_attached_strong: int, limit_attached_good: int, limit_attached_weak: int):
self.node_id = node_id
self.node_id_secret = node_id_secret
self.bootstrap = bootstrap
self.limit_over_attached = limit_over_attached
self.limit_fully_attached = limit_fully_attached
self.limit_attached_strong = limit_attached_strong
self.limit_attached_good = limit_attached_good
self.limit_attached_weak = limit_attached_weak
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigRoutingTable(
list(map(lambda x: TypedKey(x), j['node_id'])),
list(map(lambda x: TypedSecret(x), j['node_id_secret'])),
j['bootstrap'],
j['limit_over_attached'],
j['limit_fully_attached'],
j['limit_attached_strong'],
j['limit_attached_good'],
j['limit_attached_weak'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigRPC:
@dataclass
class VeilidConfigRPC(ConfigBase):
concurrency: int
queue_size: int
max_timestamp_behind_ms: Optional[int]
@ -143,31 +94,9 @@ class VeilidConfigRPC:
max_route_hop_count: int
default_route_hop_count: int
def __init__(self, concurrency: int, queue_size: int, max_timestamp_behind_ms: Optional[int], max_timestamp_ahead_ms: Optional[int],
timeout_ms: int, max_route_hop_count: int, default_route_hop_count: int):
self.concurrency = concurrency
self.queue_size = queue_size
self.max_timestamp_behind_ms = max_timestamp_behind_ms
self.max_timestamp_ahead_ms = max_timestamp_ahead_ms
self.timeout_ms = timeout_ms
self.max_route_hop_count = max_route_hop_count
self.default_route_hop_count = default_route_hop_count
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigRPC(
j['concurrency'],
j['queue_size'],
j['max_timestamp_behind_ms'],
j['max_timestamp_ahead_ms'],
j['timeout_ms'],
j['max_route_hop_count'],
j['default_route_hop_count'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigDHT:
@dataclass
class VeilidConfigDHT(ConfigBase):
max_find_node_count: int
resolve_node_timeout_ms: int
resolve_node_count: int
@ -188,265 +117,83 @@ class VeilidConfigDHT:
remote_max_subkey_cache_memory_mb: int
remote_max_storage_space_mb: int
def __init__(self, max_find_node_count: int, resolve_node_timeout_ms: int, resolve_node_count: int,
resolve_node_fanout: int, get_value_timeout_ms: int, get_value_count: int, get_value_fanout: int,
set_value_timeout_ms: int, set_value_count: int, set_value_fanout: int,
min_peer_count: int, min_peer_refresh_time_ms: int, validate_dial_info_receipt_time_ms: int,
local_subkey_cache_size: int, local_max_subkey_cache_memory_mb: int,
remote_subkey_cache_size: int, remote_max_records: int, remote_max_subkey_cache_memory_mb: int, remote_max_storage_space_mb: int):
self.max_find_node_count = max_find_node_count
self.resolve_node_timeout_ms =resolve_node_timeout_ms
self.resolve_node_count = resolve_node_count
self.resolve_node_fanout = resolve_node_fanout
self.get_value_timeout_ms = get_value_timeout_ms
self.get_value_count = get_value_count
self.get_value_fanout = get_value_fanout
self.set_value_timeout_ms = set_value_timeout_ms
self.set_value_count = set_value_count
self.set_value_fanout = set_value_fanout
self.min_peer_count = min_peer_count
self.min_peer_refresh_time_ms = min_peer_refresh_time_ms
self.validate_dial_info_receipt_time_ms = validate_dial_info_receipt_time_ms
self.local_subkey_cache_size = local_subkey_cache_size
self.local_max_subkey_cache_memory_mb = local_max_subkey_cache_memory_mb
self.remote_subkey_cache_size = remote_subkey_cache_size
self.remote_max_records = remote_max_records
self.remote_max_subkey_cache_memory_mb = remote_max_subkey_cache_memory_mb
self.remote_max_storage_space_mb = remote_max_storage_space_mb
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigDHT(
j['max_find_node_count'],
j['resolve_node_timeout_ms'],
j['resolve_node_count'],
j['resolve_node_fanout'],
j['get_value_timeout_ms'],
j['get_value_count'],
j['get_value_fanout'],
j['set_value_timeout_ms'],
j['set_value_count'],
j['set_value_fanout'],
j['min_peer_count'],
j['min_peer_refresh_time_ms'],
j['validate_dial_info_receipt_time_ms'],
j['local_subkey_cache_size'],
j['local_max_subkey_cache_memory_mb'],
j['remote_subkey_cache_size'],
j['remote_max_records'],
j['remote_max_subkey_cache_memory_mb'],
j['remote_max_storage_space_mb'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigTLS:
@dataclass
class VeilidConfigTLS(ConfigBase):
certificate_path: str
private_key_path: str
connection_initial_timeout_ms: int
def __init__(self, certificate_path: str, private_key_path: str, connection_initial_timeout_ms: int):
self.certificate_path = certificate_path
self.private_key_path = private_key_path
self.connection_initial_timeout_ms = connection_initial_timeout_ms
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigTLS(
j['certificate_path'],
j['private_key_path'],
j['connection_initial_timeout_ms'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigHTTPS:
@dataclass
class VeilidConfigHTTPS(ConfigBase):
enabled: bool
listen_address: str
path: str
url: Optional[str]
def __init__(self, enabled: bool, listen_address: str, path: str, url: Optional[str]):
self.enabled = enabled
self.listen_address = listen_address
self.path = path
self.url = url
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigHTTPS(
j['enabled'],
j['listen_address'],
j['path'],
j['url'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigHTTP:
@dataclass
class VeilidConfigHTTP(ConfigBase):
enabled: bool
listen_address: str
path: str
url: Optional[str]
def __init__(self, enabled: bool, listen_address: str, path: str, url: Optional[str]):
self.enabled = enabled
self.listen_address = listen_address
self.path = path
self.url = url
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigHTTP(
j['enabled'],
j['listen_address'],
j['path'],
j['url'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigApplication:
@dataclass
class VeilidConfigApplication(ConfigBase):
https: VeilidConfigHTTPS
http: VeilidConfigHTTP
def __init__(self, https: VeilidConfigHTTPS, http: VeilidConfigHTTP):
self.https = https
self.http = http
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigApplication(
VeilidConfigHTTPS.from_json(j['https']),
VeilidConfigHTTP.from_json(j['http']))
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigUDP:
@dataclass
class VeilidConfigUDP(ConfigBase):
enabled: bool
socket_pool_size: int
listen_address: str
public_address: Optional[str]
def __init__(self, enabled: bool, socket_pool_size: int, listen_address: str, public_address: Optional[str]):
self.enabled = enabled
self.socket_pool_size = socket_pool_size
self.listen_address = listen_address
self.public_address = public_address
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigUDP(
j['enabled'],
j['socket_pool_size'],
j['listen_address'],
j['public_address'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigTCP:
@dataclass
class VeilidConfigTCP(ConfigBase):
connect: bool
listen: bool
max_connections: int
listen_address: str
public_address: Optional[str]
def __init__(self, connect: bool, listen: bool, max_connections: int, listen_address: str, public_address: Optional[str]):
self.connect = connect
self.listen = listen
self.max_connections = max_connections
self.listen_address = listen_address
self.public_address = public_address
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigTCP(
j['connect'],
j['listen'],
j['max_connections'],
j['listen_address'],
j['public_address'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigWS:
@dataclass
class VeilidConfigWS(ConfigBase):
connect: bool
listen: bool
max_connections: int
listen_address: str
path: str
url: Optional[str]
def __init__(self, connect: bool, listen: bool, max_connections: int, listen_address: str, path: str, url: Optional[str]):
self.connect = connect
self.listen = listen
self.max_connections = max_connections
self.listen_address = listen_address
self.path = path
self.url = url
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigWS(
j['connect'],
j['listen'],
j['max_connections'],
j['listen_address'],
j['path'],
j['url'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigWSS:
@dataclass
class VeilidConfigWSS(ConfigBase):
connect: bool
listen: bool
max_connections: int
listen_address: str
path: str
url: Optional[str]
def __init__(self, connect: bool, listen: bool, max_connections: int, listen_address: str, path: str, url: Optional[str]):
self.connect = connect
self.listen = listen
self.max_connections = max_connections
self.listen_address = listen_address
self.path = path
self.url = url
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigWSS(
j['connect'],
j['listen'],
j['max_connections'],
j['listen_address'],
j['path'],
j['url'])
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigProtocol:
@dataclass
class VeilidConfigProtocol(ConfigBase):
udp: VeilidConfigUDP
tcp: VeilidConfigTCP
ws: VeilidConfigWS
wss: VeilidConfigWSS
def __init__(self, udp: VeilidConfigUDP, tcp: VeilidConfigTCP, ws: VeilidConfigWS, wss: VeilidConfigWSS):
self.udp = udp
self.tcp = tcp
self.ws = ws
self.wss = wss
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigProtocol(
VeilidConfigUDP.from_json(j['udp']),
VeilidConfigTCP.from_json(j['tcp']),
VeilidConfigWS.from_json(j['ws']),
VeilidConfigWSS.from_json(j['wss']))
def to_json(self) -> dict:
return self.__dict__
class VeilidConfigNetwork:
@dataclass
class VeilidConfigNetwork(ConfigBase):
connection_initial_timeout_ms: int
connection_inactivity_timeout_ms: int
max_connections_per_ip4: int
@ -466,58 +213,9 @@ class VeilidConfigNetwork:
application: VeilidConfigApplication
protocol: VeilidConfigProtocol
def __init__(self, connection_initial_timeout_ms: int, connection_inactivity_timeout_ms: int,
max_connections_per_ip4: int, max_connections_per_ip6_prefix: int,
max_connections_per_ip6_prefix_size: int, max_connection_frequency_per_min: int,
client_whitelist_timeout_ms: int, reverse_connection_receipt_time_ms: int,
hole_punch_receipt_time_ms: int, routing_table: VeilidConfigRoutingTable,
rpc: VeilidConfigRPC, dht: VeilidConfigDHT, upnp: bool, detect_address_changes: bool,
restricted_nat_retries: int, tls: VeilidConfigTLS, application: VeilidConfigApplication, protocol: VeilidConfigProtocol):
self.connection_initial_timeout_ms = connection_initial_timeout_ms
self.connection_inactivity_timeout_ms = connection_inactivity_timeout_ms
self.max_connections_per_ip4 = max_connections_per_ip4
self.max_connections_per_ip6_prefix = max_connections_per_ip6_prefix
self.max_connections_per_ip6_prefix_size = max_connections_per_ip6_prefix_size
self.max_connection_frequency_per_min = max_connection_frequency_per_min
self.client_whitelist_timeout_ms = client_whitelist_timeout_ms
self.reverse_connection_receipt_time_ms = reverse_connection_receipt_time_ms
self.hole_punch_receipt_time_ms = hole_punch_receipt_time_ms
self.routing_table = routing_table
self.rpc = rpc
self.dht = dht
self.upnp = upnp
self.detect_address_changes = detect_address_changes
self.restricted_nat_retries = restricted_nat_retries
self.tls = tls
self.application = application
self.protocol = protocol
@staticmethod
def from_json(j: dict) -> Self:
return VeilidConfigNetwork(
j['connection_initial_timeout_ms'],
j['connection_inactivity_timeout_ms'],
j['max_connections_per_ip4'],
j['max_connections_per_ip6_prefix'],
j['max_connections_per_ip6_prefix_size'],
j['max_connection_frequency_per_min'],
j['client_whitelist_timeout_ms'],
j['reverse_connection_receipt_time_ms'],
j['hole_punch_receipt_time_ms'],
VeilidConfigRoutingTable.from_json(j['routing_table']),
VeilidConfigRPC.from_json(j['rpc']),
VeilidConfigDHT.from_json(j['dht']),
j['upnp'],
j['detect_address_changes'],
j['restricted_nat_retries'],
VeilidConfigTLS.from_json(j['tls']),
VeilidConfigApplication.from_json(j['application']),
VeilidConfigProtocol.from_json(j['protocol']))
def to_json(self) -> dict:
return self.__dict__
class VeilidConfig:
@dataclass
class VeilidConfig(ConfigBase):
program_name: str
namespace: str
capabilities: VeilidConfigCapabilities
@ -525,29 +223,3 @@ class VeilidConfig:
table_store: VeilidConfigTableStore
block_store: VeilidConfigBlockStore
network: VeilidConfigNetwork
def __init__(self, program_name: str, namespace: str, capabilities: VeilidConfigCapabilities,
protected_store: VeilidConfigProtectedStore, table_store: VeilidConfigTableStore,
block_store: VeilidConfigBlockStore, network: VeilidConfigNetwork):
self.program_name = program_name
self.namespace = namespace
self.capabilities = capabilities
self.protected_store = protected_store
self.table_store = table_store
self.block_store = block_store
self.network = network
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidConfig(j['program_name'], j['namespace'],
VeilidConfigCapabilities.from_json(j['capabilities']),
VeilidConfigProtectedStore.from_json(j['protected_store']),
VeilidConfigTableStore.from_json(j['table_store']),
VeilidConfigBlockStore.from_json(j['block_store']),
VeilidConfigNetwork.from_json(j['network']))
def to_json(self) -> dict:
return self.__dict__

View File

@ -1,142 +1,155 @@
from typing import Self, Any
import inspect
from dataclasses import dataclass
from typing import Any, Self
_ERROR_REGISTRY: dict[str, type] = {}
class VeilidAPIError(Exception):
"""Veilid API error exception base class"""
pass
@staticmethod
def from_json(j: dict) -> Self:
match j['kind']:
case 'NotInitialized':
return VeilidAPIErrorNotInitialized()
case 'AlreadyInitialized':
return VeilidAPIErrorAlreadyInitialized()
case 'Timeout':
return VeilidAPIErrorTimeout()
case 'TryAgain':
return VeilidAPIErrorTryAgain()
case 'Shutdown':
return VeilidAPIErrorShutdown()
case 'InvalidTarget':
return VeilidAPIErrorInvalidTarget()
case 'NoConnection':
return VeilidAPIErrorNoConnection(j['message'])
case 'KeyNotFound':
return VeilidAPIErrorKeyNotFound(j['key'])
case 'Internal':
return VeilidAPIErrorInternal(j['message'])
case 'Unimplemented':
return VeilidAPIErrorUnimplemented(j['message'])
case 'ParseError':
return VeilidAPIErrorParseError(j['message'], j['value'])
case 'InvalidArgument':
return VeilidAPIErrorInvalidArgument(j['context'], j['argument'], j['value'])
case 'MissingArgument':
return VeilidAPIErrorMissingArgument(j['context'], j['argument'])
case 'Generic':
return VeilidAPIErrorGeneric(j['message'])
case _:
return VeilidAPIError("Unknown exception type: {}".format(j['kind']))
label = "Base class"
def __init__(self, *args, **kwargs):
super().__init__(self.label, *args, **kwargs)
@classmethod
def from_json(cls, json: dict) -> Self:
kind = json["kind"]
try:
error_class = _ERROR_REGISTRY[kind]
except KeyError:
return cls(f"Unknown exception type: {kind}")
args = {key: value for key, value in json.items() if key != "kind"}
return error_class(**args)
@dataclass
class VeilidAPIErrorNotInitialized(VeilidAPIError):
"""Veilid was not initialized"""
def __init__(self):
super().__init__("Not initialized")
label = "Not initialized"
@dataclass
class VeilidAPIErrorAlreadyInitialized(VeilidAPIError):
"""Veilid was already initialized"""
def __init__(self):
super().__init__("Already initialized")
label = "Already initialized"
@dataclass
class VeilidAPIErrorTimeout(VeilidAPIError):
"""Veilid operation timed out"""
def __init__(self):
super().__init__("Timeout")
label = "Timeout"
@dataclass
class VeilidAPIErrorTryAgain(VeilidAPIError):
"""Operation could not be performed at this time, retry again later"""
def __init__(self):
super().__init__("Try again")
label = "Try again"
@dataclass
class VeilidAPIErrorShutdown(VeilidAPIError):
"""Veilid was already shut down"""
def __init__(self):
super().__init__("Shutdown")
label = "Shutdown"
@dataclass
class VeilidAPIErrorInvalidTarget(VeilidAPIError):
"""Target of operation is not valid"""
def __init__(self):
super().__init__("Invalid target")
label = "Invalid target"
@dataclass
class VeilidAPIErrorNoConnection(VeilidAPIError):
"""Connection could not be established"""
message: str
def __init__(self, message: str):
super().__init__("No connection")
self.message = message
label = "No connection"
message: str
@dataclass
class VeilidAPIErrorKeyNotFound(VeilidAPIError):
"""Key was not found"""
key: str
def __init__(self, key: str):
super().__init__("Key not found")
self.key = key
label = "Key not found"
key: str
@dataclass
class VeilidAPIErrorInternal(VeilidAPIError):
"""Veilid experienced an internal failure"""
message: str
def __init__(self, message: str):
super().__init__("Internal")
self.message = message
label = "Internal"
message: str
@dataclass
class VeilidAPIErrorUnimplemented(VeilidAPIError):
"""Functionality is not yet implemented"""
message: str
def __init__(self, message: str):
super().__init__("Unimplemented")
self.message = message
label = "Unimplemented"
message: str
@dataclass
class VeilidAPIErrorParseError(VeilidAPIError):
"""Value was not in a parseable format"""
label = "Parse error"
message: str
value: str
def __init__(self, message: str, value: str):
super().__init__("Parse error")
self.message = message
self.value = value
@dataclass
class VeilidAPIErrorInvalidArgument(VeilidAPIError):
"""Argument is not valid in this context"""
label = "Invalid argument"
context: str
argument: str
value: str
def __init__(self, context: str, argument: str, value: str):
super().__init__("Invalid argument")
self.context = context
self.argument = argument
self.value = value
@dataclass
class VeilidAPIErrorMissingArgument(VeilidAPIError):
"""Required argument was missing"""
label = "Missing argument"
context: str
argument: str
def __init__(self, context: str, argument: str):
super().__init__("Missing argument")
self.context = context
self.argument = argument
@dataclass
class VeilidAPIErrorGeneric(VeilidAPIError):
"""Generic error message"""
label = "Generic"
message: str
def __init__(self, message: str):
super().__init__("Generic")
self.message = message
# Build a mapping of canonicalized labels to their exception classes. Do this in-place to update
# the object inside the closure so VeilidAPIError.from_json can access the values.
_ERROR_REGISTRY.clear()
_ERROR_REGISTRY.update(
{
obj.label.title().replace(" ", ""): obj
for obj in vars().values()
if inspect.isclass(obj) and issubclass(obj, VeilidAPIError)
}
)
def raise_api_result(api_result: dict) -> Any:
if "value" in api_result:
return api_result["value"]
elif "error" in api_result:
if "error" in api_result:
raise VeilidAPIError.from_json(api_result["error"])
else:
raise ValueError("Invalid format for ApiResult")
raise ValueError("Invalid format for ApiResult")

File diff suppressed because it is too large Load Diff

View File

@ -1,36 +1,47 @@
from typing import Self, Optional
from enum import StrEnum
from typing import Optional, Self
from .config import VeilidConfig
from .types import (ByteCount, RouteId, Timestamp, TimestampDuration, TypedKey,
ValueData, ValueSubkey, VeilidLogLevel,
urlsafe_b64decode_no_pad)
from .types import *
from .config import *
class AttachmentState(StrEnum):
DETACHED = 'Detached'
ATTACHING = 'Attaching'
ATTACHED_WEAK = 'AttachedWeak'
ATTACHED_GOOD = 'AttachedGood'
ATTACHED_STRONG = 'AttachedStrong'
FULLY_ATTACHED = 'FullyAttached'
OVER_ATTACHED = 'OverAttached'
DETACHING = 'Detaching'
DETACHED = "Detached"
ATTACHING = "Attaching"
ATTACHED_WEAK = "AttachedWeak"
ATTACHED_GOOD = "AttachedGood"
ATTACHED_STRONG = "AttachedStrong"
FULLY_ATTACHED = "FullyAttached"
OVER_ATTACHED = "OverAttached"
DETACHING = "Detaching"
class VeilidStateAttachment:
state: AttachmentState
public_internet_ready: bool
local_network_ready: bool
def __init__(self, state: AttachmentState, public_internet_ready: bool, local_network_ready: bool):
def __init__(
self,
state: AttachmentState,
public_internet_ready: bool,
local_network_ready: bool,
):
self.state = state
self.public_internet_ready = public_internet_ready
self.local_network_ready = local_network_ready
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidStateAttachment(
AttachmentState(j['state']),
j['public_internet_ready'],
j['local_network_ready'])
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
AttachmentState(j["state"]),
j["public_internet_ready"],
j["local_network_ready"],
)
class RPCStats:
messages_sent: int
@ -42,9 +53,17 @@ class RPCStats:
recent_lost_answers: int
failed_to_send: int
def __init__(self, messages_sent: int, messages_rcvd: int, questions_in_flight: int,
last_question_ts: Optional[Timestamp], last_seen_ts: Optional[Timestamp],
first_consecutive_seen_ts: Optional[Timestamp], recent_lost_answers: int, failed_to_send: int):
def __init__(
self,
messages_sent: int,
messages_rcvd: int,
questions_in_flight: int,
last_question_ts: Optional[Timestamp],
last_seen_ts: Optional[Timestamp],
first_consecutive_seen_ts: Optional[Timestamp],
recent_lost_answers: int,
failed_to_send: int,
):
self.messages_sent = messages_sent
self.messages_rcvd = messages_rcvd
self.questions_in_flight = questions_in_flight
@ -53,37 +72,47 @@ class RPCStats:
self.first_consecutive_seen_ts = first_consecutive_seen_ts
self.recent_lost_answers = recent_lost_answers
self.failed_to_send = failed_to_send
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return RPCStats(
j['messages_sent'],
j['messages_rcvd'],
j['questions_in_flight'],
None if j['last_question_ts'] is None else Timestamp(j['last_question_ts']),
None if j['last_seen_ts'] is None else Timestamp(j['last_seen_ts']),
None if j['first_consecutive_seen_ts'] is None else Timestamp(j['first_consecutive_seen_ts']),
j['recent_lost_answers'],
j['failed_to_send'])
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
j["messages_sent"],
j["messages_rcvd"],
j["questions_in_flight"],
None if j["last_question_ts"] is None else Timestamp(j["last_question_ts"]),
None if j["last_seen_ts"] is None else Timestamp(j["last_seen_ts"]),
None
if j["first_consecutive_seen_ts"] is None
else Timestamp(j["first_consecutive_seen_ts"]),
j["recent_lost_answers"],
j["failed_to_send"],
)
class LatencyStats:
fastest: TimestampDuration
average: TimestampDuration
slowest: TimestampDuration
def __init__(self, fastest: TimestampDuration, average: TimestampDuration, slowest: TimestampDuration):
def __init__(
self,
fastest: TimestampDuration,
average: TimestampDuration,
slowest: TimestampDuration,
):
self.fastest = fastest
self.average = average
self.slowest = slowest
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return LatencyStats(
TimestampDuration(j['fastest']),
TimestampDuration(j['average']),
TimestampDuration(j['slowest']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
TimestampDuration(j["fastest"]),
TimestampDuration(j["average"]),
TimestampDuration(j["slowest"]),
)
class TransferStats:
@ -92,20 +121,27 @@ class TransferStats:
average: ByteCount
minimum: ByteCount
def __init__(self, total: ByteCount, maximum: ByteCount, average: ByteCount, minimum: ByteCount):
def __init__(
self,
total: ByteCount,
maximum: ByteCount,
average: ByteCount,
minimum: ByteCount,
):
self.total = total
self.maximum = maximum
self.average = average
self.minimum = minimum
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return TransferStats(
ByteCount(j['total']),
ByteCount(j['maximum']),
ByteCount(j['average']),
ByteCount(j['minimum']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
ByteCount(j["total"]),
ByteCount(j["maximum"]),
ByteCount(j["average"]),
ByteCount(j["minimum"]),
)
class TransferStatsDownUp:
@ -116,12 +152,11 @@ class TransferStatsDownUp:
self.down = down
self.up = up
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return TransferStatsDownUp(
TransferStats.from_json(j['down']),
TransferStats.from_json(j['up']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(TransferStats.from_json(j["down"]), TransferStats.from_json(j["up"]))
class PeerStats:
time_added: Timestamp
@ -129,20 +164,28 @@ class PeerStats:
latency: Optional[LatencyStats]
transfer: TransferStatsDownUp
def __init__(self, time_added: Timestamp, rpc_stats: RPCStats, latency: Optional[LatencyStats], transfer: TransferStatsDownUp):
def __init__(
self,
time_added: Timestamp,
rpc_stats: RPCStats,
latency: Optional[LatencyStats],
transfer: TransferStatsDownUp,
):
self.time_added = time_added
self.rpc_stats = rpc_stats
self.latency = latency
self.latency = latency
self.transfer = transfer
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return PeerStats(
j['time_added'],
RPCStats.from_json(j['rpc_stats']),
None if j['latency'] is None else LatencyStats.from_json(j['latency']),
TransferStatsDownUp.from_json(j['transfer']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
j["time_added"],
RPCStats.from_json(j["rpc_stats"]),
None if j["latency"] is None else LatencyStats.from_json(j["latency"]),
TransferStatsDownUp.from_json(j["transfer"]),
)
class PeerTableData:
node_ids: list[str]
@ -153,14 +196,14 @@ class PeerTableData:
self.node_ids = node_ids
self.peer_address = peer_address
self.peer_stats = peer_stats
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return PeerTableData(
j['node_ids'],
j['peer_address'],
PeerStats.from_json(j['peer_stats']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
j["node_ids"], j["peer_address"], PeerStats.from_json(j["peer_stats"])
)
class VeilidStateNetwork:
started: bool
@ -168,102 +211,120 @@ class VeilidStateNetwork:
bps_up: ByteCount
peers: list[PeerTableData]
def __init__(self, started: bool, bps_down: ByteCount, bps_up: ByteCount, peers: list[PeerTableData]):
def __init__(
self,
started: bool,
bps_down: ByteCount,
bps_up: ByteCount,
peers: list[PeerTableData],
):
self.started = started
self.bps_down = bps_down
self.bps_up = bps_up
self.peers = peers
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidStateNetwork(
j['started'],
ByteCount(j['bps_down']),
ByteCount(j['bps_up']),
list(map(lambda x: PeerTableData.from_json(x), j['peers'])))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
j["started"],
ByteCount(j["bps_down"]),
ByteCount(j["bps_up"]),
[PeerTableData.from_json(peer) for peer in j["peers"]],
)
class VeilidStateConfig:
config: VeilidConfig
def __init__(self, config: VeilidConfig):
self.config = config
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidStateConfig(
VeilidConfig.from_json(j['config'])
)
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(VeilidConfig.from_json(j["config"]))
class VeilidState:
attachment: VeilidStateAttachment
network: VeilidStateNetwork
config: VeilidStateConfig
def __init__(self, attachment: VeilidStateAttachment, network: VeilidStateNetwork, config: VeilidStateConfig):
def __init__(
self,
attachment: VeilidStateAttachment,
network: VeilidStateNetwork,
config: VeilidStateConfig,
):
self.attachment = attachment
self.network = network
self.config = config
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidState(
VeilidStateAttachment.from_json(j['attachment']),
VeilidStateNetwork.from_json(j['network']),
VeilidStateConfig.from_json(j['config']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
VeilidStateAttachment.from_json(j["attachment"]),
VeilidStateNetwork.from_json(j["network"]),
VeilidStateConfig.from_json(j["config"]),
)
class VeilidLog:
log_level: VeilidLogLevel
message: str
backtrace: Optional[str]
def __init__(self, log_level: VeilidLogLevel, message: str, backtrace: Optional[str]):
def __init__(
self, log_level: VeilidLogLevel, message: str, backtrace: Optional[str]
):
self.log_level = log_level
self.message = message
self.backtrace = backtrace
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidLog(
VeilidLogLevel(j['log_level']),
j['message'],
j['backtrace'])
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(VeilidLogLevel(j["log_level"]), j["message"], j["backtrace"])
class VeilidAppMessage:
sender: Optional[TypedKey]
message: bytes
def __init__(self, sender: Optional[TypedKey], message: bytes):
self.sender = sender
self.message = message
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidAppMessage(
None if j['sender'] is None else TypedKey(j['sender']),
urlsafe_b64decode_no_pad(j['message']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
None if j["sender"] is None else TypedKey(j["sender"]),
urlsafe_b64decode_no_pad(j["message"]),
)
class VeilidAppCall:
sender: Optional[TypedKey]
message: bytes
call_id: str
def __init__(self, sender: Optional[TypedKey], message: bytes, call_id: str):
self.sender = sender
self.message = message
self.call_id = call_id
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidAppCall(
None if j['sender'] is None else TypedKey(j['sender']),
urlsafe_b64decode_no_pad(j['message']),
j['call_id'])
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
None if j["sender"] is None else TypedKey(j["sender"]),
urlsafe_b64decode_no_pad(j["message"]),
j["call_id"],
)
class VeilidRouteChange:
dead_routes: list[RouteId]
@ -272,13 +333,15 @@ class VeilidRouteChange:
def __init__(self, dead_routes: list[RouteId], dead_remote_routes: list[RouteId]):
self.dead_routes = dead_routes
self.dead_remote_routes = dead_remote_routes
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidRouteChange(
list(map(lambda x: RouteId(x), j['dead_routes'])),
list(map(lambda x: RouteId(x), j['dead_remote_routes'])))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
[RouteId(route) for route in j["dead_routes"]],
[RouteId(route) for route in j["dead_remote_routes"]],
)
class VeilidValueChange:
key: TypedKey
@ -286,20 +349,23 @@ class VeilidValueChange:
count: int
value: ValueData
def __init__(self, key: TypedKey, subkeys: list[ValueSubkey], count: int, value: ValueData):
def __init__(
self, key: TypedKey, subkeys: list[ValueSubkey], count: int, value: ValueData
):
self.key = key
self.subkeys = subkeys
self.count = count
self.value = value
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
return VeilidValueChange(
TypedKey(j['key']),
list(map(lambda x: ValueSubkey(x), j['subkeys'])),
j['count'],
ValueData.from_json(j['value']))
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
return cls(
TypedKey(j["key"]),
[ValueSubkey(key) for key in j["subkeys"]],
j["count"],
ValueData.from_json(j["value"]),
)
class VeilidUpdateKind(StrEnum):
@ -313,19 +379,36 @@ class VeilidUpdateKind(StrEnum):
VALUE_CHANGE = "ValueChange"
SHUTDOWN = "Shutdown"
VeilidUpdateDetailType = Optional[
VeilidLog
| VeilidAppMessage
| VeilidAppCall
| VeilidStateAttachment
| VeilidStateNetwork
| VeilidStateConfig
| VeilidRouteChange
| VeilidValueChange
]
class VeilidUpdate:
kind: VeilidUpdateKind
detail: Optional[VeilidLog | VeilidAppMessage | VeilidAppCall | VeilidStateAttachment | VeilidStateNetwork | VeilidStateConfig | VeilidRouteChange | VeilidValueChange]
detail: VeilidUpdateDetailType
def __init__(self, kind: VeilidUpdateKind, detail: Optional[VeilidLog | VeilidAppMessage | VeilidAppCall | VeilidStateAttachment | VeilidStateNetwork | VeilidStateConfig | VeilidRouteChange | VeilidValueChange]):
def __init__(
self,
kind: VeilidUpdateKind,
detail: VeilidUpdateDetailType,
):
self.kind = kind
self.detail = detail
@staticmethod
def from_json(j: dict) -> Self:
'''JSON object hook'''
kind = VeilidUpdateKind(j['kind'])
detail = None
@classmethod
def from_json(cls, j: dict) -> Self:
"""JSON object hook"""
kind = VeilidUpdateKind(j["kind"])
detail: VeilidUpdateDetailType = None
match kind:
case VeilidUpdateKind.LOG:
detail = VeilidLog.from_json(j)
@ -347,4 +430,4 @@ class VeilidUpdate:
detail = None
case _:
raise ValueError("Unknown VeilidUpdateKind")
return VeilidUpdate(kind, detail)
return cls(kind, detail)

View File

@ -1,12 +1,11 @@
import time
import json
import base64
import json
from enum import StrEnum
from typing import Self, Optional, Any, Tuple
from typing import Any, Optional, Self, Tuple
####################################################################
def urlsafe_b64encode_no_pad(b: bytes) -> str:
"""
Removes any `=` used as padding from the encoded string.
@ -22,6 +21,7 @@ def urlsafe_b64decode_no_pad(s: str) -> bytes:
s = s + ("=" * padding)
return base64.urlsafe_b64decode(s)
class VeilidJSONEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, bytes):
@ -29,173 +29,206 @@ class VeilidJSONEncoder(json.JSONEncoder):
if hasattr(o, "to_json") and callable(o.to_json):
return o.to_json()
return json.JSONEncoder.default(self, o)
@staticmethod
def dumps(req: Any, *args, **kwargs) -> str:
return json.dumps(req, cls = VeilidJSONEncoder, *args, **kwargs)
return json.dumps(req, cls=VeilidJSONEncoder, *args, **kwargs)
####################################################################
class VeilidLogLevel(StrEnum):
ERROR = 'Error'
WARN = 'Warn'
INFO = 'Info'
DEBUG = 'Debug'
TRACE = 'Trace'
ERROR = "Error"
WARN = "Warn"
INFO = "Info"
DEBUG = "Debug"
TRACE = "Trace"
class CryptoKind(StrEnum):
CRYPTO_KIND_NONE = "NONE"
CRYPTO_KIND_VLD0 = "VLD0"
class Stability(StrEnum):
LOW_LATENCY = "LowLatency"
RELIABLE = "Reliable"
class Sequencing(StrEnum):
NO_PREFERENCE = "NoPreference"
PREFER_ORDERED = "PreferOrdered"
ENSURE_ORDERED = "EnsureOrdered"
class DHTSchemaKind(StrEnum):
DFLT = "DFLT"
SMPL = "SMPL"
####################################################################
class Timestamp(int):
pass
class TimestampDuration(int):
pass
class ByteCount(int):
pass
class OperationId(int):
class OperationId(str):
pass
class RouteId(str):
pass
class CryptoKey:
class EncodedString(str):
def to_bytes(self) -> bytes:
return urlsafe_b64decode_no_pad(self)
class CryptoKeyDistance(CryptoKey, str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return CryptoKeyDistance(urlsafe_b64encode_no_pad(b))
@classmethod
def from_bytes(cls, b: bytes) -> Self:
return cls(urlsafe_b64encode_no_pad(b))
class PublicKey(CryptoKey, str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return PublicKey(urlsafe_b64encode_no_pad(b))
class SecretKey(CryptoKey, str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return SecretKey(urlsafe_b64encode_no_pad(b))
class CryptoKey(EncodedString):
pass
class SharedSecret(CryptoKey, str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return SharedSecret(urlsafe_b64encode_no_pad(b))
class HashDigest(CryptoKey, str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return HashDigest(urlsafe_b64encode_no_pad(b))
class CryptoKeyDistance(CryptoKey):
pass
class Signature(str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return Signature(urlsafe_b64encode_no_pad(b))
def to_bytes(self) -> bytes:
return urlsafe_b64decode_no_pad(self)
class Nonce(str):
@staticmethod
def from_bytes(b: bytes) -> Self:
return Signature(urlsafe_b64encode_no_pad(b))
def to_bytes(self) -> bytes:
return urlsafe_b64decode_no_pad(self)
class PublicKey(CryptoKey):
pass
class SecretKey(CryptoKey):
pass
class SharedSecret(CryptoKey):
pass
class HashDigest(CryptoKey):
pass
class Signature(EncodedString):
pass
class Nonce(EncodedString):
pass
class KeyPair(str):
@staticmethod
def from_parts(key: PublicKey, secret: SecretKey) -> Self:
return KeyPair(key + ":" + secret)
def key(self) -> PublicKey:
return PublicKey(str.split(":", 1)[0])
def secret(self) -> SecretKey:
return SecretKey(str.split(":", 1)[1])
def to_parts(self) -> Tuple[PublicKey, SecretKey]:
parts = str.split(":", 1)
return (PublicKey(parts[0]), SecretKey(parts[1]))
@classmethod
def from_parts(cls, key: PublicKey, secret: SecretKey) -> Self:
return cls(f"{key}:{secret}")
class CryptoTyped:
def key(self) -> PublicKey:
return PublicKey(self.split(":", 1)[0])
def secret(self) -> SecretKey:
return SecretKey(self.split(":", 1)[1])
def to_parts(self) -> Tuple[PublicKey, SecretKey]:
public, secret = self.split(":", 1)
return (PublicKey(public), SecretKey(secret))
class CryptoTyped(str):
def kind(self) -> CryptoKind:
if self[4] != ':':
if self[4] != ":":
raise ValueError("Not CryptoTyped")
return CryptoKind(self[0:4])
def _value(self) -> str:
if self[4] != ':':
if self[4] != ":":
raise ValueError("Not CryptoTyped")
return self[5:]
class TypedKey(CryptoTyped, str):
@staticmethod
def from_value(kind: CryptoKind, value: PublicKey) -> Self:
return TypedKey(kind + ":" + value)
class TypedKey(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: PublicKey) -> Self:
return cls(f"{kind}:{value}")
def value(self) -> PublicKey:
PublicKey(self._value())
class TypedSecret(CryptoTyped, str):
@staticmethod
def from_value(kind: CryptoKind, value: SecretKey) -> Self:
return TypedSecret(kind + ":" + value)
return PublicKey(self._value())
class TypedSecret(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: SecretKey) -> Self:
return cls(f"{kind}:{value}")
def value(self) -> SecretKey:
SecretKey(self._value())
return SecretKey(self._value())
class TypedKeyPair(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: KeyPair) -> Self:
return cls(f"{kind}:{value}")
class TypedKeyPair(CryptoTyped, str):
@staticmethod
def from_value(kind: CryptoKind, value: KeyPair) -> Self:
return TypedKeyPair(kind + ":" + value)
def value(self) -> KeyPair:
KeyPair(self._value())
return KeyPair(self._value())
class TypedSignature(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: Signature) -> Self:
return cls(f"{kind}:{value}")
class TypedSignature(CryptoTyped, str):
@staticmethod
def from_value(kind: CryptoKind, value: Signature) -> Self:
return TypedSignature(kind + ":" + value)
def value(self) -> Signature:
Signature(self._value())
return Signature(self._value())
class ValueSubkey(int):
pass
class ValueSeqNum(int):
pass
####################################################################
class VeilidVersion:
_major: int
_minor: int
_patch: int
def __init__(self, major: int, minor: int, patch: int):
self._major = major
self._minor = minor
self._patch = patch
@property
def major(self):
return self._major
@property
def minor(self):
return self._minor
@property
def patch(self):
return self._patch
class NewPrivateRouteResult:
route_id: RouteId
blob: bytes
@ -207,95 +240,106 @@ class NewPrivateRouteResult:
def to_tuple(self) -> Tuple[RouteId, bytes]:
return (self.route_id, self.blob)
@staticmethod
def from_json(j: dict) -> Self:
return NewPrivateRouteResult(
RouteId(j['route_id']),
urlsafe_b64decode_no_pad(j['blob']))
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(RouteId(j["route_id"]), urlsafe_b64decode_no_pad(j["blob"]))
class DHTSchemaSMPLMember:
m_key: PublicKey
m_cnt: int
def __init__(self, m_key: PublicKey, m_cnt: int):
self.m_key = m_key
self.m_cnt = m_cnt
@staticmethod
def from_json(j: dict) -> Self:
return DHTSchemaSMPLMember(
PublicKey(j['m_key']),
j['m_cnt'])
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(PublicKey(j["m_key"]), j["m_cnt"])
def to_json(self) -> dict:
return self.__dict__
class DHTSchema:
kind: DHTSchemaKind
def __init__(self, kind: DHTSchemaKind, **kwargs):
self.kind = kind
for k, v in kwargs.items():
setattr(self, k, v)
@staticmethod
def dflt(o_cnt: int) -> Self:
Self(DHTSchemaKind.DFLT, o_cnt = o_cnt)
@staticmethod
def smpl(o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self:
Self(DHTSchemaKind.SMPL, o_cnt = o_cnt, members = members)
@staticmethod
def from_json(j: dict) -> Self:
if DHTSchemaKind(j['kind']) == DHTSchemaKind.DFLT:
return DHTSchema.dflt(j['o_cnt'])
if DHTSchemaKind(j['kind']) == DHTSchemaKind.SMPL:
return DHTSchema.smpl(
j['o_cnt'],
list(map(lambda x: DHTSchemaSMPLMember.from_json(x), j['members'])))
raise Exception("Unknown DHTSchema kind", j['kind'])
@classmethod
def dflt(cls, o_cnt: int) -> Self:
return cls(DHTSchemaKind.DFLT, o_cnt=o_cnt)
@classmethod
def smpl(cls, o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self:
return cls(DHTSchemaKind.SMPL, o_cnt=o_cnt, members=members)
@classmethod
def from_json(cls, j: dict) -> Self:
if DHTSchemaKind(j["kind"]) == DHTSchemaKind.DFLT:
return cls.dflt(j["o_cnt"])
if DHTSchemaKind(j["kind"]) == DHTSchemaKind.SMPL:
return cls.smpl(
j["o_cnt"],
[DHTSchemaSMPLMember.from_json(member) for member in j["members"]],
)
raise Exception("Unknown DHTSchema kind", j["kind"])
def to_json(self) -> dict:
return self.__dict__
class DHTRecordDescriptor:
key: TypedKey
owner: PublicKey
owner_secret: Optional[SecretKey]
schema: DHTSchema
def __init__(self, key: TypedKey, owner: PublicKey, owner_secret: Optional[SecretKey], schema: DHTSchema):
def __init__(
self,
key: TypedKey,
owner: PublicKey,
owner_secret: Optional[SecretKey],
schema: DHTSchema,
):
self.key = key
self.owner = owner
self.owner_secret = owner_secret
self.schema = schema
@staticmethod
def from_json(j: dict) -> Self:
DHTRecordDescriptor(
TypedKey(j['key']),
PublicKey(j['owner']),
None if j['owner_secret'] is None else SecretKey(j['owner_secret']),
DHTSchema.from_json(j['schema']))
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(
TypedKey(j["key"]),
PublicKey(j["owner"]),
None if j["owner_secret"] is None else SecretKey(j["owner_secret"]),
DHTSchema.from_json(j["schema"]),
)
def to_json(self) -> dict:
return self.__dict__
class ValueData:
seq: ValueSeqNum
data: bytes
writer: PublicKey
def __init__(self, seq: ValueSeqNum, data: bytes, writer: PublicKey):
self.seq = seq
self.data = data
self.writer = writer
@staticmethod
def from_json(j: dict) -> Self:
DHTRecordDescriptor(
ValueSeqNum(j['seq']),
urlsafe_b64decode_no_pad(j['data']),
PublicKey(j['writer']))
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(
ValueSeqNum(j["seq"]),
urlsafe_b64decode_no_pad(j["data"]),
PublicKey(j["writer"]),
)
def to_json(self) -> dict:
return self.__dict__