From 67eeb87c280aa6a39cc196c371dc3d4c94c70f70 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 11 Mar 2025 09:31:15 -0400 Subject: [PATCH 1/3] validate types for python api calls add more stress tests to python fix deadlock in veilid_api duration testing correct offline_subkey_writes inflight reporting --- veilid-core/src/logging/api_tracing_layer.rs | 28 +- veilid-core/src/storage_manager/mod.rs | 105 ++++++-- veilid-python/tests/test_dht.py | 257 ++++++++++++++++--- veilid-python/veilid/json_api.py | 165 ++++++++++++ veilid-python/veilid/types.py | 34 ++- 5 files changed, 499 insertions(+), 90 deletions(-) diff --git a/veilid-core/src/logging/api_tracing_layer.rs b/veilid-core/src/logging/api_tracing_layer.rs index 812e8d08..c2498a64 100644 --- a/veilid-core/src/logging/api_tracing_layer.rs +++ b/veilid-core/src/logging/api_tracing_layer.rs @@ -193,16 +193,13 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa attrs.record(&mut new_debug_record); if let Some(span_ref) = ctx.span(id) { - span_ref - .extensions_mut() - .insert::(new_debug_record); + let mut extensions_mut = span_ref.extensions_mut(); + extensions_mut.insert::(new_debug_record); if crate::DURATION_LOG_FACILITIES.contains(&attrs.metadata().target()) { - span_ref - .extensions_mut() - .insert::(SpanDuration { - start: Timestamp::now(), - end: Timestamp::default(), - }); + extensions_mut.insert::(SpanDuration { + start: Timestamp::now(), + end: Timestamp::default(), + }); } } } @@ -213,14 +210,14 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa return; } if let Some(span_ref) = ctx.span(&id) { - if let Some(span_duration) = span_ref.extensions_mut().get_mut::() { + let mut extensions_mut = span_ref.extensions_mut(); + if let Some(span_duration) = extensions_mut.get_mut::() { span_duration.end = Timestamp::now(); let duration = span_duration.end.saturating_sub(span_duration.start); let meta = span_ref.metadata(); - let mut extensions = span_ref.extensions_mut(); let log_key = - if let Some(span_ksr) = extensions.get_mut::() { + if let Some(span_ksr) = extensions_mut.get_mut::() { span_ksr.log_key() } else { "" @@ -254,10 +251,9 @@ impl registry::LookupSpan<'a>> Layer for ApiTracingLa return; } if let Some(span_ref) = ctx.span(id) { - if let Some(debug_record) = span_ref - .extensions_mut() - .get_mut::() - { + let mut extensions_mut = span_ref.extensions_mut(); + + if let Some(debug_record) = extensions_mut.get_mut::() { values.record(debug_record); } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 722c41b9..4502bb1b 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -59,6 +59,8 @@ struct StorageManagerInner { pub remote_record_store: Option>, /// Record subkeys that have not been pushed to the network because they were written to offline pub offline_subkey_writes: HashMap, + /// Record subkeys that are currently being written to in the foreground + pub active_subkey_writes: HashMap, /// Storage manager metadata that is persistent, including copy of offline subkey writes pub metadata_db: Option, /// Background processing task (not part of attachment manager tick tree so it happens when detached too) @@ -73,6 +75,7 @@ impl fmt::Debug for StorageManagerInner { .field("local_record_store", &self.local_record_store) .field("remote_record_store", &self.remote_record_store) .field("offline_subkey_writes", &self.offline_subkey_writes) + .field("active_subkey_writes", &self.active_subkey_writes) //.field("metadata_db", &self.metadata_db) //.field("tick_future", &self.tick_future) .finish() @@ -736,7 +739,21 @@ impl StorageManager { ) .await?; - if !self.dht_is_online() { + // Note that we are writing this subkey actively + // If it appears we are already doing this, then put it to the offline queue + let already_writing = { + let asw = inner.active_subkey_writes.entry(key).or_default(); + if asw.contains(subkey) { + veilid_log!(self debug "Already writing to this subkey: {}:{}", key, subkey); + true + } else { + // Add to our list of active subkey writes + asw.insert(subkey); + false + } + }; + + if already_writing || !self.dht_is_online() { veilid_log!(self debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); // Add to offline writes to flush Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection); @@ -764,41 +781,68 @@ impl StorageManager { // Failed to write, try again later let mut inner = self.inner.lock().await; Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection); + + // Remove from active subkey writes + let asw = inner.active_subkey_writes.get_mut(&key).unwrap(); + if !asw.remove(subkey) { + panic!("missing active subkey write: {}:{}", key, subkey); + } + if asw.is_empty() { + inner.active_subkey_writes.remove(&key); + } return Err(e); } }; - // Wait for the first result - let Ok(result) = res_rx.recv_async().await else { - apibail_internal!("failed to receive results"); + let process = || async { + // Wait for the first result + let Ok(result) = res_rx.recv_async().await else { + apibail_internal!("failed to receive results"); + }; + let result = result?; + let partial = result.fanout_result.kind.is_partial(); + + // Process the returned result + let out = self + .process_outbound_set_value_result( + key, + subkey, + signed_value_data.value_data().clone(), + safety_selection, + result, + ) + .await?; + + // If there's more to process, do it in the background + if partial { + self.process_deferred_outbound_set_value_result( + res_rx, + key, + subkey, + out.clone() + .unwrap_or_else(|| signed_value_data.value_data().clone()), + safety_selection, + ); + } + + Ok(out) }; - let result = result?; - let partial = result.fanout_result.kind.is_partial(); - // Process the returned result - let out = self - .process_outbound_set_value_result( - key, - subkey, - signed_value_data.value_data().clone(), - safety_selection, - result, - ) - .await?; + let out = process().await; - // If there's more to process, do it in the background - if partial { - self.process_deferred_outbound_set_value_result( - res_rx, - key, - subkey, - out.clone() - .unwrap_or_else(|| signed_value_data.value_data().clone()), - safety_selection, - ); + // Remove active subkey write + let mut inner = self.inner.lock().await; + + // Remove from active subkey writes + let asw = inner.active_subkey_writes.get_mut(&key).unwrap(); + if !asw.remove(subkey) { + panic!("missing active subkey write: {}:{}", key, subkey); + } + if asw.is_empty() { + inner.active_subkey_writes.remove(&key); } - Ok(out) + out } /// Create,update or cancel an outbound watch to a DHT value @@ -1019,11 +1063,18 @@ impl StorageManager { ); // Get the offline subkeys for this record still only returning the ones we're inspecting + // Merge in the currently offline in-flight records and the actively written records as well + let active_subkey_writes = inner + .active_subkey_writes + .get(&key) + .cloned() + .unwrap_or_default(); let offline_subkey_writes = inner .offline_subkey_writes .get(&key) .map(|o| o.subkeys.union(&o.subkeys_in_flight)) .unwrap_or_default() + .union(&active_subkey_writes) .intersect(&subkeys); // If this is the maximum scope we're interested in, return the report diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index ce02662b..7cd46c62 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -1,6 +1,6 @@ # Routing context veilid tests -from typing import Awaitable, Callable +from typing import Any, Awaitable, Callable, Optional import pytest import asyncio import time @@ -374,13 +374,13 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): rr = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.LOCAL) print("rr: {}", rr.__dict__) - assert rr.subkeys == [[0,1]] + assert rr.subkeys == [(0,1)] assert rr.local_seqs == [0, 0xFFFFFFFF] assert rr.network_seqs == [] rr2 = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.SYNC_GET) print("rr2: {}", rr2.__dict__) - assert rr2.subkeys == [[0,1]] + assert rr2.subkeys == [(0,1)] assert rr2.local_seqs == [0, 0xFFFFFFFF] assert rr2.network_seqs == [0, 0xFFFFFFFF] @@ -390,42 +390,28 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): -async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]]], count: int, test_data: bytes, ): +async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]]], count: int, test_data: bytes): rc = await api_connection.new_routing_context() async with rc: - (key, owner, secret) = await open_record(rc, count) - print(f'{key} {owner}:{secret}') + (desc, writer) = await open_record(rc, count) + print(f'{desc.key} {writer}') # write dht records on server 0 records = [] print(f'writing {count} subkeys') for n in range(count): - await rc.set_dht_value(key, ValueSubkey(n), test_data) + await rc.set_dht_value(desc.key, ValueSubkey(n), test_data) print(f' {n}') - print('syncing records to the network') + await sync(rc, [desc]) - while True: - donerecords = set() - subkeysleft = 0 - - rr = await rc.inspect_dht_record(key, []) - left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] - if left == 0: - break - print(f' {left} subkeys left') - time.sleep(1) - - await rc.close_dht_record(key) - - await api_connection.debug("record purge local") - await api_connection.debug("record purge remote") + await rc.close_dht_record(desc.key) # read dht records on server 0 print(f'reading {count} subkeys') - desc1 = await rc.open_dht_record(key) + desc1 = await rc.open_dht_record(desc.key) for n in range(count): - vd0 = await rc.get_dht_value(key, ValueSubkey(n), force_refresh=True) + vd0 = await rc.get_dht_value(desc1.key, ValueSubkey(n)) assert vd0.data == test_data print(f' {n}') @@ -433,10 +419,10 @@ async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: @pytest.mark.asyncio async def test_schema_limit_dflt(api_connection: veilid.VeilidAPI): - async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]: + async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]: schema = veilid.DHTSchema.dflt(count) desc = await rc.create_dht_record(schema) - return (desc.key, desc.owner, desc.owner_secret) + return (desc, desc.owner_key_pair()) print("Test with maximum number of subkeys before lower limit hit") @@ -474,7 +460,7 @@ async def test_schema_limit_smpl(api_connection: veilid.VeilidAPI): desc = await rc.create_dht_record(schema) await rc.open_dht_record(desc.key, writer_keypair) - return (desc.key, writer_keypair.key(), writer_keypair.secret()) + return (desc, writer_keypair) print("Test with maximum number of subkeys before lower limit hit") TEST_DATA = b"A" * 32768 @@ -545,18 +531,7 @@ async def test_dht_integration_writer_reader(): await rc0.set_dht_value(desc.key, ValueSubkey(0), TEST_DATA) - print('syncing records to the network') - recleft = len(records) - for desc in records: - while True: - rr = await rc0.inspect_dht_record(desc.key, []) - left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] - if left == 0: - await rc0.close_dht_record(desc.key) - break - print(f' {recleft} records {left} subkeys left') - time.sleep(0.1) - recleft-=1 + await sync(rc0, records) # read dht records on server 1 print(f'reading {COUNT} records') @@ -636,6 +611,96 @@ async def test_dht_write_read_local(): print(f' {n}') n += 1 + +@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time") +@pytest.mark.asyncio +async def test_dht_write_read_full_subkeys_local(): + + async def null_update_callback(update: veilid.VeilidUpdate): + pass + + try: + api0 = await veilid.api_connector(null_update_callback, 0) + except veilid.VeilidConnectionError: + pytest.skip("Unable to connect to veilid-server 0.") + + async with api0: + # purge local and remote record stores to ensure we start fresh + await api0.debug("record purge local") + await api0.debug("record purge remote") + + # make routing contexts + rc0 = await api0.new_routing_context() + async with rc0: + + # Number of records + COUNT = 8 + # Number of subkeys per record + SUBKEY_COUNT = 32 + # Nonce to encrypt test data + NONCE = veilid.Nonce.from_bytes(b"A"*24) + # Secret to encrypt test data + SECRET = veilid.SharedSecret.from_bytes(b"A"*32) + # Max subkey size + MAX_SUBKEY_SIZE = min(32768, 1024*1024/SUBKEY_COUNT) + # MAX_SUBKEY_SIZE = 256 + + # write dht records on server 0 + records = [] + subkey_data_list = [] + schema = veilid.DHTSchema.dflt(SUBKEY_COUNT) + print(f'writing {COUNT} records with full subkeys') + init_futures = set() + for n in range(COUNT): + + # Make encrypted data that is consistent and hard to compress + subkey_data = bytes(chr(ord("A")+n)*MAX_SUBKEY_SIZE, 'ascii') + print(f"subkey_data({n}):len={len(subkey_data)}") + + cs = await api0.best_crypto_system() + async with cs: + subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET) + subkey_data_list.append(subkey_data) + + + desc = await rc0.create_dht_record(schema) + records.append(desc) + + for i in range(SUBKEY_COUNT): + init_futures.add(rc0.set_dht_value(desc.key, ValueSubkey(i), subkey_data)) + + print(f' {n}: {desc.key} {desc.owner}:{desc.owner_secret}') + + # Wait for all records to synchronize, with progress bars + await sync_win(rc0, records, SUBKEY_COUNT, init_futures) + + for desc0 in records: + await rc0.close_dht_record(desc0.key) + + await api0.debug("record purge local") + await api0.debug("record purge remote") + + # read dht records on server 0 + print(f'reading {COUNT} records') + for n, desc0 in enumerate(records): + desc1 = await rc0.open_dht_record(desc0.key) + + for i in range(SUBKEY_COUNT): + vd0 = None + while vd0 == None: + vd0 = await rc0.get_dht_value(desc1.key, ValueSubkey(i), force_refresh=True) + if vd0 != None: + assert vd0.data == subkey_data_list[n] + break + time.sleep(1) + print(f"retrying record {n} subkey {i}") + + + await rc0.close_dht_record(desc1.key) + + print(f' {n}') + + async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescriptor]): print('syncing records to the network') syncrecords = records.copy() @@ -646,9 +711,119 @@ async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescript rr = await rc.inspect_dht_record(desc.key, []) left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys] if left == 0: - donerecords.add(desc) + if veilid.ValueSeqNum.NONE not in rr.local_seqs: + donerecords.add(desc) else: subkeysleft += left syncrecords = [x for x in syncrecords if x not in donerecords] print(f' {len(syncrecords)} records {subkeysleft} subkeys left') time.sleep(1) + + +async def sync_win( + rc: veilid.RoutingContext, + records: list[veilid.DHTRecordDescriptor], + subkey_count: int, + init_futures: set[Awaitable[Any]] + ): + import curses + + screen = curses.initscr() + + curses.start_color() + curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_BLUE) + curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN) + curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_YELLOW) + curses.init_pair(4, curses.COLOR_BLACK, curses.COLOR_GREEN) + + HEIGHT=len(records) + 3 + GRAPHWIDTH = subkey_count + WIDTH=GRAPHWIDTH + 4 + 1 + 43 + 2 + + cur_lines = curses.LINES + cur_cols = curses.COLS + win = curses.newwin(HEIGHT, WIDTH, + max(0, int(cur_lines/2) - int(HEIGHT/2)), + max(0, int(cur_cols/2) - int(WIDTH/2))) + win.clear() + win.border(0,0,0,0) + win.nodelay(True) + + # Record inspection and completion state + + # Records we are done inspecting and have finished sync + donerecords: set[veilid.TypedKey] = set() + # Records we are currently inspecting that are in the futures set + futurerecords: set[veilid.TypedKey] = set() + # All the futures we are waiting for + futures = set() + # The record report state + recordreports: dict[veilid.TypedKey, veilid.DHTRecordReport] = dict() + + # Add initial futures with None key + for fut in init_futures: + async def _do_init_fut(fut): + return (None, await fut) + futures.add(asyncio.create_task(_do_init_fut(fut))) + + # Loop until all records are completed + while len(donerecords) != len(records): + + # Update the futures with inspects for unfinished records + for n, desc in enumerate(records): + if desc.key in donerecords or desc.key in futurerecords: + continue + async def _do_inspect(key: veilid.TypedKey): + return (key, await rc.inspect_dht_record(key, [])) + futures.add(asyncio.create_task(_do_inspect(desc.key))) + futurerecords.add(desc.key) + + # Wait for some futures to finish + done, futures = await asyncio.wait(futures, return_when = asyncio.FIRST_COMPLETED) + + # Process finished futures into the state + for rr_fut in done: + key: veilid.TypedKey + rr: veilid.DHTRecordReport + key, rr = await rr_fut + if key is not None: + futurerecords.remove(key) + + if len(rr.subkeys) == 1 and rr.subkeys[0] == (0, subkey_count-1) and veilid.ValueSeqNum.NONE not in rr.local_seqs and len(rr.offline_subkeys) == 0: + if key in recordreports: + del recordreports[key] + donerecords.add(key) + else: + recordreports[key] = rr + + # Re-render the state + if cur_lines != curses.LINES or cur_cols != curses.COLS: + cur_lines = curses.LINES + cur_cols = curses.COLS + win.move( + max(0, int(cur_lines/2) - int(HEIGHT/2)), + max(0, int(cur_cols/2) - int(WIDTH/2))) + win.border(0,0,0,0) + win.addstr(1, 1, "syncing records to the network", curses.color_pair(0)) + for n, rr in enumerate(records): + key = rr.key + win.addstr(n+2, GRAPHWIDTH+1, key, curses.color_pair(0)) + + if key in donerecords: + win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(4)) + elif key in recordreports: + rr = recordreports[key] + win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1)) + for (a,b) in rr.subkeys: + for m in range(a, b+1): + if rr.local_seqs[m] != veilid.ValueSeqNum.NONE: + win.addstr(n+2, m+1, " ", curses.color_pair(2)) + for (a,b) in rr.offline_subkeys: + win.addstr(n+2, a+1, " " * (b-a+1), curses.color_pair(3)) + else: + win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1)) + + win.refresh() + + curses.endwin() + diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 7b75d6ed..0a88051c 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -338,6 +338,12 @@ class _JsonVeilidAPI(VeilidAPI): async def new_custom_private_route( self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing ) -> tuple[RouteId, bytes]: + assert isinstance(kinds, list) + for k in kinds: + assert isinstance(k, CryptoKind) + assert isinstance(stability, Stability) + assert isinstance(sequencing, Sequencing) + return NewPrivateRouteResult.from_json( raise_api_result( await self.send_ndjson_request( @@ -350,6 +356,8 @@ class _JsonVeilidAPI(VeilidAPI): ).to_tuple() async def import_remote_private_route(self, blob: bytes) -> RouteId: + assert isinstance(blob, bytes) + return RouteId( raise_api_result( await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE, blob=blob) @@ -357,11 +365,16 @@ class _JsonVeilidAPI(VeilidAPI): ) async def release_private_route(self, route_id: RouteId): + assert isinstance(route_id, RouteId) + raise_api_result( await self.send_ndjson_request(Operation.RELEASE_PRIVATE_ROUTE, route_id=route_id) ) async def app_call_reply(self, call_id: OperationId, message: bytes): + assert isinstance(call_id, OperationId) + assert isinstance(message, bytes) + raise_api_result( await self.send_ndjson_request( Operation.APP_CALL_REPLY, call_id=call_id, message=message @@ -373,6 +386,9 @@ class _JsonVeilidAPI(VeilidAPI): return _JsonRoutingContext(self, rc_id) async def open_table_db(self, name: str, column_count: int) -> TableDb: + assert isinstance(name, str) + assert isinstance(column_count, int) + db_id = raise_api_result( await self.send_ndjson_request( Operation.OPEN_TABLE_DB, name=name, column_count=column_count @@ -381,11 +397,15 @@ class _JsonVeilidAPI(VeilidAPI): return _JsonTableDb(self, db_id) async def delete_table_db(self, name: str) -> bool: + assert isinstance(name, str) + return raise_api_result( await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name) ) async def get_crypto_system(self, kind: CryptoKind) -> CryptoSystem: + assert isinstance(kind, CryptoKind) + cs_id = raise_api_result( await self.send_ndjson_request(Operation.GET_CRYPTO_SYSTEM, kind=kind) ) @@ -398,6 +418,13 @@ class _JsonVeilidAPI(VeilidAPI): async def verify_signatures( self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature] ) -> Optional[list[TypedKey]]: + assert isinstance(node_ids, list) + for ni in node_ids: + assert isinstance(ni, TypedKey) + assert isinstance(data, bytes) + for sig in signatures: + assert isinstance(sig, TypedSignature) + out = raise_api_result( await self.send_ndjson_request( Operation.VERIFY_SIGNATURES, @@ -418,6 +445,11 @@ class _JsonVeilidAPI(VeilidAPI): async def generate_signatures( self, data: bytes, key_pairs: list[TypedKeyPair] ) -> list[TypedSignature]: + assert isinstance(data, bytes) + assert isinstance(key_pairs, list) + for kp in key_pairs: + assert isinstance(kp, TypedKeyPair) + return list( map( lambda x: TypedSignature(x), @@ -430,6 +462,8 @@ class _JsonVeilidAPI(VeilidAPI): ) async def generate_key_pair(self, kind: CryptoKind) -> list[TypedKeyPair]: + assert isinstance(kind, CryptoKind) + return list( map( lambda x: TypedKeyPair(x), @@ -443,6 +477,7 @@ class _JsonVeilidAPI(VeilidAPI): return Timestamp(raise_api_result(await self.send_ndjson_request(Operation.NOW))) async def debug(self, command: str) -> str: + assert isinstance(command, str) return raise_api_result(await self.send_ndjson_request(Operation.DEBUG, command=command)) async def veilid_version_string(self) -> str: @@ -501,6 +536,8 @@ class _JsonRoutingContext(RoutingContext): self.done = True async def with_default_safety(self, release=True) -> Self: + assert isinstance(release, bool) + new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -514,6 +551,9 @@ class _JsonRoutingContext(RoutingContext): return self.__class__(self.api, new_rc_id) async def with_safety(self, safety_selection: SafetySelection, release=True) -> Self: + assert isinstance(safety_selection, SafetySelection) + assert isinstance(release, bool) + new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -528,6 +568,9 @@ class _JsonRoutingContext(RoutingContext): return self.__class__(self.api, new_rc_id) async def with_sequencing(self, sequencing: Sequencing, release=True) -> Self: + assert isinstance(sequencing, Sequencing) + assert isinstance(release, bool) + new_rc_id = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -555,6 +598,9 @@ class _JsonRoutingContext(RoutingContext): ) ) async def app_call(self, target: TypedKey | RouteId, message: bytes) -> bytes: + assert isinstance(target, TypedKey) or isinstance(target, RouteId) + assert isinstance(message, bytes) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -569,6 +615,9 @@ class _JsonRoutingContext(RoutingContext): ) async def app_message(self, target: TypedKey | RouteId, message: bytes): + assert isinstance(target, TypedKey) or isinstance(target, RouteId) + assert isinstance(message, bytes) + raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -583,6 +632,10 @@ class _JsonRoutingContext(RoutingContext): async def create_dht_record( self, schema: DHTSchema, owner: Optional[KeyPair] = None, kind: Optional[CryptoKind] = None ) -> DHTRecordDescriptor: + assert isinstance(schema, DHTSchema) + assert owner is None or isinstance(owner, KeyPair) + assert kind is None or isinstance(kind, CryptoKind) + return DHTRecordDescriptor.from_json( raise_api_result( await self.api.send_ndjson_request( @@ -600,6 +653,9 @@ class _JsonRoutingContext(RoutingContext): async def open_dht_record( self, key: TypedKey, writer: Optional[KeyPair] = None ) -> DHTRecordDescriptor: + assert isinstance(key, TypedKey) + assert writer is None or isinstance(writer, KeyPair) + return DHTRecordDescriptor.from_json( raise_api_result( await self.api.send_ndjson_request( @@ -614,6 +670,8 @@ class _JsonRoutingContext(RoutingContext): ) async def close_dht_record(self, key: TypedKey): + assert isinstance(key, TypedKey) + raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -625,6 +683,8 @@ class _JsonRoutingContext(RoutingContext): ) async def delete_dht_record(self, key: TypedKey): + assert isinstance(key, TypedKey) + raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -638,6 +698,10 @@ class _JsonRoutingContext(RoutingContext): async def get_dht_value( self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool = False ) -> Optional[ValueData]: + assert isinstance(key, TypedKey) + assert isinstance(subkey, ValueSubkey) + assert isinstance(force_refresh, bool) + ret = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -654,6 +718,11 @@ class _JsonRoutingContext(RoutingContext): async def set_dht_value( self, key: TypedKey, subkey: ValueSubkey, data: bytes, writer: Optional[KeyPair] = None ) -> Optional[ValueData]: + assert isinstance(key, TypedKey) + assert isinstance(subkey, ValueSubkey) + assert isinstance(data, bytes) + assert writer is None or isinstance(writer, KeyPair) + ret = raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -675,6 +744,15 @@ class _JsonRoutingContext(RoutingContext): expiration: Timestamp = 0, count: int = 0xFFFFFFFF, ) -> Timestamp: + assert isinstance(key, TypedKey) + assert isinstance(subkeys, list) + for s in subkeys: + assert isinstance(s, tuple) + assert isinstance(s[0], ValueSubkey) + assert isinstance(s[1], ValueSubkey) + assert isinstance(expiration, Timestamp) + assert isinstance(count, int) + return Timestamp( raise_api_result( await self.api.send_ndjson_request( @@ -693,6 +771,13 @@ class _JsonRoutingContext(RoutingContext): async def cancel_dht_watch( self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]] ) -> bool: + assert isinstance(key, TypedKey) + assert isinstance(subkeys, list) + for s in subkeys: + assert isinstance(s, tuple) + assert isinstance(s[0], ValueSubkey) + assert isinstance(s[1], ValueSubkey) + return raise_api_result( await self.api.send_ndjson_request( Operation.ROUTING_CONTEXT, @@ -710,6 +795,14 @@ class _JsonRoutingContext(RoutingContext): subkeys: list[tuple[ValueSubkey, ValueSubkey]], scope: DHTReportScope = DHTReportScope.LOCAL, ) -> DHTRecordReport: + assert isinstance(key, TypedKey) + assert isinstance(subkeys, list) + for s in subkeys: + assert isinstance(s, tuple) + assert isinstance(s[0], ValueSubkey) + assert isinstance(s[1], ValueSubkey) + assert isinstance(scope, DHTReportScope) + return DHTRecordReport.from_json( raise_api_result( await self.api.send_ndjson_request( @@ -790,6 +883,10 @@ class _JsonTableDbTransaction(TableDbTransaction): self.done = True async def store(self, key: bytes, value: bytes, col: int = 0): + assert isinstance(key, bytes) + assert isinstance(value, bytes) + assert isinstance(col, int) + await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, @@ -801,6 +898,9 @@ class _JsonTableDbTransaction(TableDbTransaction): ) async def delete(self, key: bytes, col: int = 0): + assert isinstance(key, bytes) + assert isinstance(col, int) + await self.api.send_ndjson_request( Operation.TABLE_DB_TRANSACTION, validate=validate_tx_op, @@ -866,6 +966,8 @@ class _JsonTableDb(TableDb): ) async def get_keys(self, col: int = 0) -> list[bytes]: + assert isinstance(col, int) + return list( map( lambda x: urlsafe_b64decode_no_pad(x), @@ -893,6 +995,10 @@ class _JsonTableDb(TableDb): return _JsonTableDbTransaction(self.api, tx_id) async def store(self, key: bytes, value: bytes, col: int = 0): + assert isinstance(key, bytes) + assert isinstance(value, bytes) + assert isinstance(col, int) + return raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -906,6 +1012,9 @@ class _JsonTableDb(TableDb): ) async def load(self, key: bytes, col: int = 0) -> Optional[bytes]: + assert isinstance(key, bytes) + assert isinstance(col, int) + res = raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -919,6 +1028,9 @@ class _JsonTableDb(TableDb): return None if res is None else urlsafe_b64decode_no_pad(res) async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]: + assert isinstance(key, bytes) + assert isinstance(col, int) + res = raise_api_result( await self.api.send_ndjson_request( Operation.TABLE_DB, @@ -989,6 +1101,9 @@ class _JsonCryptoSystem(CryptoSystem): self.done = True async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1003,6 +1118,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def compute_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1017,6 +1135,10 @@ class _JsonCryptoSystem(CryptoSystem): ) async def generate_shared_secret(self, key: PublicKey, secret: SecretKey, domain: bytes) -> SharedSecret: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + assert isinstance(domain, bytes) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1032,6 +1154,8 @@ class _JsonCryptoSystem(CryptoSystem): ) async def random_bytes(self, len: int) -> bytes: + assert isinstance(len, int) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -1055,6 +1179,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def hash_password(self, password: bytes, salt: bytes) -> str: + assert isinstance(password, bytes) + assert isinstance(salt, bytes) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1067,6 +1194,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def verify_password(self, password: bytes, password_hash: str) -> bool: + assert isinstance(password, bytes) + assert isinstance(password_hash, str) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1079,6 +1209,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def derive_shared_secret(self, password: bytes, salt: bytes) -> SharedSecret: + assert isinstance(password, bytes) + assert isinstance(salt, bytes) + return SharedSecret( raise_api_result( await self.api.send_ndjson_request( @@ -1129,6 +1262,8 @@ class _JsonCryptoSystem(CryptoSystem): ) async def generate_hash(self, data: bytes) -> HashDigest: + assert isinstance(data, bytes) + return HashDigest( raise_api_result( await self.api.send_ndjson_request( @@ -1142,6 +1277,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def validate_key_pair(self, key: PublicKey, secret: SecretKey) -> bool: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1154,6 +1292,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def validate_hash(self, data: bytes, hash_digest: HashDigest) -> bool: + assert isinstance(data, bytes) + assert isinstance(hash_digest, HashDigest) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1166,6 +1307,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def distance(self, key1: CryptoKey, key2: CryptoKey) -> CryptoKeyDistance: + assert isinstance(key1, CryptoKey) + assert isinstance(key2, CryptoKey) + return CryptoKeyDistance( raise_api_result( await self.api.send_ndjson_request( @@ -1180,6 +1324,10 @@ class _JsonCryptoSystem(CryptoSystem): ) async def sign(self, key: PublicKey, secret: SecretKey, data: bytes) -> Signature: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) + assert isinstance(data, bytes) + return Signature( raise_api_result( await self.api.send_ndjson_request( @@ -1195,6 +1343,10 @@ class _JsonCryptoSystem(CryptoSystem): ) async def verify(self, key: PublicKey, data: bytes, signature: Signature): + assert isinstance(key, PublicKey) + assert isinstance(data, bytes) + assert isinstance(signature, Signature) + return raise_api_result( await self.api.send_ndjson_request( Operation.CRYPTO_SYSTEM, @@ -1224,6 +1376,11 @@ class _JsonCryptoSystem(CryptoSystem): shared_secret: SharedSecret, associated_data: Optional[bytes], ) -> bytes: + assert isinstance(body, bytes) + assert isinstance(nonce, Nonce) + assert isinstance(shared_secret, SharedSecret) + assert associated_data is None or isinstance(associated_data, bytes) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -1246,6 +1403,11 @@ class _JsonCryptoSystem(CryptoSystem): shared_secret: SharedSecret, associated_data: Optional[bytes], ) -> bytes: + assert isinstance(body, bytes) + assert isinstance(nonce, Nonce) + assert isinstance(shared_secret, SharedSecret) + assert associated_data is None or isinstance(associated_data, bytes) + return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( @@ -1262,6 +1424,9 @@ class _JsonCryptoSystem(CryptoSystem): ) async def crypt_no_auth(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret) -> bytes: + assert isinstance(body, bytes) + assert isinstance(nonce, Nonce) + assert isinstance(shared_secret, SharedSecret) return urlsafe_b64decode_no_pad( raise_api_result( await self.api.send_ndjson_request( diff --git a/veilid-python/veilid/types.py b/veilid-python/veilid/types.py index 8ebad0fa..8ec6a782 100644 --- a/veilid-python/veilid/types.py +++ b/veilid-python/veilid/types.py @@ -2,7 +2,7 @@ import base64 import json from enum import StrEnum from functools import total_ordering -from typing import Any, Optional, Self, Tuple +from typing import Any, Optional, Self #################################################################### @@ -122,6 +122,7 @@ class EncodedString(str): @classmethod def from_bytes(cls, b: bytes) -> Self: + assert isinstance(b, bytes) return cls(urlsafe_b64encode_no_pad(b)) @@ -160,6 +161,8 @@ class Nonce(EncodedString): class KeyPair(str): @classmethod def from_parts(cls, key: PublicKey, secret: SecretKey) -> Self: + assert isinstance(key, PublicKey) + assert isinstance(secret, SecretKey) return cls(f"{key}:{secret}") def key(self) -> PublicKey: @@ -168,7 +171,7 @@ class KeyPair(str): def secret(self) -> SecretKey: return SecretKey(self.split(":", 1)[1]) - def to_parts(self) -> Tuple[PublicKey, SecretKey]: + def to_parts(self) -> tuple[PublicKey, SecretKey]: public, secret = self.split(":", 1) return (PublicKey(public), SecretKey(secret)) @@ -188,6 +191,8 @@ class CryptoTyped(str): class TypedKey(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: PublicKey) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, PublicKey) return cls(f"{kind}:{value}") def value(self) -> PublicKey: @@ -197,6 +202,8 @@ class TypedKey(CryptoTyped): class TypedSecret(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: SecretKey) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, SecretKey) return cls(f"{kind}:{value}") def value(self) -> SecretKey: @@ -206,6 +213,8 @@ class TypedSecret(CryptoTyped): class TypedKeyPair(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: KeyPair) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, KeyPair) return cls(f"{kind}:{value}") def value(self) -> KeyPair: @@ -215,6 +224,8 @@ class TypedKeyPair(CryptoTyped): class TypedSignature(CryptoTyped): @classmethod def from_value(cls, kind: CryptoKind, value: Signature) -> Self: + assert isinstance(kind, CryptoKind) + assert isinstance(value, Signature) return cls(f"{kind}:{value}") def value(self) -> Signature: @@ -226,7 +237,7 @@ class ValueSubkey(int): class ValueSeqNum(int): - pass + NONE = 4294967295 #################################################################### @@ -284,10 +295,13 @@ class NewPrivateRouteResult: blob: bytes def __init__(self, route_id: RouteId, blob: bytes): + assert isinstance(route_id, RouteId) + assert isinstance(blob, bytes) + self.route_id = route_id self.blob = blob - def to_tuple(self) -> Tuple[RouteId, bytes]: + def to_tuple(self) -> tuple[RouteId, bytes]: return (self.route_id, self.blob) @classmethod @@ -300,6 +314,9 @@ class DHTSchemaSMPLMember: m_cnt: int def __init__(self, m_key: PublicKey, m_cnt: int): + assert isinstance(m_key, PublicKey) + assert isinstance(m_cnt, int) + self.m_key = m_key self.m_cnt = m_cnt @@ -321,10 +338,15 @@ class DHTSchema: @classmethod def dflt(cls, o_cnt: int) -> Self: + assert isinstance(o_cnt, int) return cls(DHTSchemaKind.DFLT, o_cnt=o_cnt) @classmethod def smpl(cls, o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self: + assert isinstance(o_cnt, int) + assert isinstance(members, list) + for m in members: + assert isinstance(m, DHTSchemaSMPLMember) return cls(DHTSchemaKind.SMPL, o_cnt=o_cnt, members=members) @classmethod @@ -404,8 +426,8 @@ class DHTRecordReport: @classmethod def from_json(cls, j: dict) -> Self: return cls( - [[p[0], p[1]] for p in j["subkeys"]], - [[p[0], p[1]] for p in j["offline_subkeys"]], + [(p[0], p[1]) for p in j["subkeys"]], + [(p[0], p[1]) for p in j["offline_subkeys"]], [ValueSeqNum(s) for s in j["local_seqs"]], [ValueSeqNum(s) for s in j["network_seqs"]], ) From 6018d385e84143309a064643dfcd5713a77be29d Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 11 Mar 2025 13:30:12 -0400 Subject: [PATCH 2/3] [ci skip] fix veilid_api duration logging, add keyed spans to veilid-api tracing so they show up --- veilid-core/src/veilid_api/api.rs | 59 ++++++++------- veilid-core/src/veilid_api/routing_context.rs | 75 ++++++++++--------- veilid-python/tests/test_dht.py | 2 + 3 files changed, 77 insertions(+), 59 deletions(-) diff --git a/veilid-core/src/veilid_api/api.rs b/veilid-core/src/veilid_api/api.rs index e0a0790d..5e1fb4bb 100644 --- a/veilid-core/src/veilid_api/api.rs +++ b/veilid-core/src/veilid_api/api.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("veilid_api"); + ///////////////////////////////////////////////////////////////////////////////////////////////////// pub(super) struct VeilidAPIInner { @@ -41,10 +43,9 @@ pub struct VeilidAPI { } impl VeilidAPI { - #[instrument(target = "veilid_api", level = "debug", skip_all)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = context.log_key()), skip_all)] pub(crate) fn new(context: VeilidCoreContext) -> Self { - event!(target: "veilid_api", Level::DEBUG, - "VeilidAPI::new()"); + veilid_log!(context debug "VeilidAPI::new()"); Self { inner: Arc::new(Mutex::new(VeilidAPIInner { context: Some(context), @@ -59,10 +60,9 @@ impl VeilidAPI { } /// Shut down Veilid and terminate the API. - #[instrument(target = "veilid_api", level = "debug", skip_all)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all)] pub async fn shutdown(self) { - event!(target: "veilid_api", Level::DEBUG, - "VeilidAPI::shutdown()"); + veilid_log!(self debug "VeilidAPI::shutdown()"); let context = { self.inner.lock().context.take() }; if let Some(context) = context { api_shutdown(context).await; @@ -152,6 +152,15 @@ impl VeilidAPI { callback(&mut inner.debug_cache) } + #[must_use] + pub(crate) fn log_key(&self) -> &str { + let inner = self.inner.lock(); + let Some(context) = &inner.context else { + return ""; + }; + context.log_key() + } + //////////////////////////////////////////////////////////////// // Attach/Detach @@ -174,9 +183,9 @@ impl VeilidAPI { } /// Connect to the network. - #[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all, ret, err)] pub async fn attach(&self) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::attach()"); let attachment_manager = self.core_context()?.attachment_manager(); @@ -187,9 +196,9 @@ impl VeilidAPI { } /// Disconnect from the network. - #[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all, ret, err)] pub async fn detach(&self) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::detach()"); let attachment_manager = self.core_context()?.attachment_manager(); @@ -203,9 +212,9 @@ impl VeilidAPI { // Routing Context /// Get a new `RoutingContext` object to use to send messages over the Veilid network with default safety, sequencing, and stability parameters. - #[instrument(target = "veilid_api", level = "debug", skip_all, err, ret)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all, err, ret)] pub fn routing_context(&self) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::routing_context()"); RoutingContext::try_new(self.clone()) @@ -218,11 +227,11 @@ impl VeilidAPI { /// `VLD0:XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` but if the prefix is left off /// `XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` will be parsed with the 'best' cryptosystem /// available (at the time of this writing this is `VLD0`). - #[instrument(target = "veilid_api", level = "debug", skip(self), fields(s=s.to_string()), ret, err)] + #[instrument(target = "veilid_api", level = "debug", skip(self), fields(__VEILID_LOG_KEY = self.log_key(), s=s.to_string()), ret, err)] pub fn parse_as_target(&self, s: S) -> VeilidAPIResult { let s = s.to_string(); - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::parse_as_target(s: {:?})", s); // Is this a route id? @@ -272,14 +281,14 @@ impl VeilidAPI { /// /// Returns a route id and 'blob' that can be published over some means (DHT or otherwise) to be /// imported by another Veilid node. - #[instrument(target = "veilid_api", level = "debug", skip(self), ret)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret)] pub async fn new_custom_private_route( &self, crypto_kinds: &[CryptoKind], stability: Stability, sequencing: Sequencing, ) -> VeilidAPIResult<(RouteId, Vec)> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::new_custom_private_route(crypto_kinds: {:?}, stability: {:?}, sequencing: {:?})", crypto_kinds, stability, @@ -336,9 +345,9 @@ impl VeilidAPI { /// Import a private route blob as a remote private route. /// /// Returns a route id that can be used to send private messages to the node creating this route. - #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)] pub fn import_remote_private_route(&self, blob: Vec) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::import_remote_private_route(blob: {:?})", blob); let routing_table = self.core_context()?.routing_table(); let rss = routing_table.route_spec_store(); @@ -349,9 +358,9 @@ impl VeilidAPI { /// /// This will deactivate the route and free its resources and it can no longer be sent to /// or received from. - #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)] pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::release_private_route(route_id: {:?})", route_id); let routing_table = self.core_context()?.routing_table(); let rss = routing_table.route_spec_store(); @@ -368,13 +377,13 @@ impl VeilidAPI { /// /// * `call_id` - specifies which call to reply to, and it comes from a [VeilidUpdate::AppCall], specifically the [VeilidAppCall::id()] value. /// * `message` - is an answer blob to be returned by the remote node's [RoutingContext::app_call()] function, and may be up to 32768 bytes. - #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)] pub async fn app_call_reply( &self, call_id: OperationId, message: Vec, ) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message); let rpc_processor = self.core_context()?.rpc_processor(); @@ -387,7 +396,7 @@ impl VeilidAPI { // Tunnel Building #[cfg(feature = "unstable-tunnels")] - #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)] pub async fn start_tunnel( &self, _endpoint_mode: TunnelMode, @@ -397,7 +406,7 @@ impl VeilidAPI { } #[cfg(feature = "unstable-tunnels")] - #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)] pub async fn complete_tunnel( &self, _endpoint_mode: TunnelMode, @@ -408,7 +417,7 @@ impl VeilidAPI { } #[cfg(feature = "unstable-tunnels")] - #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)] pub async fn cancel_tunnel(&self, _tunnel_id: TunnelId) -> VeilidAPIResult { panic!("unimplemented"); } diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index 5e212bf4..07d90a85 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -1,5 +1,7 @@ use super::*; +impl_veilid_log_facility!("veilid_api"); + /////////////////////////////////////////////////////////////////////////////////////// /// Valid destinations for a message sent over a routing context. @@ -62,6 +64,11 @@ impl RoutingContext { }) } + #[must_use] + pub(crate) fn log_key(&self) -> &str { + self.api.log_key() + } + /// Turn on sender privacy, enabling the use of safety routes. This is the default and /// calling this function is only necessary if you have previously disable safety or used other parameters. /// @@ -72,9 +79,9 @@ impl RoutingContext { /// * Sequencing default is to prefer ordered before unordered message delivery. /// /// To customize the safety selection in use, use [RoutingContext::with_safety()]. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub fn with_default_safety(self) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::with_default_safety(self: {:?})", self); let config = self.api.config()?; @@ -89,9 +96,9 @@ impl RoutingContext { } /// Use a custom [SafetySelection]. Can be used to disable safety via [SafetySelection::Unsafe]. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub fn with_safety(self, safety_selection: SafetySelection) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::with_safety(self: {:?}, safety_selection: {:?})", self, safety_selection); Ok(Self { @@ -101,9 +108,9 @@ impl RoutingContext { } /// Use a specified [Sequencing] preference, with or without privacy. - #[instrument(target = "veilid_api", level = "debug", ret)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret)] pub fn with_sequencing(self, sequencing: Sequencing) -> Self { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::with_sequencing(self: {:?}, sequencing: {:?})", self, sequencing); Self { @@ -140,9 +147,9 @@ impl RoutingContext { self.api.clone() } - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] async fn get_destination(&self, target: Target) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::get_destination(self: {:?}, target: {:?})", self, target); let rpc_processor = self.api.core_context()?.rpc_processor(); @@ -165,9 +172,9 @@ impl RoutingContext { /// * `message` - an arbitrary message blob of up to 32768 bytes. /// /// Returns an answer blob of up to 32768 bytes. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn app_call(&self, target: Target, message: Vec) -> VeilidAPIResult> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::app_call(self: {:?}, target: {:?}, message: {:?})", self, target, message); let rpc_processor = self.api.core_context()?.rpc_processor(); @@ -199,9 +206,9 @@ impl RoutingContext { /// /// * `target` - can be either a direct node id or a private route. /// * `message` - an arbitrary message blob of up to 32768 bytes. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn app_message(&self, target: Target, message: Vec) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::app_message(self: {:?}, target: {:?}, message: {:?})", self, target, message); let rpc_processor = self.api.core_context()?.rpc_processor(); @@ -230,14 +237,14 @@ impl RoutingContext { /// DHT Records /// Deterministicly builds the record key for a given schema and owner public key - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub fn get_dht_record_key( &self, schema: DHTSchema, owner_key: &PublicKey, kind: Option, ) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::get_dht_record_key(self: {:?}, schema: {:?}, owner_key: {:?}, kind: {:?})", self, schema, owner_key, kind); schema.validate()?; @@ -256,14 +263,14 @@ impl RoutingContext { /// Returns the newly allocated DHT record's key if successful. /// /// Note: if you pass in an owner keypair this call is a deterministic! This means that if you try to create a new record for a given owner and schema that already exists it *will* fail. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn create_dht_record( &self, schema: DHTSchema, owner: Option, kind: Option, ) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::create_dht_record(self: {:?}, schema: {:?}, owner: {:?}, kind: {:?})", self, schema, owner, kind); schema.validate()?; @@ -291,13 +298,13 @@ impl RoutingContext { /// safety selection. /// /// Returns the DHT record descriptor for the opened record if successful. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn open_dht_record( &self, key: TypedKey, default_writer: Option, ) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::open_dht_record(self: {:?}, key: {:?}, default_writer: {:?})", self, key, default_writer); Crypto::validate_crypto_kind(key.kind)?; @@ -311,9 +318,9 @@ impl RoutingContext { /// Closes a DHT record at a specific key that was opened with create_dht_record or open_dht_record. /// /// Closing a record allows you to re-open it with a different routing context. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn close_dht_record(&self, key: TypedKey) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::close_dht_record(self: {:?}, key: {:?})", self, key); Crypto::validate_crypto_kind(key.kind)?; @@ -327,9 +334,9 @@ impl RoutingContext { /// If the record is opened, it must be closed before it is deleted. /// Deleting a record does not delete it from the network, but will remove the storage of the record /// locally, and will prevent its value from being refreshed on the network by this node. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn delete_dht_record(&self, key: TypedKey) -> VeilidAPIResult<()> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::delete_dht_record(self: {:?}, key: {:?})", self, key); Crypto::validate_crypto_kind(key.kind)?; @@ -344,14 +351,14 @@ impl RoutingContext { /// /// Returns `None` if the value subkey has not yet been set. /// Returns `Some(data)` if the value subkey has valid data. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn get_dht_value( &self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool, ) -> VeilidAPIResult> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::get_dht_value(self: {:?}, key: {:?}, subkey: {:?}, force_refresh: {:?})", self, key, subkey, force_refresh); Crypto::validate_crypto_kind(key.kind)?; @@ -367,7 +374,7 @@ impl RoutingContext { /// /// Returns `None` if the value was successfully put. /// Returns `Some(data)` if the value put was older than the one available on the network. - #[instrument(target = "veilid_api", level = "debug", skip(data), fields(data = print_data(&data, Some(64))), ret, err)] + #[instrument(target = "veilid_api", level = "debug", skip(data), fields(__VEILID_LOG_KEY = self.log_key(), data = print_data(&data, Some(64))), ret, err)] pub async fn set_dht_value( &self, key: TypedKey, @@ -375,7 +382,7 @@ impl RoutingContext { data: Vec, writer: Option, ) -> VeilidAPIResult> { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::set_dht_value(self: {:?}, key: {:?}, subkey: {:?}, data: len={}, writer: {:?})", self, key, subkey, data.len(), writer); Crypto::validate_crypto_kind(key.kind)?; @@ -404,7 +411,7 @@ impl RoutingContext { /// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed network.dht.member_watch_limit per writer. /// /// Members can be specified via the SMPL schema and do not need to allocate writable subkeys in order to offer a member watch capability. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn watch_dht_values( &self, key: TypedKey, @@ -412,7 +419,7 @@ impl RoutingContext { expiration: Timestamp, count: u32, ) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::watch_dht_values(self: {:?}, key: {:?}, subkeys: {:?}, expiration: {}, count: {})", self, key, subkeys, expiration, count); Crypto::validate_crypto_kind(key.kind)?; @@ -430,13 +437,13 @@ impl RoutingContext { /// /// Returns Ok(true) if there is any remaining watch for this record. /// Returns Ok(false) if the entire watch has been cancelled. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn cancel_dht_watch( &self, key: TypedKey, subkeys: ValueSubkeyRangeSet, ) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::cancel_dht_watch(self: {:?}, key: {:?}, subkeys: {:?}", self, key, subkeys); Crypto::validate_crypto_kind(key.kind)?; @@ -484,14 +491,14 @@ impl RoutingContext { /// Useful for determine which subkeys would change with an SetValue operation. /// /// Returns a DHTRecordReport with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range. - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn inspect_dht_record( &self, key: TypedKey, subkeys: ValueSubkeyRangeSet, scope: DHTReportScope, ) -> VeilidAPIResult { - event!(target: "veilid_api", Level::DEBUG, + veilid_log!(self debug "RoutingContext::inspect_dht_record(self: {:?}, key: {:?}, subkeys: {:?}, scope: {:?})", self, key, subkeys, scope); Crypto::validate_crypto_kind(key.kind)?; @@ -504,13 +511,13 @@ impl RoutingContext { /// Block Store #[cfg(feature = "unstable-blockstore")] - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn find_block(&self, _block_id: PublicKey) -> VeilidAPIResult> { panic!("unimplemented"); } #[cfg(feature = "unstable-blockstore")] - #[instrument(target = "veilid_api", level = "debug", ret, err)] + #[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)] pub async fn supply_block(&self, _block_id: PublicKey) -> VeilidAPIResult { panic!("unimplemented"); } diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 7cd46c62..7d9a2eae 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -824,6 +824,8 @@ async def sync_win( win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1)) win.refresh() + time.sleep(.5) + curses.endwin() From 3a8150c0627cbf5ca240762770db8d4bb18c6b87 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Tue, 11 Mar 2025 21:39:56 -0400 Subject: [PATCH 3/3] [ci skip] fix wasm unit tests --- Cargo.lock | 15 +------ veilid-core/Cargo.toml | 2 +- .../record_store/inspect_cache.rs | 4 +- .../src/table_store/tests/test_table_store.rs | 45 +++++++++++++++++++ veilid-core/tests/web.rs | 4 +- veilid-tools/Cargo.toml | 2 +- veilid-tools/tests/web.rs | 4 +- 7 files changed, 55 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f1b9df07..a72c3ffe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6090,17 +6090,6 @@ dependencies = [ "tracing-log 0.2.0", ] -[[package]] -name = "tracing-wasm" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4575c663a174420fa2d78f4108ff68f65bf2fbb7dd89f33749b6e826b3626e07" -dependencies = [ - "tracing", - "tracing-subscriber", - "wasm-bindgen", -] - [[package]] name = "triomphe" version = "0.1.14" @@ -6483,12 +6472,12 @@ dependencies = [ "tracing-error", "tracing-oslog", "tracing-subscriber", - "tracing-wasm", "tsify", "veilid-bugsalot", "veilid-hashlink", "veilid-igd", "veilid-tools", + "veilid-tracing-wasm", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", @@ -6680,9 +6669,9 @@ dependencies = [ "tracing", "tracing-oslog", "tracing-subscriber", - "tracing-wasm", "validator", "veilid-bugsalot", + "veilid-tracing-wasm", "wasm-bindgen", "wasm-bindgen-futures", "wasm-bindgen-test", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index df969792..e37c70dd 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -216,7 +216,6 @@ ws_stream_wasm = "0.7.4" # Logging wasm-logger = "0.2.0" -tracing-wasm = "0.2.1" # Data Structures keyvaluedb-web = "0.1.2" @@ -272,6 +271,7 @@ serial_test = { version = "2.0.0", default-features = false, features = [ wasm-bindgen-test = "0.3.50" console_error_panic_hook = "0.1.7" wasm-logger = "0.2.0" +veilid-tracing-wasm = "^0" ### BUILD OPTIONS diff --git a/veilid-core/src/storage_manager/record_store/inspect_cache.rs b/veilid-core/src/storage_manager/record_store/inspect_cache.rs index 12e2136e..01e28f5f 100644 --- a/veilid-core/src/storage_manager/record_store/inspect_cache.rs +++ b/veilid-core/src/storage_manager/record_store/inspect_cache.rs @@ -68,9 +68,9 @@ impl InspectCache { }; if idx < entry.1.seqs.len() { entry.1.seqs[idx] = seq; - } else if idx > entry.1.seqs.len() { + } else { panic!( - "representational error in l2 inspect cache: {} > {}", + "representational error in l2 inspect cache: {} >= {}", idx, entry.1.seqs.len() ) diff --git a/veilid-core/src/table_store/tests/test_table_store.rs b/veilid-core/src/table_store/tests/test_table_store.rs index edf72ae0..207b9e48 100644 --- a/veilid-core/src/table_store/tests/test_table_store.rs +++ b/veilid-core/src/table_store/tests/test_table_store.rs @@ -1,5 +1,6 @@ use crate::tests::test_veilid_config::*; use crate::*; +use futures_util::StreamExt as _; async fn startup() -> VeilidAPI { trace!("test_table_store: starting"); @@ -266,11 +267,55 @@ pub async fn test_protect_unprotect(vcrypto: &AsyncCryptoSystemGuard<'_>, ts: &T } } +pub async fn test_store_load_json_many(ts: &TableStore) { + trace!("test_json"); + + let _ = ts.delete("test").await; + let db = ts.open("test", 3).await.expect("should have opened"); + + let rows = 16; + let valuesize = 32768; + let parallel = 10; + + let value = vec!["ABCD".to_string(); valuesize]; + + let mut unord = FuturesUnordered::new(); + + let mut r = 0; + let start_ts = Timestamp::now(); + loop { + while r < rows && unord.len() < parallel { + let key = format!("key_{}", r); + r += 1; + + unord.push(Box::pin(async { + let key = key; + db.store_json(0, key.as_bytes(), &value) + .await + .expect("should store"); + let value2 = db + .load_json::>(0, key.as_bytes()) + .await + .expect("should load") + .expect("should exist"); + assert_eq!(value, value2); + })); + } + if unord.next().await.is_none() { + break; + } + } + let end_ts = Timestamp::now(); + trace!("test_store_load_json_many duration={}", (end_ts - start_ts)); +} + pub async fn test_all() { let api = startup().await; let crypto = api.crypto().unwrap(); let ts = api.table_store().unwrap(); + test_store_load_json_many(&ts).await; + for ck in VALID_CRYPTO_KINDS { let vcrypto = crypto.get_async(ck).unwrap(); test_protect_unprotect(&vcrypto, &ts).await; diff --git a/veilid-core/tests/web.rs b/veilid-core/tests/web.rs index 2bbfbac5..ff142536 100644 --- a/veilid-core/tests/web.rs +++ b/veilid-core/tests/web.rs @@ -18,8 +18,8 @@ pub fn setup() -> () { let config = veilid_tracing_wasm::WASMLayerConfig::new() .with_report_logs_in_timings(false) .with_max_level(Level::TRACE) - .with_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor); - tracing_wasm::set_as_global_default_with_config(config); + .with_console_config(veilid_tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor); + veilid_tracing_wasm::set_as_global_default_with_config(config); }); } diff --git a/veilid-tools/Cargo.toml b/veilid-tools/Cargo.toml index 60a4fde7..68c6d5cb 100644 --- a/veilid-tools/Cargo.toml +++ b/veilid-tools/Cargo.toml @@ -185,7 +185,7 @@ serial_test = { version = "2.0.0", default-features = false, features = [ console_error_panic_hook = "0.1.7" wasm-bindgen-test = "0.3.50" wasm-logger = "0.2.0" -tracing-wasm = { version = "0.2.1" } +veilid-tracing-wasm = "^0" ### BUILD OPTIONS diff --git a/veilid-tools/tests/web.rs b/veilid-tools/tests/web.rs index f54309fd..0ff48b5b 100644 --- a/veilid-tools/tests/web.rs +++ b/veilid-tools/tests/web.rs @@ -18,8 +18,8 @@ pub fn setup() -> () { let config = veilid_tracing_wasm::WASMLayerConfig::new() .with_report_logs_in_timings(false); .with_max_level(Level::TRACE); - .with_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor); - tracing_wasm::set_as_global_default_with_config(config); + .with_console_config(veilid_tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor); + veilid_tracing_wasm::set_as_global_default_with_config(config); } else { wasm_logger::init(wasm_logger::Config::default()); }