From 69087f2854f726557751c0238fc2e69e257c773e Mon Sep 17 00:00:00 2001 From: Teknique Date: Fri, 16 Jun 2023 23:56:47 -0700 Subject: [PATCH] Clean json_api types --- veilid-python/veilid/json_api.py | 1098 +++++++++++++++++++++--------- 1 file changed, 769 insertions(+), 329 deletions(-) diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index eb388fc3..9c706e01 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -1,20 +1,28 @@ -import json import asyncio -from jsonschema import validators, exceptions +import importlib.resources as importlib_resources +import json +from typing import Awaitable, Callable, Optional, Self -from typing import Callable, Awaitable, Mapping +from jsonschema import exceptions, validators -from .api import * -from .state import * -from .config import * -from .error import * -from .types import * -from .operations import * +from . import schema +from .api import (CryptoSystem, RoutingContext, TableDb, TableDbTransaction, + VeilidAPI, VeilidUpdate) +from .error import raise_api_result +from .operations import (CryptoSystemOperation, Operation, + RoutingContextOperation, TableDbOperation, + TableDbTransactionOperation) +from .state import VeilidState +from .types import (CryptoKey, CryptoKeyDistance, CryptoKind, + DHTRecordDescriptor, DHTSchema, HashDigest, KeyPair, + NewPrivateRouteResult, Nonce, OperationId, PublicKey, + RouteId, SecretKey, Sequencing, SharedSecret, Signature, + Stability, Timestamp, TypedKey, TypedKeyPair, + TypedSignature, ValueData, ValueSubkey, VeilidJSONEncoder, + VeilidVersion, urlsafe_b64decode_no_pad) ############################################################## -import importlib.resources as importlib_resources -from . import schema def _get_schema_validator(schema): cls = validators.validator_for(schema) @@ -22,17 +30,24 @@ def _get_schema_validator(schema): validator = cls(schema) return validator + def _schema_validate(validator, instance): error = exceptions.best_match(validator.iter_errors(instance)) if error is not None: raise error -_VALIDATOR_REQUEST = _get_schema_validator(json.loads((importlib_resources.files(schema) / 'Request.json').read_text())) -_VALIDATOR_RECV_MESSAGE = _get_schema_validator(json.loads((importlib_resources.files(schema) / 'RecvMessage.json').read_text())) + +_VALIDATOR_REQUEST = _get_schema_validator( + json.loads((importlib_resources.files(schema) / "Request.json").read_text()) +) +_VALIDATOR_RECV_MESSAGE = _get_schema_validator( + json.loads((importlib_resources.files(schema) / "RecvMessage.json").read_text()) +) ############################################################## + class _JsonVeilidAPI(VeilidAPI): reader: asyncio.StreamReader writer: asyncio.StreamWriter @@ -42,9 +57,15 @@ class _JsonVeilidAPI(VeilidAPI): # Shared Mutable State lock: asyncio.Lock next_id: int - in_flight_requests: Mapping[str, asyncio.Future] - - def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, update_callback: Callable[[VeilidUpdate], Awaitable], validate_schema: bool = True): + in_flight_requests: dict[int, asyncio.Future] + + def __init__( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + update_callback: Callable[[VeilidUpdate], Awaitable], + validate_schema: bool = True, + ): self.reader = reader self.writer = writer self.update_callback = update_callback @@ -68,7 +89,7 @@ class _JsonVeilidAPI(VeilidAPI): await self.writer.wait_closed() self.writer = None - for (reqid, reqfuture) in self.in_flight_requests.items(): + for reqid, reqfuture in self.in_flight_requests.items(): reqfuture.cancel() finally: @@ -81,7 +102,7 @@ class _JsonVeilidAPI(VeilidAPI): if self.handle_recv_messages_task is None: return handle_recv_messages_task = self.handle_recv_messages_task - self.handle_recv_messages_task = None + self.handle_recv_messages_task = None finally: self.lock.release() # Cancel it @@ -91,15 +112,19 @@ class _JsonVeilidAPI(VeilidAPI): except asyncio.CancelledError: pass - @staticmethod - async def connect(host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable]) -> Self: + @classmethod + async def connect( + cls, host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable] + ) -> Self: reader, writer = await asyncio.open_connection(host, port) - veilid_api = _JsonVeilidAPI(reader, writer, update_callback) - veilid_api.handle_recv_messages_task = asyncio.create_task(veilid_api.handle_recv_messages(), name = "JsonVeilidAPI.handle_recv_messages") + veilid_api = cls(reader, writer, update_callback) + veilid_api.handle_recv_messages_task = asyncio.create_task( + veilid_api.handle_recv_messages(), name="JsonVeilidAPI.handle_recv_messages" + ) return veilid_api - + async def handle_recv_message_response(self, j: dict): - id = j['id'] + id = j["id"] await self.lock.acquire() try: # Get and remove the in-flight request @@ -110,15 +135,14 @@ class _JsonVeilidAPI(VeilidAPI): if reqfuture is not None: reqfuture.set_result(j) - async def handle_recv_messages(self): # Read lines until we're done try: while True: linebytes = await self.reader.readline() - if not linebytes.endswith(b'\n'): + if not linebytes.endswith(b"\n"): break - + # Parse line as ndjson j = json.loads(linebytes.strip()) @@ -126,16 +150,16 @@ class _JsonVeilidAPI(VeilidAPI): _schema_validate(_VALIDATOR_RECV_MESSAGE, j) # Process the message - if j['type'] == "Response": + if j["type"] == "Response": await self.handle_recv_message_response(j) - elif j['type'] == "Update": + elif j["type"] == "Update": await self.update_callback(VeilidUpdate.from_json(j)) finally: await self._cleanup_close() - + async def allocate_request_future(self, id: int) -> asyncio.Future: reqfuture = asyncio.get_running_loop().create_future() - + await self.lock.acquire() try: self.in_flight_requests[id] = reqfuture @@ -143,23 +167,23 @@ class _JsonVeilidAPI(VeilidAPI): self.lock.release() return reqfuture - + async def cancel_request_future(self, id: int): await self.lock.acquire() try: reqfuture = self.in_flight_requests.pop(id, None) - reqfuture.cancel() + if reqfuture is not None: + reqfuture.cancel() finally: self.lock.release() def send_one_way_ndjson_request(self, op: Operation, **kwargs): - if self.writer is None: return - + # Make NDJSON string for request # Always use id 0 because no reply will be received for one-way requests - req = { "id": 0, "op": op } + req = {"id": 0, "op": op} for k, v in kwargs.items(): req[k] = v reqstr = VeilidJSONEncoder.dumps(req) + "\n" @@ -170,20 +194,24 @@ class _JsonVeilidAPI(VeilidAPI): # Send to socket without waitings self.writer.write(reqbytes) - - async def send_ndjson_request(self, op: Operation, validate: Optional[Callable[[dict, dict], None]] = None, **kwargs) -> dict: + async def send_ndjson_request( + self, + op: Operation, + validate: Optional[Callable[[dict, dict], None]] = None, + **kwargs + ) -> dict: # Get next id await self.lock.acquire() try: id = self.next_id self.next_id += 1 - writer = self.writer + writer = self.writer finally: self.lock.release() # Make NDJSON string for request - req = { "id": id, "op": op } + req = {"id": id, "op": op} for k, v in kwargs.items(): req[k] = v reqstr = VeilidJSONEncoder.dumps(req) + "\n" @@ -199,7 +227,7 @@ class _JsonVeilidAPI(VeilidAPI): try: writer.write(reqbytes) await writer.drain() - except: + except Exception: # Send failed, release future await self.cancel_request_future(id) raise @@ -216,193 +244,379 @@ class _JsonVeilidAPI(VeilidAPI): return response async def control(self, args: list[str]) -> str: - return raise_api_result(await self.send_ndjson_request(Operation.CONTROL, args = args)) + return raise_api_result( + await self.send_ndjson_request(Operation.CONTROL, args=args) + ) + async def get_state(self) -> VeilidState: - return VeilidState.from_json(raise_api_result(await self.send_ndjson_request(Operation.GET_STATE))) + return VeilidState.from_json( + raise_api_result(await self.send_ndjson_request(Operation.GET_STATE)) + ) + async def attach(self): raise_api_result(await self.send_ndjson_request(Operation.ATTACH)) + async def detach(self): raise_api_result(await self.send_ndjson_request(Operation.DETACH)) - async def new_private_route(self) -> Tuple[RouteId, bytes]: - return NewPrivateRouteResult.from_json(raise_api_result(await self.send_ndjson_request(Operation.NEW_PRIVATE_ROUTE))).to_tuple() - async def new_custom_private_route(self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing) -> Tuple[RouteId, bytes]: - return NewPrivateRouteResult.from_json(raise_api_result( - await self.send_ndjson_request(Operation.NEW_CUSTOM_PRIVATE_ROUTE, - kinds = kinds, - stability = stability, - sequencing = sequencing) - )).to_tuple() + + async def new_private_route(self) -> tuple[RouteId, bytes]: + return NewPrivateRouteResult.from_json( + raise_api_result( + await self.send_ndjson_request(Operation.NEW_PRIVATE_ROUTE) + ) + ).to_tuple() + + async def new_custom_private_route( + self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing + ) -> tuple[RouteId, bytes]: + return NewPrivateRouteResult.from_json( + raise_api_result( + await self.send_ndjson_request( + Operation.NEW_CUSTOM_PRIVATE_ROUTE, + kinds=kinds, + stability=stability, + sequencing=sequencing, + ) + ) + ).to_tuple() + async def import_remote_private_route(self, blob: bytes) -> RouteId: - return RouteId(raise_api_result( - await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE, - blob = blob) - )) + return RouteId( + raise_api_result( + await self.send_ndjson_request( + Operation.IMPORT_REMOTE_PRIVATE_ROUTE, blob=blob + ) + ) + ) + async def release_private_route(self, route_id: RouteId): raise_api_result( - await self.send_ndjson_request(Operation.RELEASE_PRIVATE_ROUTE, - route_id = route_id) + await self.send_ndjson_request( + Operation.RELEASE_PRIVATE_ROUTE, route_id=route_id ) + ) + async def app_call_reply(self, call_id: OperationId, message: bytes): raise_api_result( - await self.send_ndjson_request(Operation.APP_CALL_REPLY, - call_id = call_id, - message = message) + await self.send_ndjson_request( + Operation.APP_CALL_REPLY, call_id=call_id, message=message ) + ) + async def new_routing_context(self) -> RoutingContext: - rc_id = raise_api_result(await self.send_ndjson_request(Operation.NEW_ROUTING_CONTEXT)) - return _JsonRoutingContext(self, rc_id) + rc_id = raise_api_result( + await self.send_ndjson_request(Operation.NEW_ROUTING_CONTEXT) + ) + return _JsonRoutingContext(self, rc_id) + async def open_table_db(self, name: str, column_count: int) -> TableDb: - db_id = raise_api_result(await self.send_ndjson_request(Operation.OPEN_TABLE_DB, - name = name, - column_count = column_count)) + db_id = raise_api_result( + await self.send_ndjson_request( + Operation.OPEN_TABLE_DB, name=name, column_count=column_count + ) + ) return _JsonTableDb(self, db_id) + async def delete_table_db(self, name: str): - return raise_api_result(await self.send_ndjson_request(Operation.DELETE_TABLE_DB, - name = name)) + return raise_api_result( + await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name) + ) + async def get_crypto_system(self, kind: CryptoKind) -> CryptoSystem: - cs_id = raise_api_result(await self.send_ndjson_request(Operation.GET_CRYPTO_SYSTEM, - kind = kind)) + cs_id = raise_api_result( + await self.send_ndjson_request(Operation.GET_CRYPTO_SYSTEM, kind=kind) + ) return _JsonCryptoSystem(self, cs_id) + async def best_crypto_system(self) -> CryptoSystem: - cs_id = raise_api_result(await self.send_ndjson_request(Operation.BEST_CRYPTO_SYSTEM)) + cs_id = raise_api_result( + await self.send_ndjson_request(Operation.BEST_CRYPTO_SYSTEM) + ) return _JsonCryptoSystem(self, cs_id) - async def verify_signatures(self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature]) -> list[TypedKey]: - return list(map(lambda x: TypedKey(x), raise_api_result(await self.send_ndjson_request(Operation.VERIFY_SIGNATURES, - node_ids = node_ids, - data = data, - signatures = signatures)))) - async def generate_signatures(self, data: bytes, key_pairs: list[TypedKeyPair]) -> list[TypedSignature]: - return list(map(lambda x: TypedSignature(x), raise_api_result(await self.send_ndjson_request(Operation.GENERATE_SIGNATURES, - data = data, - key_pairs = key_pairs)))) + + async def verify_signatures( + self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature] + ) -> list[TypedKey]: + return list( + map( + lambda x: TypedKey(x), + raise_api_result( + await self.send_ndjson_request( + Operation.VERIFY_SIGNATURES, + node_ids=node_ids, + data=data, + signatures=signatures, + ) + ), + ) + ) + + async def generate_signatures( + self, data: bytes, key_pairs: list[TypedKeyPair] + ) -> list[TypedSignature]: + return list( + map( + lambda x: TypedSignature(x), + raise_api_result( + await self.send_ndjson_request( + Operation.GENERATE_SIGNATURES, data=data, key_pairs=key_pairs + ) + ), + ) + ) + async def generate_key_pair(self, kind: CryptoKind) -> list[TypedKeyPair]: - return list(map(lambda x: TypedKeyPair(x), raise_api_result(await self.send_ndjson_request(Operation.GENERATE_KEY_PAIR, - kind = kind)))) + return list( + map( + lambda x: TypedKeyPair(x), + raise_api_result( + await self.send_ndjson_request( + Operation.GENERATE_KEY_PAIR, kind=kind + ) + ), + ) + ) + async def now(self) -> Timestamp: - return Timestamp(raise_api_result(await self.send_ndjson_request(Operation.NOW))) + return Timestamp( + raise_api_result(await self.send_ndjson_request(Operation.NOW)) + ) + async def debug(self, command: str) -> str: - return raise_api_result(await self.send_ndjson_request(Operation.DEBUG, - command = command - )) + return raise_api_result( + await self.send_ndjson_request(Operation.DEBUG, command=command) + ) + async def veilid_version_string(self) -> str: - return raise_api_result(await self.send_ndjson_request(Operation.VEILID_VERSION_STRING)) + return raise_api_result( + await self.send_ndjson_request(Operation.VEILID_VERSION_STRING) + ) + async def veilid_version(self) -> VeilidVersion: v = await self.send_ndjson_request(Operation.VEILID_VERSION) - return VeilidVersion(v['major'], v['minor'], v['patch']) + return VeilidVersion(v["major"], v["minor"], v["patch"]) + ###################################################### + def validate_rc_op(request: dict, response: dict): - if response['rc_op'] != request['rc_op']: + if response["rc_op"] != request["rc_op"]: raise ValueError("Response rc_op does not match request rc_op") + class _JsonRoutingContext(RoutingContext): - api: _JsonVeilidAPI + api: _JsonVeilidAPI rc_id: int - + def __init__(self, api: _JsonVeilidAPI, rc_id: int): self.api = api self.rc_id = rc_id def __del__(self): - self.api.send_one_way_ndjson_request(Operation.ROUTING_CONTEXT, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.RELEASE) + self.api.send_one_way_ndjson_request( + Operation.ROUTING_CONTEXT, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.RELEASE, + ) async def with_privacy(self) -> Self: - new_rc_id = raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.WITH_PRIVACY)) - return _JsonRoutingContext(self.api, new_rc_id) - + new_rc_id = raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.WITH_PRIVACY, + ) + ) + return self.__class__(self.api, new_rc_id) + async def with_custom_privacy(self, stability: Stability) -> Self: - new_rc_id = raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.WITH_CUSTOM_PRIVACY, - stability = stability)) - return _JsonRoutingContext(self.api, new_rc_id) + new_rc_id = raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.WITH_CUSTOM_PRIVACY, + stability=stability, + ) + ) + return self.__class__(self.api, new_rc_id) + async def with_sequencing(self, sequencing: Sequencing) -> Self: - new_rc_id = raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.WITH_SEQUENCING, - sequencing = sequencing)) - return _JsonRoutingContext(self.api, new_rc_id) + new_rc_id = raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.WITH_SEQUENCING, + sequencing=sequencing, + ) + ) + return self.__class__(self.api, new_rc_id) + async def app_call(self, target: TypedKey | RouteId, request: bytes) -> bytes: - return urlsafe_b64decode_no_pad(raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.APP_CALL, - target = target, - request = request))) + return urlsafe_b64decode_no_pad( + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.APP_CALL, + target=target, + request=request, + ) + ) + ) + async def app_message(self, target: TypedKey | RouteId, message: bytes): - raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.APP_MESSAGE, - target = target, - message = message)) - async def create_dht_record(self, kind: CryptoKind, schema: DHTSchema) -> DHTRecordDescriptor: - return DHTRecordDescriptor.from_json(raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.CREATE_DHT_RECORD, - kind = kind, - schema = schema))) - async def open_dht_record(self, key: TypedKey, writer: Optional[KeyPair]) -> DHTRecordDescriptor: - return DHTRecordDescriptor.from_json(raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.OPEN_DHT_RECORD, - key = key, - writer = writer))) + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.APP_MESSAGE, + target=target, + message=message, + ) + ) + + async def create_dht_record( + self, kind: CryptoKind, schema: DHTSchema + ) -> DHTRecordDescriptor: + return DHTRecordDescriptor.from_json( + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.CREATE_DHT_RECORD, + kind=kind, + schema=schema, + ) + ) + ) + + async def open_dht_record( + self, key: TypedKey, writer: Optional[KeyPair] + ) -> DHTRecordDescriptor: + return DHTRecordDescriptor.from_json( + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.OPEN_DHT_RECORD, + key=key, + writer=writer, + ) + ) + ) + async def close_dht_record(self, key: TypedKey): - raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.CLOSE_DHT_RECORD, - key = key)) + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.CLOSE_DHT_RECORD, + key=key, + ) + ) + async def delete_dht_record(self, key: TypedKey): - raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.DELETE_DHT_RECORD, - key = key)) - async def get_dht_value(self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool) -> Optional[ValueData]: - ret = raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.GET_DHT_VALUE, - key = key, - subkey = subkey, - force_refresh = force_refresh)) + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.DELETE_DHT_RECORD, + key=key, + ) + ) + + async def get_dht_value( + self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool + ) -> Optional[ValueData]: + ret = raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.GET_DHT_VALUE, + key=key, + subkey=subkey, + force_refresh=force_refresh, + ) + ) return None if ret is None else ValueData.from_json(ret) - async def set_dht_value(self, key: TypedKey, subkey: ValueSubkey, data: bytes) -> Optional[ValueData]: - ret = raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.SET_DHT_VALUE, - key = key, - subkey = subkey, - data = data)) + + async def set_dht_value( + self, key: TypedKey, subkey: ValueSubkey, data: bytes + ) -> Optional[ValueData]: + ret = raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.SET_DHT_VALUE, + key=key, + subkey=subkey, + data=data, + ) + ) return None if ret is None else ValueData.from_json(ret) - async def watch_dht_values(self, key: TypedKey, subkeys: list[(ValueSubkey, ValueSubkey)], expiration: Timestamp, count: int) -> Timestamp: - return Timestamp(raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.WATCH_DHT_VALUES, - key = key, - subkeys = subkeys, - expiration = expiration, - count = count))) - async def cancel_dht_watch(self, key: TypedKey, subkeys: list[(ValueSubkey, ValueSubkey)]) -> bool: - return raise_api_result(await self.api.send_ndjson_request(Operation.ROUTING_CONTEXT, validate=validate_rc_op, - rc_id = self.rc_id, - rc_op = RoutingContextOperation.CANCEL_DHT_WATCH, - key = key, - subkeys = subkeys)) - + + async def watch_dht_values( + self, + key: TypedKey, + subkeys: list[tuple[ValueSubkey, ValueSubkey]], + expiration: Timestamp, + count: int, + ) -> Timestamp: + return Timestamp( + raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.WATCH_DHT_VALUES, + key=key, + subkeys=subkeys, + expiration=expiration, + count=count, + ) + ) + ) + + async def cancel_dht_watch( + self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]] + ) -> bool: + return raise_api_result( + await self.api.send_ndjson_request( + Operation.ROUTING_CONTEXT, + validate=validate_rc_op, + rc_id=self.rc_id, + rc_op=RoutingContextOperation.CANCEL_DHT_WATCH, + key=key, + subkeys=subkeys, + ) + ) + + ###################################################### + def validate_tx_op(request: dict, response: dict): - if response['tx_op'] != request['tx_op']: + if response["tx_op"] != request["tx_op"]: raise ValueError("Response tx_op does not match request tx_op") + class _JsonTableDbTransaction(TableDbTransaction): - api: _JsonVeilidAPI + api: _JsonVeilidAPI tx_id: int done: bool - + def __init__(self, api: _JsonVeilidAPI, tx_id: int): self.api = api self.tx_id = tx_id @@ -411,226 +625,452 @@ class _JsonTableDbTransaction(TableDbTransaction): def __del__(self): if not self.done: raise AssertionError("Should have committed or rolled back transaction") - + async def commit(self): - raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, - tx_id = self.tx_id, - tx_op = TableDbTransactionOperation.COMMIT)) + raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB_TRANSACTION, + validate=validate_tx_op, + tx_id=self.tx_id, + tx_op=TableDbTransactionOperation.COMMIT, + ) + ) self.done = True + async def rollback(self): - raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, - tx_id = self.tx_id, - tx_op = TableDbTransactionOperation.ROLLBACK)) + raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB_TRANSACTION, + validate=validate_tx_op, + tx_id=self.tx_id, + tx_op=TableDbTransactionOperation.ROLLBACK, + ) + ) self.done = True + async def store(self, col: int, key: bytes, value: bytes): - await self.api.send_ndjson_request(Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, - tx_id = self.tx_id, - tx_op = TableDbTransactionOperation.STORE, - col = col, - key = key, - value = value) + await self.api.send_ndjson_request( + Operation.TABLE_DB_TRANSACTION, + validate=validate_tx_op, + tx_id=self.tx_id, + tx_op=TableDbTransactionOperation.STORE, + col=col, + key=key, + value=value, + ) + async def delete(self, col: int, key: bytes): - await self.api.send_ndjson_request(Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, - tx_id = self.tx_id, - tx_op = TableDbTransactionOperation.DELETE, - col = col, - key = key) + await self.api.send_ndjson_request( + Operation.TABLE_DB_TRANSACTION, + validate=validate_tx_op, + tx_id=self.tx_id, + tx_op=TableDbTransactionOperation.DELETE, + col=col, + key=key, + ) + ###################################################### + def validate_db_op(request: dict, response: dict): - if response['db_op'] != request['db_op']: + if response["db_op"] != request["db_op"]: raise ValueError("Response db_op does not match request db_op") + class _JsonTableDb(TableDb): - api: _JsonVeilidAPI + api: _JsonVeilidAPI db_id: int - + def __init__(self, api: _JsonVeilidAPI, db_id: int): self.api = api self.db_id = db_id def __del__(self): - self.api.send_one_way_ndjson_request(Operation.TABLE_DB, - db_id = self.db_id, - rc_op = TableDbOperation.RELEASE) - + self.api.send_one_way_ndjson_request( + Operation.TABLE_DB, db_id=self.db_id, rc_op=TableDbOperation.RELEASE + ) + async def get_column_count(self) -> int: - return raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, - db_id = self.db_id, - db_op = TableDbOperation.GET_COLUMN_COUNT)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.GET_COLUMN_COUNT, + ) + ) + async def get_keys(self, col: int) -> list[bytes]: - return list(map(lambda x: urlsafe_b64decode_no_pad(x), raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, - db_id = self.db_id, - db_op = TableDbOperation.GET_KEYS, - col = col)))) + return list( + map( + lambda x: urlsafe_b64decode_no_pad(x), + raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.GET_KEYS, + col=col, + ) + ), + ) + ) + async def transact(self) -> TableDbTransaction: - tx_id = raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, - db_id = self.db_id, - db_op = TableDbOperation.TRANSACT)) + tx_id = raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.TRANSACT, + ) + ) return _JsonTableDbTransaction(self.api, tx_id) + async def store(self, col: int, key: bytes, value: bytes): - return raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, - db_id = self.db_id, - db_op = TableDbOperation.STORE, - col = col, - key = key, - value = value)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.STORE, + col=col, + key=key, + value=value, + ) + ) + async def load(self, col: int, key: bytes) -> Optional[bytes]: - res = raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, - db_id = self.db_id, - db_op = TableDbOperation.LOAD, - col = col, - key = key)) + res = raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.LOAD, + col=col, + key=key, + ) + ) return None if res is None else urlsafe_b64decode_no_pad(res) + async def delete(self, col: int, key: bytes) -> Optional[bytes]: - res = raise_api_result(await self.api.send_ndjson_request(Operation.TABLE_DB, validate=validate_db_op, - db_id = self.db_id, - db_op = TableDbOperation.DELETE, - col = col, - key = key)) + res = raise_api_result( + await self.api.send_ndjson_request( + Operation.TABLE_DB, + validate=validate_db_op, + db_id=self.db_id, + db_op=TableDbOperation.DELETE, + col=col, + key=key, + ) + ) return None if res is None else urlsafe_b64decode_no_pad(res) + ###################################################### - def validate_cs_op(request: dict, response: dict): - if response['cs_op'] != request['cs_op']: + if response["cs_op"] != request["cs_op"]: raise ValueError("Response cs_op does not match request cs_op") + class _JsonCryptoSystem(CryptoSystem): - api: _JsonVeilidAPI + api: _JsonVeilidAPI cs_id: int - + def __init__(self, api: _JsonVeilidAPI, cs_id: int): self.api = api self.cs_id = cs_id def __del__(self): - self.api.send_one_way_ndjson_request(Operation.CRYPTO_SYSTEM, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.RELEASE) + self.api.send_one_way_ndjson_request( + Operation.CRYPTO_SYSTEM, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.RELEASE, + ) async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: - return SharedSecret(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.CACHED_DH, - key = key, - secret = secret))) + return SharedSecret( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.CACHED_DH, + key=key, + secret=secret, + ) + ) + ) + async def compute_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: - return SharedSecret(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.COMPUTE_DH, - key = key, - secret = secret))) + return SharedSecret( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.COMPUTE_DH, + key=key, + secret=secret, + ) + ) + ) + async def random_bytes(self, len: int) -> bytes: - return urlsafe_b64decode_no_pad(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.RANDOM_BYTES, - len = len))) + return urlsafe_b64decode_no_pad( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.RANDOM_BYTES, + len=len, + ) + ) + ) + async def default_salt_length(self) -> int: - return raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.DEFAULT_SALT_LENGTH)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.DEFAULT_SALT_LENGTH, + ) + ) + async def hash_password(self, password: bytes, salt: bytes) -> str: - return raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.HASH_PASSWORD, - password = password, - salt = salt)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.HASH_PASSWORD, + password=password, + salt=salt, + ) + ) + async def verify_password(self, password: bytes, password_hash: str) -> bool: - return raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.VERIFY_PASSWORD, - password = password, - password_hash = password_hash)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.VERIFY_PASSWORD, + password=password, + password_hash=password_hash, + ) + ) + async def derive_shared_secret(self, password: bytes, salt: bytes) -> SharedSecret: - return SharedSecret(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.DERIVE_SHARED_SECRET, - password = password, - salt = salt))) + return SharedSecret( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.DERIVE_SHARED_SECRET, + password=password, + salt=salt, + ) + ) + ) + async def random_nonce(self) -> Nonce: - return Nonce(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.RANDOM_NONCE))) + return Nonce( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.RANDOM_NONCE, + ) + ) + ) + async def random_shared_secret(self) -> SharedSecret: - return SharedSecret(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.RANDOM_SHARED_SECRET))) + return SharedSecret( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.RANDOM_SHARED_SECRET, + ) + ) + ) + async def generate_key_pair(self) -> KeyPair: - return KeyPair(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.GENERATE_KEY_PAIR))) + return KeyPair( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.GENERATE_KEY_PAIR, + ) + ) + ) + async def generate_hash(self, data: bytes) -> HashDigest: - return HashDigest(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.GENERATE_HASH, - data = data))) + return HashDigest( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.GENERATE_HASH, + data=data, + ) + ) + ) + async def validate_key_pair(self, key: PublicKey, secret: SecretKey) -> bool: - return raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.VALIDATE_KEY_PAIR, - key = key, - secret = secret)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.VALIDATE_KEY_PAIR, + key=key, + secret=secret, + ) + ) + async def validate_hash(self, data: bytes, hash_digest: HashDigest) -> bool: - return raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.VALIDATE_HASH, - data = data, - hash_digest = hash_digest)) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.VALIDATE_HASH, + data=data, + hash_digest=hash_digest, + ) + ) + async def distance(self, key1: CryptoKey, key2: CryptoKey) -> CryptoKeyDistance: - return CryptoKeyDistance(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.DISTANCE, - key1 = key1, - key2 = key2))) + return CryptoKeyDistance( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.DISTANCE, + key1=key1, + key2=key2, + ) + ) + ) + async def sign(self, key: PublicKey, secret: SecretKey, data: bytes) -> Signature: - return Signature(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.SIGN, - key = key, - secret = secret, - data = data))) + return Signature( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.SIGN, + key=key, + secret=secret, + data=data, + ) + ) + ) + async def verify(self, key: PublicKey, data: bytes, signature: Signature): - raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.VERIFY, - key = key, - data = data, - signature = signature)) + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.VERIFY, + key=key, + data=data, + signature=signature, + ) + ) + async def aead_overhead(self) -> int: - return raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.AEAD_OVERHEAD)) - async def decrypt_aead(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret, associated_data: Optional[bytes]) -> bytes: - return urlsafe_b64decode_no_pad(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.DECRYPT_AEAD, - body = body, - nonce = nonce, - shared_secret = shared_secret, - associated_data = associated_data))) - async def encrypt_aead(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret, associated_data: Optional[bytes]) -> bytes: - return urlsafe_b64decode_no_pad(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.ENCRYPT_AEAD, - body = body, - nonce = nonce, - shared_secret = shared_secret, - associated_data = associated_data))) - async def crypt_no_auth(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret) -> bytes: - return urlsafe_b64decode_no_pad(raise_api_result(await self.api.send_ndjson_request(Operation.CRYPTO_SYSTEM, validate=validate_cs_op, - cs_id = self.cs_id, - cs_op = CryptoSystemOperation.CRYPT_NO_AUTH, - body = body, - nonce = nonce, - shared_secret = shared_secret))) + return raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.AEAD_OVERHEAD, + ) + ) + + async def decrypt_aead( + self, + body: bytes, + nonce: Nonce, + shared_secret: SharedSecret, + associated_data: Optional[bytes], + ) -> bytes: + return urlsafe_b64decode_no_pad( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.DECRYPT_AEAD, + body=body, + nonce=nonce, + shared_secret=shared_secret, + associated_data=associated_data, + ) + ) + ) + + async def encrypt_aead( + self, + body: bytes, + nonce: Nonce, + shared_secret: SharedSecret, + associated_data: Optional[bytes], + ) -> bytes: + return urlsafe_b64decode_no_pad( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.ENCRYPT_AEAD, + body=body, + nonce=nonce, + shared_secret=shared_secret, + associated_data=associated_data, + ) + ) + ) + + async def crypt_no_auth( + self, body: bytes, nonce: Nonce, shared_secret: SharedSecret + ) -> bytes: + return urlsafe_b64decode_no_pad( + raise_api_result( + await self.api.send_ndjson_request( + Operation.CRYPTO_SYSTEM, + validate=validate_cs_op, + cs_id=self.cs_id, + cs_op=CryptoSystemOperation.CRYPT_NO_AUTH, + body=body, + nonce=nonce, + shared_secret=shared_secret, + ) + ) + ) ###################################################### -async def json_api_connect(host:str, port:int, update_callback: Callable[[VeilidUpdate], Awaitable]) -> VeilidAPI: - return await _JsonVeilidAPI.connect(host, port, update_callback) +async def json_api_connect( + host: str, port: int, update_callback: Callable[[VeilidUpdate], Awaitable] +) -> VeilidAPI: + return await _JsonVeilidAPI.connect(host, port, update_callback)