From 67eeb87c280aa6a39cc196c371dc3d4c94c70f70 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 11 Mar 2025 09:31:15 -0400 Subject: [PATCH] validate types for python api calls add more stress tests to python fix deadlock in veilid_api duration testing correct offline_subkey_writes inflight reporting --- veilid-core/src/logging/api_tracing_layer.rs | 28 +- veilid-core/src/storage_manager/mod.rs | 105 ++++++-- veilid-python/tests/test_dht.py | 257 ++++++++++++++++--- veilid-python/veilid/json_api.py | 165 ++++++++++++ veilid-python/veilid/types.py | 34 ++- 5 files changed, 499 insertions(+), 90 deletions(-) diff --git a/veilid-core/src/logging/api_tracing_layer.rs b/veilid-core/src/logging/api_tracing_layer.rs index 812e8d08..c2498a64 100644 --- a/veilid-core/src/logging/api_tracing_layer.rs +++ b/veilid-core/src/logging/api_tracing_layer.rs @@ -193,16 +193,13 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa attrs.record(&mut new_debug_record); if let Some(span_ref) = ctx.span(id) { - span_ref - .extensions_mut() - .insert::(new_debug_record); + let mut extensions_mut = span_ref.extensions_mut(); + extensions_mut.insert::(new_debug_record); if crate::DURATION_LOG_FACILITIES.contains(&attrs.metadata().target()) { - span_ref - .extensions_mut() - .insert::(SpanDuration { - start: Timestamp::now(), - end: Timestamp::default(), - }); + extensions_mut.insert::(SpanDuration { + start: Timestamp::now(), + end: Timestamp::default(), + }); } } } @@ -213,14 +210,14 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa return; } if let Some(span_ref) = ctx.span(&id) { - if let Some(span_duration) = span_ref.extensions_mut().get_mut::() { + let mut extensions_mut = span_ref.extensions_mut(); + if let Some(span_duration) = extensions_mut.get_mut::() { span_duration.end = Timestamp::now(); let duration = span_duration.end.saturating_sub(span_duration.start); let meta = span_ref.metadata(); - let mut extensions = span_ref.extensions_mut(); let log_key = - if let Some(span_ksr) = extensions.get_mut::() { + if let Some(span_ksr) = extensions_mut.get_mut::() { span_ksr.log_key() } else { "" @@ -254,10 +251,9 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa return; } if let Some(span_ref) = ctx.span(id) { - if let Some(debug_record) = span_ref - .extensions_mut() - .get_mut::() - { + let mut extensions_mut = span_ref.extensions_mut(); + + if let Some(debug_record) = extensions_mut.get_mut::() { values.record(debug_record); } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 722c41b9..4502bb1b 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -59,6 +59,8 @@ struct StorageManagerInner { pub remote_record_store: Option>, /// Record subkeys that have not been pushed to the network because they were written to offline pub offline_subkey_writes: HashMap, + /// Record subkeys that are currently being written to in the foreground + pub active_subkey_writes: HashMap, /// Storage manager metadata that is persistent, including copy of offline subkey writes pub metadata_db: Option, /// Background processing task (not part of attachment manager tick tree so it happens when detached too) @@ -73,6 +75,7 @@ impl fmt::Debug for StorageManagerInner { .field("local_record_store", &self.local_record_store) .field("remote_record_store", &self.remote_record_store) .field("offline_subkey_writes", &self.offline_subkey_writes) + .field("active_subkey_writes", &self.active_subkey_writes) //.field("metadata_db", &self.metadata_db) //.field("tick_future", &self.tick_future) .finish() @@ -736,7 +739,21 @@ impl StorageManager { ) .await?; - if !self.dht_is_online() { + // Note that we are writing this subkey actively + // If it appears we are already doing this, then put it to the offline queue + let already_writing = { + let asw = inner.active_subkey_writes.entry(key).or_default(); + if asw.contains(subkey) { + veilid_log!(self debug "Already writing to this subkey: {}:{}", key, subkey); + true + } else { + // Add to our list of active subkey writes + asw.insert(subkey); + false + } + }; + + if already_writing || !self.dht_is_online() { veilid_log!(self debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); // Add to offline writes to flush Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection); @@ -764,41 +781,68 @@ impl StorageManager { // Failed to write, try again later let mut inner = self.inner.lock().await; Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection); + + // Remove from active subkey writes + let asw = inner.active_subkey_writes.get_mut(&key).unwrap(); + if !asw.remove(subkey) { + panic!("missing active subkey write: {}:{}", key, subkey); + } + if asw.is_empty() { + inner.active_subkey_writes.remove(&key); + } return Err(e); } }; - // Wait for the first result - let Ok(result) = res_rx.recv_async().await else { - apibail_internal!("failed to receive results"); + let process = || async { + // Wait for the first result + let Ok(result) = res_rx.recv_async().await else { + apibail_internal!("failed to receive results"); + }; + let result = result?; + let partial = result.fanout_result.kind.is_partial(); + + // Process the returned result + let out = self + .process_outbound_set_value_result( + key, + subkey, + signed_value_data.value_data().clone(), + safety_selection, + result, + ) + .await?; + + // If there's more to process, do it in the background + if partial { + self.process_deferred_outbound_set_value_result( + res_rx, + key, + subkey, + out.clone() + .unwrap_or_else(|| signed_value_data.value_data().clone()), + safety_selection, + ); + } + + Ok(out) }; - let result = result?; - let partial = result.fanout_result.kind.is_partial(); - // Process the returned result - let out = self - .process_outbound_set_value_result( - key, - subkey, - signed_value_data.value_data().clone(), - safety_selection, - result, - ) - .await?; + let out = process().await; - // If there's more to process, do it in the background - if partial { - self.process_deferred_outbound_set_value_result( - res_rx, - key, - subkey, - out.clone() - .unwrap_or_else(|| signed_value_data.value_data().clone()), - safety_selection, - ); + // Remove active subkey write + let mut inner = self.inner.lock().await; + + // Remove from active subkey writes + let asw = inner.active_subkey_writes.get_mut(&key).unwrap(); + if !asw.remove(subkey) { + panic!("missing active subkey write: {}:{}", key, subkey); + } + if asw.is_empty() { + inner.active_subkey_writes.remove(&key); } - Ok(out) + out } /// Create,update or cancel an outbound watch to a DHT value @@ -1019,11 +1063,18 @@ impl StorageManager { ); // Get the offline subkeys for this record still only returning the ones we're inspecting + // Merge in the currently offline in-flight records and the actively written records as well + let active_subkey_writes = inner + .active_subkey_writes + .get(&key) + .cloned() + .unwrap_or_default(); let offline_subkey_writes = inner .offline_subkey_writes .get(&key) .map(|o| o.subkeys.union(&o.subkeys_in_flight)) .unwrap_or_default() + .union(&active_subkey_writes) .intersect(&subkeys); // If this is the maximum scope we're interested in, return the report diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index ce02662b..7cd46c62 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -1,6 +1,6 @@ # Routing context veilid tests -from typing import Awaitable, Callable +from typing import Any, Awaitable, Callable, Optional import pytest import asyncio import time @@ -374,13 +374,13 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): rr = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.LOCAL) print("rr: {}", rr.__dict__) - assert rr.subkeys == [[0,1]] + assert rr.subkeys == [(0,1)] assert rr.local_seqs == [0, 0xFFFFFFFF] assert rr.network_seqs == [] rr2 = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.SYNC_GET) print("rr2: {}", rr2.__dict__) - assert rr2.subkeys == [[0,1]] + assert rr2.subkeys == [(0,1)] assert rr2.local_seqs == [0, 0xFFFFFFFF] assert rr2.network_seqs == [0, 0xFFFFFFFF] @@ -390,42 +390,28 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): -async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]]], count: int, test_data: bytes, ): +async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]]], count: int, test_data: bytes): rc = await api_connection.new_routing_context() async with rc: - (key, owner, secret) = await open_record(rc, count) - print(f'{key} {owner}:{secret}') + (desc, writer) = await open_record(rc, count) + print(f'{desc.key} {writer}') # write dht records on server 0 records = [] print(f'writing {count} subkeys') for n in range(count): - await rc.set_dht_value(key, ValueSubkey(n), test_data) + await rc.set_dht_value(desc.key, ValueSubkey(n), test_data) print(f' {n}') - print('syncing records to the network') + await sync(rc, [desc]) - while True: - donerecords = set() - subkeysleft = 0 - - rr = await rc.inspect_dht_record(key, []) - left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] - if left == 0: - break - print(f' {left} subkeys left') - time.sleep(1) - - await rc.close_dht_record(key) - - await api_connection.debug("record purge local") - await api_connection.debug("record purge remote") + await rc.close_dht_record(desc.key) # read dht records on server 0 print(f'reading {count} subkeys') - desc1 = await rc.open_dht_record(key) + desc1 = await rc.open_dht_record(desc.key) for n in range(count): - vd0 = await rc.get_dht_value(key, ValueSubkey(n), force_refresh=True) + vd0 = await rc.get_dht_value(desc1.key, ValueSubkey(n)) assert vd0.data == test_data print(f' {n}') @@ -433,10 +419,10 @@ async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: @pytest.mark.asyncio async def test_schema_limit_dflt(api_connection: veilid.VeilidAPI): - async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]: + async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]: schema = veilid.DHTSchema.dflt(count) desc = await rc.create_dht_record(schema) - return (desc.key, desc.owner, desc.owner_secret) + return (desc, desc.owner_key_pair()) print("Test with maximum number of subkeys before lower limit hit") @@ -474,7 +460,7 @@ async def test_schema_limit_smpl(api_connection: veilid.VeilidAPI): desc = await rc.create_dht_record(schema) await rc.open_dht_record(desc.key, writer_keypair) - return (desc.key, writer_keypair.key(), writer_keypair.secret()) + return (desc, writer_keypair) print("Test with maximum number of subkeys before lower limit hit") TEST_DATA = b"A" * 32768 @@ -545,18 +531,7 @@ async def test_dht_integration_writer_reader(): await rc0.set_dht_value(desc.key, ValueSubkey(0), TEST_DATA) - print('syncing records to the network') - recleft = len(records) - for desc in records: - while True: - rr = await rc0.inspect_dht_record(desc.key, []) - left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] - if left == 0: - await rc0.close_dht_record(desc.key) - break - print(f' {recleft} records {left} subkeys left') - time.sleep(0.1) - recleft-=1 + await sync(rc0, records) # read dht records on server 1 print(f'reading {COUNT} records') @@ -636,6 +611,96 @@ async def test_dht_write_read_local(): print(f' {n}') n += 1 + +@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time") +@pytest.mark.asyncio +async def test_dht_write_read_full_subkeys_local(): + + async def null_update_callback(update: veilid.VeilidUpdate): + pass + + try: + api0 = await veilid.api_connector(null_update_callback, 0) + except veilid.VeilidConnectionError: + pytest.skip("Unable to connect to veilid-server 0.") + + async with api0: + # purge local and remote record stores to ensure we start fresh + await api0.debug("record purge local") + await api0.debug("record purge remote") + + # make routing contexts + rc0 = await api0.new_routing_context() + async with rc0: + + # Number of records + COUNT = 8 + # Number of subkeys per record + SUBKEY_COUNT = 32 + # Nonce to encrypt test data + NONCE = veilid.Nonce.from_bytes(b"A"*24) + # Secret to encrypt test data + SECRET = veilid.SharedSecret.from_bytes(b"A"*32) + # Max subkey size + MAX_SUBKEY_SIZE = min(32768, 1024*1024/SUBKEY_COUNT) + # MAX_SUBKEY_SIZE = 256 + + # write dht records on server 0 + records = [] + subkey_data_list = [] + schema = veilid.DHTSchema.dflt(SUBKEY_COUNT) + print(f'writing {COUNT} records with full subkeys') + init_futures = set() + for n in range(COUNT): + + # Make encrypted data that is consistent and hard to compress + subkey_data = bytes(chr(ord("A")+n)*MAX_SUBKEY_SIZE, 'ascii') + print(f"subkey_data({n}):len={len(subkey_data)}") + + cs = await api0.best_crypto_system() + async with cs: + subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET) + subkey_data_list.append(subkey_data) + + + desc = await rc0.create_dht_record(schema) + records.append(desc) + + for i in range(SUBKEY_COUNT): + init_futures.add(rc0.set_dht_value(desc.key, ValueSubkey(i), subkey_data)) + + print(f' {n}: {desc.key} {desc.owner}:{desc.owner_secret}') + + # Wait for all records to synchronize, with progress bars + await sync_win(rc0, records, SUBKEY_COUNT, init_futures) + + for desc0 in records: + await rc0.close_dht_record(desc0.key) + + await api0.debug("record purge local") + await api0.debug("record purge remote") + + # read dht records on server 0 + print(f'reading {COUNT} records') + for n, desc0 in enumerate(records): + desc1 = await rc0.open_dht_record(desc0.key) + + for i in range(SUBKEY_COUNT): + vd0 = None + while vd0 == None: + vd0 = await rc0.get_dht_value(desc1.key, ValueSubkey(i), force_refresh=True) + if vd0 != None: + assert vd0.data == subkey_data_list[n] + break + time.sleep(1) + print(f"retrying record {n} subkey {i}") + + + await rc0.close_dht_record(desc1.key) + + print(f' {n}') + + async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescriptor]): print('syncing records to the network') syncrecords = records.copy() @@ -646,9 +711,119 @@ async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescript rr = await rc.inspect_dht_record(desc.key, []) left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] if left == 0: - donerecords.add(desc) + if veilid.ValueSeqNum.NONE not in rr.local_seqs: + donerecords.add(desc) else: subkeysleft += left syncrecords = [x for x in syncrecords if x not in donerecords] print(f' {len(syncrecords)} records {subkeysleft} subkeys left') time.sleep(1) + + +async def sync_win( + rc: veilid.RoutingContext, + records: list[veilid.DHTRecordDescriptor], + subkey_count: int, + init_futures: set[Awaitable[Any]] + ): + import curses + + screen = curses.initscr() + + curses.start_color() + curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_BLUE) + curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN) + curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_YELLOW) + curses.init_pair(4, curses.COLOR_BLACK, curses.COLOR_GREEN) + + HEIGHT=len(records) + 3 + GRAPHWIDTH = subkey_count + WIDTH=GRAPHWIDTH + 4 + 1 + 43 + 2 + + cur_lines = curses.LINES + cur_cols = curses.COLS + win = curses.newwin(HEIGHT, WIDTH, + max(0, int(cur_lines/2) - int(HEIGHT/2)), + max(0, int(cur_cols/2) - int(WIDTH/2))) + win.clear() + win.border(0,0,0,0) + win.nodelay(True) + + # Record inspection and completion state + + # Records we are done inspecting and have finished sync + donerecords: set[veilid.TypedKey] = set() + # Records we are currently inspecting that are in the futures set + futurerecords: set[veilid.TypedKey] = set() + # All the futures we are waiting for + futures = set() + # The record report state + recordreports: dict[veilid.TypedKey, veilid.DHTRecordReport] = dict() + + # Add initial futures with None key + for fut in init_futures: + async def _do_init_fut(fut): + return (None, await fut) + futures.add(asyncio.create_task(_do_init_fut(fut))) + + # Loop until all records are completed + while len(donerecords) != len(records): + + # Update the futures with inspects for unfinished records + for n, desc in enumerate(records): + if desc.key in donerecords or desc.key in futurerecords: + continue + async def _do_inspect(key: veilid.TypedKey): + return (key, await rc.inspect_dht_record(key, [])) + futures.add(asyncio.create_task(_do_inspect(desc.key))) + futurerecords.add(desc.key) + + # Wait for some futures to finish + done, futures = await asyncio.wait(futures, return_when = asyncio.FIRST_COMPLETED) + + # Process finished futures into the state + for rr_fut in done: + key: veilid.TypedKey + rr: veilid.DHTRecordReport + key, rr = await rr_fut + if key is not None: + futurerecords.remove(key) + + if len(rr.subkeys) == 1 and rr.subkeys[0] == (0, subkey_count-1) and veilid.ValueSeqNum.NONE not in rr.local_seqs and len(rr.offline_subkeys) == 0: + if key in recordreports: + del recordreports[key] + donerecords.add(key) + else: + recordreports[key] = rr + + # Re-render the state + if cur_lines != curses.LINES or cur_cols != curses.COLS: + cur_lines = curses.LINES + cur_cols = curses.COLS + win.move( + max(0, int(cur_lines/2) - int(HEIGHT/2)), + max(0, int(cur_cols/2) - int(WIDTH/2))) + win.border(0,0,0,0) + win.addstr(1, 1, "syncing records to the network", curses.color_pair(0)) + for n, rr in enumerate(records): + key = rr.key + win.addstr(n+2, GRAPHWIDTH+1, key, curses.color_pair(0)) + + if key in donerecords: + win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(4)) + elif key in recordreports: + rr = recordreports[key] + win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1)) + for (a,b) in rr.subkeys: + for m in range(a, b+1): + if rr.local_seqs[m] != veilid.ValueSeqNum.NONE: + win.addstr(n+2, m+1, " ", curses.color_pair(2)) + for (a,b) in rr.offline_subkeys: + win.addstr(n+2, a+1, " " * (b-a+1), curses.color_pair(3)) + else: + win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1)) + + win.refresh() + + curses.endwin() + diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 7b75d6ed..0a88051c 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -338,6 +338,12 @@ class _JsonVeilidAPI(VeilidAPI): async def new_custom_private_route( self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing ) -> tuple[RouteId, bytes]: + assert isinstance(kinds, list) + for k in kinds: + assert isinstance(k, CryptoKind) + assert isinstance(stability, Stability) + assert isinstance(sequencing, Sequencing) + return NewPrivateRouteResult.from_json( raise_api_result( await self.send_ndjson_request( @@ -350,6 +356,8 @@ class _JsonVeilidAPI(VeilidAPI): ).to_tuple() async def import_remote_private_route(self, blob: bytes) -> RouteId: + assert isinstance(blob, bytes) + return RouteId( raise_api_result( await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE, blob=blob) @@ -357,11 +365,16 @@ class _JsonVeilidAPI(VeilidAPI): ) async def release_private_route(self, route_id: RouteId): + assert isinstance(route_id, RouteId) + raise_api_result( await self.send_ndjson_request(Operation.RELEASE_PRIVATE_ROUTE, route_id=route_id) ) async def app_call_reply(self, call_id: OperationId, message: bytes): + assert isinstance(call_id, OperationId) + assert isinstance(message, bytes) + raise_api_result( await self.send_ndjson_request( Operation.APP_CALL_REPLY, call_id=call_id, message=message @@ -373,6 +386,9 @@ class _JsonVeilidAPI(VeilidAPI): return _JsonRoutingContext(self, rc_id) async def open_table_db(self, name: str, column_count: int) -> TableDb: + assert isinstance(name, str) + assert isinstance(column_count, int) + db_id = raise_api_result( await self.send_ndjson_request( Operation.OPEN_TABLE_DB, name=name, column_count=column_count @@ -381,11 +397,15 @@ class _JsonVeilidAPI(VeilidAPI): return _JsonTableDb(self, db_id) async def delete_table_db(self, name: str) -> bool: + assert isinstance(name, str) + return raise_api_result( await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name) ) async def get_crypto_system(self, kind: CryptoKind) -> CryptoSystem: + assert isinstance(kind, CryptoKind) + cs_id = raise_api_result( await self.send_ndjson_request(Operation.GET_CRYPTO_SYSTEM, kind=kind) ) @@ -398,6 +418,13 @@ class _JsonVeilidAPI(VeilidAPI): async def verify_signatures( self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature] ) -> Optional[list[TypedKey]]: + assert isinstance(node_ids, list) + for ni in node_ids: + assert isinstance(ni, TypedKey) + assert isinstance(data, bytes) + for sig in signatures: + assert isinstance(sig, TypedSignature) + out = raise_api_result( await self.send_ndjson_request( Operation.VERIFY_SIGNATURES, @@ -418,6 +445,11 @@ class _JsonVeilidAPI(VeilidAPI): async def generate_signatures( self, data: bytes, key_pairs: list[TypedKeyPair] ) -> list[TypedSignature]: + assert isinstance(data, bytes) + assert isinstance(key_pairs, list) + for kp in key_pairs: + assert isinstance(kp, TypedKeyPair) + return list( map( lambda x: TypedSignature(x), @@ -430,6 +462,8 @@ class _JsonVeilidAPI(VeilidAPI): ) async def generate_key_pair(self, kind: CryptoKind) -> list[TypedKeyPair]: + assert isinstance(kind, CryptoKind) + return list( map( lambda x: TypedKeyPair(x), @@ -443,6 +477,7 @@ class _JsonVeilidAPI(VeilidAPI): return Timestamp(raise_api_result(await self.send_ndjson_request(Operation.NOW))) async def debug(self, command: str) -> str: + assert isinstance(command, str) return raise_api_result(await self.send_ndjson_request(Operation.DEBUG, command=command)) async def veilid_version_string(self) -> str: @@ -501,6 +536,8 @@ class _JsonRoutingContext(RoutingContext): self.done = True async def with_default_safety(self, release=True) -> Self: + assert isinstance(release, bool) + new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -514,6 +551,9 @@ class _JsonRoutingContext(RoutingContext): return self.__class__(self.api, new_rc_id) async def with_safety(self, safety_selection: SafetySelection, release=True) -> Self: + assert isinstance(safety_selection, SafetySelection) + assert isinstance(release, bool) + new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -528,6 +568,9 @@ class _JsonRoutingContext(RoutingContext): return self.__class__(self.api, new_rc_id) async def with_sequencing(self, sequencing: Sequencing, release=True) -> Self: + assert isinstance(sequencing, Sequencing) + assert isinstance(release, bool) + new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -555,6 +598,9 @@ class _JsonRoutingContext(RoutingContext): ) ) async def app_call(self, target: TypedKey | RouteId, message: bytes) -> bytes: + assert isinstance(target, TypedKey) or isinstance(target, RouteId) + assert isinstance(message, bytes) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -569,6 +615,9 @@ class _JsonRoutingContext(RoutingContext): ) async def app_message(self, target: TypedKey | RouteId, message: bytes): + assert isinstance(target, TypedKey) or isinstance(target, RouteId) + assert isinstance(message, bytes) + raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -583,6 +632,10 @@ class _JsonRoutingContext(RoutingContext): async def create_dht_record( self, schema: DHTSchema, owner: Optional[KeyPair] = None, kind: Optional[CryptoKind] = None ) -> DHTRecordDescriptor: + assert isinstance(schema, DHTSchema) + assert owner is None or isinstance(owner, KeyPair) + assert kind is None or isinstance(kind, CryptoKind) + return DHTRecordDescriptor.from_json( raise_api_result( await self.api.send_ndjson_request( @@ -600,6 +653,9 @@ class _JsonRoutingContext(RoutingContext): async def open_dht_record( self, key: TypedKey, writer: Optional[KeyPair] = None ) -> DHTRecordDescriptor: + assert isinstance(key, TypedKey) + assert writer is None or isinstance(writer, KeyPair) + return DHTRecordDescriptor.from_json( raise_api_result( await self.api.send_ndjson_request( @@ -614,6 +670,8 @@ class _JsonRoutingContext(RoutingContext): ) async def close_dht_record(self, key: TypedKey): + assert isinstance(key, TypedKey) + raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -625,6 +683,8 @@ class _JsonRoutingContext(RoutingContext): ) async def delete_dht_record(self, key: TypedKey): + assert isinstance(key, TypedKey) + raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -638,6 +698,10 @@ class _JsonRoutingContext(RoutingContext): async def get_dht_value( self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool = False ) -> Optional[ValueData]: + assert isinstance(key, TypedKey) + assert isinstance(subkey, ValueSubkey) + assert isinstance(force_refresh, bool) + ret = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -654,6 +718,11 @@ class _JsonRoutingContext(RoutingContext): async def set_dht_value( self, key: TypedKey, subkey: ValueSubkey, data: bytes, writer: Optional[KeyPair] = None ) -> Optional[ValueData]: + assert isinstance(key, TypedKey) + assert isinstance(subkey, ValueSubkey) + assert isinstance(data, bytes) + assert writer is None or isinstance(writer, KeyPair) + ret = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -675,6 +744,15 @@ class _JsonRoutingContext(RoutingContext): expiration: Timestamp = 0, count: int = 0xFFFFFFFF, ) -> Timestamp: + assert isinstance(key, TypedKey) + assert isinstance(subkeys, list) + for s in subkeys: + assert isinstance(s, tuple) + assert isinstance(s[0], ValueSubkey) + assert isinstance(s[1], ValueSubkey) + assert isinstance(expiration, Timestamp) + assert isinstance(count, int) + return Timestamp( raise_api_result( await self.api.send_ndjson_request( @@ -693,6 +771,13 @@ class _JsonRoutingContext(RoutingContext): async def cancel_dht_watch( self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]] ) -> bool: + assert isinstance(key, TypedKey) + assert isinstance(subkeys, list) + for s in subkeys: + assert isinstance(s, tuple) + assert isinstance(s[0], ValueSubkey) + assert isinstance(s[1], ValueSubkey) + return raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -710,6 +795,14 @@ class _JsonRoutingContext(RoutingContext): subkeys: list[tuple[ValueSubkey, ValueSubkey]], scope: DHTReportScope = DHTReportScope.LOCAL, ) -> DHTRecordReport: + assert isinstance(key, TypedKey) + assert isinstance(subkeys, list) + for s in subkeys: + assert isinstance(s, tuple) + assert isinstance(s[0], ValueSubkey) + assert isinstance(s[1], ValueSubkey) + assert isinstance(scope, DHTReportScope) + return DHTRecordReport.from_json( raise_api_result( await self.api.send_ndjson_request( @@ -790,6 +883,10 @@ class _JsonTableDbTransaction(TableDbTransaction): self.done = True async def store(self, key: bytes, value: bytes, col: int = 0): + assert isinstance(key, bytes) + assert isinstance(value, bytes) + assert isinstance(col, int) + await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, @@ -801,6 +898,9 @@ class _JsonTableDbTransaction(TableDbTransaction): ) async def delete(self, key: bytes, col: int = 0): + assert isinstance(key, bytes) + assert isinstance(col, int) + await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, @@ -866,6 +966,8 @@ class _JsonTableDb(TableDb): ) async def get_keys(self, col: int = 0) -> list[bytes]: + assert isinstance(col, int) + return list( map( lambda x: urlsafe_b64decode_no_pad(x), @@ -893,6 +995,10 @@ class _JsonTableDb(TableDb): return _JsonTableDbTransaction(self.api, tx_id) async def store(self, key: bytes, value: bytes, col: int = 0): + assert isinstance(key, bytes) + assert isinstance(value, bytes) + assert isinstance(col, int) + return raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -906,6 +1012,9 @@ class _JsonTableDb(TableDb): ) async def load(self, key: bytes, col: int = 0) -> Optional[bytes]: + assert isinstance(key, bytes) + assert isinstance(col, int) + res = raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -919,6 +1028,9 @@ class _JsonTableDb(TableDb): return None if res is None else urlsafe_b64decode_no_pad(res) async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]: + assert isinstance(key, bytes) + assert isinstance(col, int) + res = raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -989,6 +1101,9 @@ class _JsonCryptoSystem(CryptoSystem): self.done = True async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1003,6 +1118,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def compute_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1017,6 +1135,10 @@ class _JsonCryptoSystem(CryptoSystem): ) async def generate_shared_secret(self, key: PublicKey, secret: SecretKey, domain: bytes) -> SharedSecret: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + assert isinstance(domain, bytes) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1032,6 +1154,8 @@ class _JsonCryptoSystem(CryptoSystem): ) async def random_bytes(self, len: int) -> bytes: + assert isinstance(len, int) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -1055,6 +1179,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def hash_password(self, password: bytes, salt: bytes) -> str: + assert isinstance(password, bytes) + assert isinstance(salt, bytes) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1067,6 +1194,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def verify_password(self, password: bytes, password_hash: str) -> bool: + assert isinstance(password, bytes) + assert isinstance(password_hash, str) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1079,6 +1209,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def derive_shared_secret(self, password: bytes, salt: bytes) -> SharedSecret: + assert isinstance(password, bytes) + assert isinstance(salt, bytes) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1129,6 +1262,8 @@ class _JsonCryptoSystem(CryptoSystem): ) async def generate_hash(self, data: bytes) -> HashDigest: + assert isinstance(data, bytes) + return HashDigest( raise_api_result( await self.api.send_ndjson_request( @@ -1142,6 +1277,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def validate_key_pair(self, key: PublicKey, secret: SecretKey) -> bool: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1154,6 +1292,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def validate_hash(self, data: bytes, hash_digest: HashDigest) -> bool: + assert isinstance(data, bytes) + assert isinstance(hash_digest, HashDigest) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1166,6 +1307,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def distance(self, key1: CryptoKey, key2: CryptoKey) -> CryptoKeyDistance: + assert isinstance(key1, CryptoKey) + assert isinstance(key2, CryptoKey) + return CryptoKeyDistance( raise_api_result( await self.api.send_ndjson_request( @@ -1180,6 +1324,10 @@ class _JsonCryptoSystem(CryptoSystem): ) async def sign(self, key: PublicKey, secret: SecretKey, data: bytes) -> Signature: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + assert isinstance(data, bytes) + return Signature( raise_api_result( await self.api.send_ndjson_request( @@ -1195,6 +1343,10 @@ class _JsonCryptoSystem(CryptoSystem): ) async def verify(self, key: PublicKey, data: bytes, signature: Signature): + assert isinstance(key, PublicKey) + assert isinstance(data, bytes) + assert isinstance(signature, Signature) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1224,6 +1376,11 @@ class _JsonCryptoSystem(CryptoSystem): shared_secret: SharedSecret, associated_data: Optional[bytes], ) -> bytes: + assert isinstance(body, bytes) + assert isinstance(nonce, Nonce) + assert isinstance(shared_secret, SharedSecret) + assert associated_data is None or isinstance(associated_data, bytes) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -1246,6 +1403,11 @@ class _JsonCryptoSystem(CryptoSystem): shared_secret: SharedSecret, associated_data: Optional[bytes], ) -> bytes: + assert isinstance(body, bytes) + assert isinstance(nonce, Nonce) + assert isinstance(shared_secret, SharedSecret) + assert associated_data is None or isinstance(associated_data, bytes) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -1262,6 +1424,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def crypt_no_auth(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret) -> bytes: + assert isinstance(body, bytes) + assert isinstance(nonce, Nonce) + assert isinstance(shared_secret, SharedSecret) return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( diff --git a/veilid-python/veilid/types.py b/veilid-python/veilid/types.py index 8ebad0fa..8ec6a782 100644 --- a/veilid-python/veilid/types.py +++ b/veilid-python/veilid/types.py @@ -2,7 +2,7 @@ import base64 import json from enum import StrEnum from functools import total_ordering -from typing import Any, Optional, Self, Tuple +from typing import Any, Optional, Self #################################################################### @@ -122,6 +122,7 @@ class EncodedString(str): @classmethod def from_bytes(cls, b: bytes) -> Self: + assert isinstance(b, bytes) return cls(urlsafe_b64encode_no_pad(b)) @@ -160,6 +161,8 @@ class Nonce(EncodedString): class KeyPair(str): @classmethod def from_parts(cls, key: PublicKey, secret: SecretKey) -> Self: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) return cls(f"{key}:{secret}") def key(self) -> PublicKey: @@ -168,7 +171,7 @@ class KeyPair(str): def secret(self) -> SecretKey: return SecretKey(self.split(":", 1)[1]) - def to_parts(self) -> Tuple[PublicKey, SecretKey]: + def to_parts(self) -> tuple[PublicKey, SecretKey]: public, secret = self.split(":", 1) return (PublicKey(public), SecretKey(secret)) @@ -188,6 +191,8 @@ class CryptoTyped(str): class TypedKey(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: PublicKey) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, PublicKey) return cls(f"{kind}:{value}") def value(self) -> PublicKey: @@ -197,6 +202,8 @@ class TypedKey(CryptoTyped): class TypedSecret(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: SecretKey) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, SecretKey) return cls(f"{kind}:{value}") def value(self) -> SecretKey: @@ -206,6 +213,8 @@ class TypedSecret(CryptoTyped): class TypedKeyPair(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: KeyPair) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, KeyPair) return cls(f"{kind}:{value}") def value(self) -> KeyPair: @@ -215,6 +224,8 @@ class TypedKeyPair(CryptoTyped): class TypedSignature(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: Signature) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, Signature) return cls(f"{kind}:{value}") def value(self) -> Signature: @@ -226,7 +237,7 @@ class ValueSubkey(int): class ValueSeqNum(int): - pass + NONE = 4294967295 #################################################################### @@ -284,10 +295,13 @@ class NewPrivateRouteResult: blob: bytes def __init__(self, route_id: RouteId, blob: bytes): + assert isinstance(route_id, RouteId) + assert isinstance(blob, bytes) + self.route_id = route_id self.blob = blob - def to_tuple(self) -> Tuple[RouteId, bytes]: + def to_tuple(self) -> tuple[RouteId, bytes]: return (self.route_id, self.blob) @classmethod @@ -300,6 +314,9 @@ class DHTSchemaSMPLMember: m_cnt: int def __init__(self, m_key: PublicKey, m_cnt: int): + assert isinstance(m_key, PublicKey) + assert isinstance(m_cnt, int) + self.m_key = m_key self.m_cnt = m_cnt @@ -321,10 +338,15 @@ class DHTSchema: @classmethod def dflt(cls, o_cnt: int) -> Self: + assert isinstance(o_cnt, int) return cls(DHTSchemaKind.DFLT, o_cnt=o_cnt) @classmethod def smpl(cls, o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self: + assert isinstance(o_cnt, int) + assert isinstance(members, list) + for m in members: + assert isinstance(m, DHTSchemaSMPLMember) return cls(DHTSchemaKind.SMPL, o_cnt=o_cnt, members=members) @classmethod @@ -404,8 +426,8 @@ class DHTRecordReport: @classmethod def from_json(cls, j: dict) -> Self: return cls( - [[p[0], p[1]] for p in j["subkeys"]], - [[p[0], p[1]] for p in j["offline_subkeys"]], + [(p[0], p[1]) for p in j["subkeys"]], + [(p[0], p[1]) for p in j["offline_subkeys"]], [ValueSeqNum(s) for s in j["local_seqs"]], [ValueSeqNum(s) for s in j["network_seqs"]], )