Merge branch 'dht-performance' into 'main'

validate types for python api calls

See merge request veilid/veilid!366
This commit is contained in:
Christien Rioux 2025-03-13 10:43:10 -04:00
commit cf6dfe375d
14 changed files with 631 additions and 170 deletions

15
Cargo.lock generated
View File

@ -6090,17 +6090,6 @@ dependencies = [
"tracing-log 0.2.0",
]
[[package]]
name = "tracing-wasm"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4575c663a174420fa2d78f4108ff68f65bf2fbb7dd89f33749b6e826b3626e07"
dependencies = [
"tracing",
"tracing-subscriber",
"wasm-bindgen",
]
[[package]]
name = "triomphe"
version = "0.1.14"
@ -6483,12 +6472,12 @@ dependencies = [
"tracing-error",
"tracing-oslog",
"tracing-subscriber",
"tracing-wasm",
"tsify",
"veilid-bugsalot",
"veilid-hashlink",
"veilid-igd",
"veilid-tools",
"veilid-tracing-wasm",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test",
@ -6680,9 +6669,9 @@ dependencies = [
"tracing",
"tracing-oslog",
"tracing-subscriber",
"tracing-wasm",
"validator",
"veilid-bugsalot",
"veilid-tracing-wasm",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-bindgen-test",

View File

@ -216,7 +216,6 @@ ws_stream_wasm = "0.7.4"
# Logging
wasm-logger = "0.2.0"
tracing-wasm = "0.2.1"
# Data Structures
keyvaluedb-web = "0.1.2"
@ -272,6 +271,7 @@ serial_test = { version = "2.0.0", default-features = false, features = [
wasm-bindgen-test = "0.3.50"
console_error_panic_hook = "0.1.7"
wasm-logger = "0.2.0"
veilid-tracing-wasm = "^0"
### BUILD OPTIONS

View File

@ -193,16 +193,13 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
attrs.record(&mut new_debug_record);
if let Some(span_ref) = ctx.span(id) {
span_ref
.extensions_mut()
.insert::<VeilidKeyedStringRecorder>(new_debug_record);
let mut extensions_mut = span_ref.extensions_mut();
extensions_mut.insert::<VeilidKeyedStringRecorder>(new_debug_record);
if crate::DURATION_LOG_FACILITIES.contains(&attrs.metadata().target()) {
span_ref
.extensions_mut()
.insert::<SpanDuration>(SpanDuration {
start: Timestamp::now(),
end: Timestamp::default(),
});
extensions_mut.insert::<SpanDuration>(SpanDuration {
start: Timestamp::now(),
end: Timestamp::default(),
});
}
}
}
@ -213,14 +210,14 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
return;
}
if let Some(span_ref) = ctx.span(&id) {
if let Some(span_duration) = span_ref.extensions_mut().get_mut::<SpanDuration>() {
let mut extensions_mut = span_ref.extensions_mut();
if let Some(span_duration) = extensions_mut.get_mut::<SpanDuration>() {
span_duration.end = Timestamp::now();
let duration = span_duration.end.saturating_sub(span_duration.start);
let meta = span_ref.metadata();
let mut extensions = span_ref.extensions_mut();
let log_key =
if let Some(span_ksr) = extensions.get_mut::<VeilidKeyedStringRecorder>() {
if let Some(span_ksr) = extensions_mut.get_mut::<VeilidKeyedStringRecorder>() {
span_ksr.log_key()
} else {
""
@ -254,10 +251,9 @@ impl<S: Subscriber + for<'a> registry::LookupSpan<'a>> Layer<S> for ApiTracingLa
return;
}
if let Some(span_ref) = ctx.span(id) {
if let Some(debug_record) = span_ref
.extensions_mut()
.get_mut::<VeilidKeyedStringRecorder>()
{
let mut extensions_mut = span_ref.extensions_mut();
if let Some(debug_record) = extensions_mut.get_mut::<VeilidKeyedStringRecorder>() {
values.record(debug_record);
}
}

View File

@ -59,6 +59,8 @@ struct StorageManagerInner {
pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
/// Record subkeys that have not been pushed to the network because they were written to offline
pub offline_subkey_writes: HashMap<TypedKey, tasks::offline_subkey_writes::OfflineSubkeyWrite>,
/// Record subkeys that are currently being written to in the foreground
pub active_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
/// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
@ -73,6 +75,7 @@ impl fmt::Debug for StorageManagerInner {
.field("local_record_store", &self.local_record_store)
.field("remote_record_store", &self.remote_record_store)
.field("offline_subkey_writes", &self.offline_subkey_writes)
.field("active_subkey_writes", &self.active_subkey_writes)
//.field("metadata_db", &self.metadata_db)
//.field("tick_future", &self.tick_future)
.finish()
@ -736,7 +739,21 @@ impl StorageManager {
)
.await?;
if !self.dht_is_online() {
// Note that we are writing this subkey actively
// If it appears we are already doing this, then put it to the offline queue
let already_writing = {
let asw = inner.active_subkey_writes.entry(key).or_default();
if asw.contains(subkey) {
veilid_log!(self debug "Already writing to this subkey: {}:{}", key, subkey);
true
} else {
// Add to our list of active subkey writes
asw.insert(subkey);
false
}
};
if already_writing || !self.dht_is_online() {
veilid_log!(self debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush
Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection);
@ -764,41 +781,68 @@ impl StorageManager {
// Failed to write, try again later
let mut inner = self.inner.lock().await;
Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection);
// Remove from active subkey writes
let asw = inner.active_subkey_writes.get_mut(&key).unwrap();
if !asw.remove(subkey) {
panic!("missing active subkey write: {}:{}", key, subkey);
}
if asw.is_empty() {
inner.active_subkey_writes.remove(&key);
}
return Err(e);
}
};
// Wait for the first result
let Ok(result) = res_rx.recv_async().await else {
apibail_internal!("failed to receive results");
let process = || async {
// Wait for the first result
let Ok(result) = res_rx.recv_async().await else {
apibail_internal!("failed to receive results");
};
let result = result?;
let partial = result.fanout_result.kind.is_partial();
// Process the returned result
let out = self
.process_outbound_set_value_result(
key,
subkey,
signed_value_data.value_data().clone(),
safety_selection,
result,
)
.await?;
// If there's more to process, do it in the background
if partial {
self.process_deferred_outbound_set_value_result(
res_rx,
key,
subkey,
out.clone()
.unwrap_or_else(|| signed_value_data.value_data().clone()),
safety_selection,
);
}
Ok(out)
};
let result = result?;
let partial = result.fanout_result.kind.is_partial();
// Process the returned result
let out = self
.process_outbound_set_value_result(
key,
subkey,
signed_value_data.value_data().clone(),
safety_selection,
result,
)
.await?;
let out = process().await;
// If there's more to process, do it in the background
if partial {
self.process_deferred_outbound_set_value_result(
res_rx,
key,
subkey,
out.clone()
.unwrap_or_else(|| signed_value_data.value_data().clone()),
safety_selection,
);
// Remove active subkey write
let mut inner = self.inner.lock().await;
// Remove from active subkey writes
let asw = inner.active_subkey_writes.get_mut(&key).unwrap();
if !asw.remove(subkey) {
panic!("missing active subkey write: {}:{}", key, subkey);
}
if asw.is_empty() {
inner.active_subkey_writes.remove(&key);
}
Ok(out)
out
}
/// Create,update or cancel an outbound watch to a DHT value
@ -1019,11 +1063,18 @@ impl StorageManager {
);
// Get the offline subkeys for this record still only returning the ones we're inspecting
// Merge in the currently offline in-flight records and the actively written records as well
let active_subkey_writes = inner
.active_subkey_writes
.get(&key)
.cloned()
.unwrap_or_default();
let offline_subkey_writes = inner
.offline_subkey_writes
.get(&key)
.map(|o| o.subkeys.union(&o.subkeys_in_flight))
.unwrap_or_default()
.union(&active_subkey_writes)
.intersect(&subkeys);
// If this is the maximum scope we're interested in, return the report

View File

@ -68,9 +68,9 @@ impl InspectCache {
};
if idx < entry.1.seqs.len() {
entry.1.seqs[idx] = seq;
} else if idx > entry.1.seqs.len() {
} else {
panic!(
"representational error in l2 inspect cache: {} > {}",
"representational error in l2 inspect cache: {} >= {}",
idx,
entry.1.seqs.len()
)

View File

@ -1,5 +1,6 @@
use crate::tests::test_veilid_config::*;
use crate::*;
use futures_util::StreamExt as _;
async fn startup() -> VeilidAPI {
trace!("test_table_store: starting");
@ -266,11 +267,55 @@ pub async fn test_protect_unprotect(vcrypto: &AsyncCryptoSystemGuard<'_>, ts: &T
}
}
/// Stress-test storing and loading JSON values through the table store.
///
/// Writes `rows` keys, each holding a large `Vec<String>` value, keeping up to
/// `parallel` store/load round-trips in flight at once, and asserts that every
/// value read back equals what was written. Logs the total wall-clock duration
/// at trace level.
pub async fn test_store_load_json_many(ts: &TableStore) {
    trace!("test_json");
    // Start from a clean slate; ignore the error if the table didn't exist yet.
    let _ = ts.delete("test").await;
    let db = ts.open("test", 3).await.expect("should have opened");
    let rows = 16; // number of distinct keys to write
    let valuesize = 32768; // strings per value ("ABCD" x 32768, ~128 KiB of payload)
    let parallel = 10; // maximum concurrent round-trips in flight
    let value = vec!["ABCD".to_string(); valuesize];
    let mut unord = FuturesUnordered::new();
    let mut r = 0;
    let start_ts = Timestamp::now();
    loop {
        // Keep the in-flight set topped up to `parallel` futures until all
        // `rows` keys have been queued. The futures borrow `db` and `value`,
        // which outlive `unord` in this scope.
        while r < rows && unord.len() < parallel {
            let key = format!("key_{}", r);
            r += 1;
            unord.push(Box::pin(async {
                let key = key; // move the key into the future
                db.store_json(0, key.as_bytes(), &value)
                    .await
                    .expect("should store");
                let value2 = db
                    .load_json::<Vec<String>>(0, key.as_bytes())
                    .await
                    .expect("should load")
                    .expect("should exist");
                // Round-trip must be lossless.
                assert_eq!(value, value2);
            }));
        }
        // Drain one completed future; once the set is empty we are done.
        if unord.next().await.is_none() {
            break;
        }
    }
    let end_ts = Timestamp::now();
    trace!("test_store_load_json_many duration={}", (end_ts - start_ts));
}
pub async fn test_all() {
let api = startup().await;
let crypto = api.crypto().unwrap();
let ts = api.table_store().unwrap();
test_store_load_json_many(&ts).await;
for ck in VALID_CRYPTO_KINDS {
let vcrypto = crypto.get_async(ck).unwrap();
test_protect_unprotect(&vcrypto, &ts).await;

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("veilid_api");
/////////////////////////////////////////////////////////////////////////////////////////////////////
pub(super) struct VeilidAPIInner {
@ -41,10 +43,9 @@ pub struct VeilidAPI {
}
impl VeilidAPI {
#[instrument(target = "veilid_api", level = "debug", skip_all)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = context.log_key()), skip_all)]
pub(crate) fn new(context: VeilidCoreContext) -> Self {
event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::new()");
veilid_log!(context debug "VeilidAPI::new()");
Self {
inner: Arc::new(Mutex::new(VeilidAPIInner {
context: Some(context),
@ -59,10 +60,9 @@ impl VeilidAPI {
}
/// Shut down Veilid and terminate the API.
#[instrument(target = "veilid_api", level = "debug", skip_all)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all)]
pub async fn shutdown(self) {
event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::shutdown()");
veilid_log!(self debug "VeilidAPI::shutdown()");
let context = { self.inner.lock().context.take() };
if let Some(context) = context {
api_shutdown(context).await;
@ -152,6 +152,15 @@ impl VeilidAPI {
callback(&mut inner.debug_cache)
}
#[must_use]
pub(crate) fn log_key(&self) -> &str {
let inner = self.inner.lock();
let Some(context) = &inner.context else {
return "";
};
context.log_key()
}
////////////////////////////////////////////////////////////////
// Attach/Detach
@ -174,9 +183,9 @@ impl VeilidAPI {
}
/// Connect to the network.
#[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all, ret, err)]
pub async fn attach(&self) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::attach()");
let attachment_manager = self.core_context()?.attachment_manager();
@ -187,9 +196,9 @@ impl VeilidAPI {
}
/// Disconnect from the network.
#[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all, ret, err)]
pub async fn detach(&self) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::detach()");
let attachment_manager = self.core_context()?.attachment_manager();
@ -203,9 +212,9 @@ impl VeilidAPI {
// Routing Context
/// Get a new `RoutingContext` object to use to send messages over the Veilid network with default safety, sequencing, and stability parameters.
#[instrument(target = "veilid_api", level = "debug", skip_all, err, ret)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip_all, err, ret)]
pub fn routing_context(&self) -> VeilidAPIResult<RoutingContext> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::routing_context()");
RoutingContext::try_new(self.clone())
@ -218,11 +227,11 @@ impl VeilidAPI {
/// `VLD0:XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` but if the prefix is left off
/// `XmnGyJrjMJBRC5ayJZRPXWTBspdX36-pbLb98H3UMeE` will be parsed with the 'best' cryptosystem
/// available (at the time of this writing this is `VLD0`).
#[instrument(target = "veilid_api", level = "debug", skip(self), fields(s=s.to_string()), ret, err)]
#[instrument(target = "veilid_api", level = "debug", skip(self), fields(__VEILID_LOG_KEY = self.log_key(), s=s.to_string()), ret, err)]
pub fn parse_as_target<S: ToString>(&self, s: S) -> VeilidAPIResult<Target> {
let s = s.to_string();
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::parse_as_target(s: {:?})", s);
// Is this a route id?
@ -272,14 +281,14 @@ impl VeilidAPI {
///
/// Returns a route id and 'blob' that can be published over some means (DHT or otherwise) to be
/// imported by another Veilid node.
#[instrument(target = "veilid_api", level = "debug", skip(self), ret)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret)]
pub async fn new_custom_private_route(
&self,
crypto_kinds: &[CryptoKind],
stability: Stability,
sequencing: Sequencing,
) -> VeilidAPIResult<(RouteId, Vec<u8>)> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::new_custom_private_route(crypto_kinds: {:?}, stability: {:?}, sequencing: {:?})",
crypto_kinds,
stability,
@ -336,9 +345,9 @@ impl VeilidAPI {
/// Import a private route blob as a remote private route.
///
/// Returns a route id that can be used to send private messages to the node creating this route.
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)]
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::import_remote_private_route(blob: {:?})", blob);
let routing_table = self.core_context()?.routing_table();
let rss = routing_table.route_spec_store();
@ -349,9 +358,9 @@ impl VeilidAPI {
///
/// This will deactivate the route and free its resources and it can no longer be sent to
/// or received from.
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)]
pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::release_private_route(route_id: {:?})", route_id);
let routing_table = self.core_context()?.routing_table();
let rss = routing_table.route_spec_store();
@ -368,13 +377,13 @@ impl VeilidAPI {
///
/// * `call_id` - specifies which call to reply to, and it comes from a [VeilidUpdate::AppCall], specifically the [VeilidAppCall::id()] value.
/// * `message` - is an answer blob to be returned by the remote node's [RoutingContext::app_call()] function, and may be up to 32768 bytes.
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)]
pub async fn app_call_reply(
&self,
call_id: OperationId,
message: Vec<u8>,
) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message);
let rpc_processor = self.core_context()?.rpc_processor();
@ -387,7 +396,7 @@ impl VeilidAPI {
// Tunnel Building
#[cfg(feature = "unstable-tunnels")]
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)]
pub async fn start_tunnel(
&self,
_endpoint_mode: TunnelMode,
@ -397,7 +406,7 @@ impl VeilidAPI {
}
#[cfg(feature = "unstable-tunnels")]
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)]
pub async fn complete_tunnel(
&self,
_endpoint_mode: TunnelMode,
@ -408,7 +417,7 @@ impl VeilidAPI {
}
#[cfg(feature = "unstable-tunnels")]
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), skip(self), ret, err)]
pub async fn cancel_tunnel(&self, _tunnel_id: TunnelId) -> VeilidAPIResult<bool> {
panic!("unimplemented");
}

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("veilid_api");
///////////////////////////////////////////////////////////////////////////////////////
/// Valid destinations for a message sent over a routing context.
@ -62,6 +64,11 @@ impl RoutingContext {
})
}
#[must_use]
pub(crate) fn log_key(&self) -> &str {
self.api.log_key()
}
/// Turn on sender privacy, enabling the use of safety routes. This is the default and
/// calling this function is only necessary if you have previously disable safety or used other parameters.
///
@ -72,9 +79,9 @@ impl RoutingContext {
/// * Sequencing default is to prefer ordered before unordered message delivery.
///
/// To customize the safety selection in use, use [RoutingContext::with_safety()].
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub fn with_default_safety(self) -> VeilidAPIResult<Self> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::with_default_safety(self: {:?})", self);
let config = self.api.config()?;
@ -89,9 +96,9 @@ impl RoutingContext {
}
/// Use a custom [SafetySelection]. Can be used to disable safety via [SafetySelection::Unsafe].
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub fn with_safety(self, safety_selection: SafetySelection) -> VeilidAPIResult<Self> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::with_safety(self: {:?}, safety_selection: {:?})", self, safety_selection);
Ok(Self {
@ -101,9 +108,9 @@ impl RoutingContext {
}
/// Use a specified [Sequencing] preference, with or without privacy.
#[instrument(target = "veilid_api", level = "debug", ret)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret)]
pub fn with_sequencing(self, sequencing: Sequencing) -> Self {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::with_sequencing(self: {:?}, sequencing: {:?})", self, sequencing);
Self {
@ -140,9 +147,9 @@ impl RoutingContext {
self.api.clone()
}
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
async fn get_destination(&self, target: Target) -> VeilidAPIResult<rpc_processor::Destination> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::get_destination(self: {:?}, target: {:?})", self, target);
let rpc_processor = self.api.core_context()?.rpc_processor();
@ -165,9 +172,9 @@ impl RoutingContext {
/// * `message` - an arbitrary message blob of up to 32768 bytes.
///
/// Returns an answer blob of up to 32768 bytes.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn app_call(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<Vec<u8>> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::app_call(self: {:?}, target: {:?}, message: {:?})", self, target, message);
let rpc_processor = self.api.core_context()?.rpc_processor();
@ -199,9 +206,9 @@ impl RoutingContext {
///
/// * `target` - can be either a direct node id or a private route.
/// * `message` - an arbitrary message blob of up to 32768 bytes.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn app_message(&self, target: Target, message: Vec<u8>) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::app_message(self: {:?}, target: {:?}, message: {:?})", self, target, message);
let rpc_processor = self.api.core_context()?.rpc_processor();
@ -230,14 +237,14 @@ impl RoutingContext {
/// DHT Records
/// Deterministically builds the record key for a given schema and owner public key
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub fn get_dht_record_key(
&self,
schema: DHTSchema,
owner_key: &PublicKey,
kind: Option<CryptoKind>,
) -> VeilidAPIResult<TypedKey> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::get_dht_record_key(self: {:?}, schema: {:?}, owner_key: {:?}, kind: {:?})", self, schema, owner_key, kind);
schema.validate()?;
@ -256,14 +263,14 @@ impl RoutingContext {
/// Returns the newly allocated DHT record's key if successful.
///
/// Note: if you pass in an owner keypair this call is deterministic! This means that if you try to create a new record for a given owner and schema that already exists it *will* fail.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn create_dht_record(
&self,
schema: DHTSchema,
owner: Option<KeyPair>,
kind: Option<CryptoKind>,
) -> VeilidAPIResult<DHTRecordDescriptor> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::create_dht_record(self: {:?}, schema: {:?}, owner: {:?}, kind: {:?})", self, schema, owner, kind);
schema.validate()?;
@ -291,13 +298,13 @@ impl RoutingContext {
/// safety selection.
///
/// Returns the DHT record descriptor for the opened record if successful.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn open_dht_record(
&self,
key: TypedKey,
default_writer: Option<KeyPair>,
) -> VeilidAPIResult<DHTRecordDescriptor> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::open_dht_record(self: {:?}, key: {:?}, default_writer: {:?})", self, key, default_writer);
Crypto::validate_crypto_kind(key.kind)?;
@ -311,9 +318,9 @@ impl RoutingContext {
/// Closes a DHT record at a specific key that was opened with create_dht_record or open_dht_record.
///
/// Closing a record allows you to re-open it with a different routing context.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn close_dht_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::close_dht_record(self: {:?}, key: {:?})", self, key);
Crypto::validate_crypto_kind(key.kind)?;
@ -327,9 +334,9 @@ impl RoutingContext {
/// If the record is opened, it must be closed before it is deleted.
/// Deleting a record does not delete it from the network, but will remove the storage of the record
/// locally, and will prevent its value from being refreshed on the network by this node.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn delete_dht_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::delete_dht_record(self: {:?}, key: {:?})", self, key);
Crypto::validate_crypto_kind(key.kind)?;
@ -344,14 +351,14 @@ impl RoutingContext {
///
/// Returns `None` if the value subkey has not yet been set.
/// Returns `Some(data)` if the value subkey has valid data.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn get_dht_value(
&self,
key: TypedKey,
subkey: ValueSubkey,
force_refresh: bool,
) -> VeilidAPIResult<Option<ValueData>> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::get_dht_value(self: {:?}, key: {:?}, subkey: {:?}, force_refresh: {:?})", self, key, subkey, force_refresh);
Crypto::validate_crypto_kind(key.kind)?;
@ -367,7 +374,7 @@ impl RoutingContext {
///
/// Returns `None` if the value was successfully put.
/// Returns `Some(data)` if the value put was older than the one available on the network.
#[instrument(target = "veilid_api", level = "debug", skip(data), fields(data = print_data(&data, Some(64))), ret, err)]
#[instrument(target = "veilid_api", level = "debug", skip(data), fields(__VEILID_LOG_KEY = self.log_key(), data = print_data(&data, Some(64))), ret, err)]
pub async fn set_dht_value(
&self,
key: TypedKey,
@ -375,7 +382,7 @@ impl RoutingContext {
data: Vec<u8>,
writer: Option<KeyPair>,
) -> VeilidAPIResult<Option<ValueData>> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::set_dht_value(self: {:?}, key: {:?}, subkey: {:?}, data: len={}, writer: {:?})", self, key, subkey, data.len(), writer);
Crypto::validate_crypto_kind(key.kind)?;
@ -404,7 +411,7 @@ impl RoutingContext {
/// * If a member (either the owner or a SMPL schema member) has opened the key for writing (even if no writing is performed) then the watch will be signed and guaranteed network.dht.member_watch_limit per writer.
///
/// Members can be specified via the SMPL schema and do not need to allocate writable subkeys in order to offer a member watch capability.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn watch_dht_values(
&self,
key: TypedKey,
@ -412,7 +419,7 @@ impl RoutingContext {
expiration: Timestamp,
count: u32,
) -> VeilidAPIResult<Timestamp> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::watch_dht_values(self: {:?}, key: {:?}, subkeys: {:?}, expiration: {}, count: {})", self, key, subkeys, expiration, count);
Crypto::validate_crypto_kind(key.kind)?;
@ -430,13 +437,13 @@ impl RoutingContext {
///
/// Returns Ok(true) if there is any remaining watch for this record.
/// Returns Ok(false) if the entire watch has been cancelled.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn cancel_dht_watch(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
) -> VeilidAPIResult<bool> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::cancel_dht_watch(self: {:?}, key: {:?}, subkeys: {:?}", self, key, subkeys);
Crypto::validate_crypto_kind(key.kind)?;
@ -484,14 +491,14 @@ impl RoutingContext {
/// Useful for determine which subkeys would change with an SetValue operation.
///
/// Returns a DHTRecordReport with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn inspect_dht_record(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
scope: DHTReportScope,
) -> VeilidAPIResult<DHTRecordReport> {
event!(target: "veilid_api", Level::DEBUG,
veilid_log!(self debug
"RoutingContext::inspect_dht_record(self: {:?}, key: {:?}, subkeys: {:?}, scope: {:?})", self, key, subkeys, scope);
Crypto::validate_crypto_kind(key.kind)?;
@ -504,13 +511,13 @@ impl RoutingContext {
/// Block Store
#[cfg(feature = "unstable-blockstore")]
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn find_block(&self, _block_id: PublicKey) -> VeilidAPIResult<Vec<u8>> {
panic!("unimplemented");
}
#[cfg(feature = "unstable-blockstore")]
#[instrument(target = "veilid_api", level = "debug", ret, err)]
#[instrument(target = "veilid_api", level = "debug", fields(__VEILID_LOG_KEY = self.log_key()), ret, err)]
pub async fn supply_block(&self, _block_id: PublicKey) -> VeilidAPIResult<bool> {
panic!("unimplemented");
}

View File

@ -18,8 +18,8 @@ pub fn setup() -> () {
let config = veilid_tracing_wasm::WASMLayerConfig::new()
.with_report_logs_in_timings(false)
.with_max_level(Level::TRACE)
.with_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(config);
.with_console_config(veilid_tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
veilid_tracing_wasm::set_as_global_default_with_config(config);
});
}

View File

@ -1,6 +1,6 @@
# Routing context veilid tests
from typing import Awaitable, Callable
from typing import Any, Awaitable, Callable, Optional
import pytest
import asyncio
import time
@ -374,13 +374,13 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
rr = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.LOCAL)
print("rr: {}", rr.__dict__)
assert rr.subkeys == [[0,1]]
assert rr.subkeys == [(0,1)]
assert rr.local_seqs == [0, 0xFFFFFFFF]
assert rr.network_seqs == []
rr2 = await rc.inspect_dht_record(rec.key, [], veilid.DHTReportScope.SYNC_GET)
print("rr2: {}", rr2.__dict__)
assert rr2.subkeys == [[0,1]]
assert rr2.subkeys == [(0,1)]
assert rr2.local_seqs == [0, 0xFFFFFFFF]
assert rr2.network_seqs == [0, 0xFFFFFFFF]
@ -390,42 +390,28 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI):
async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]]], count: int, test_data: bytes, ):
async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record: Callable[[veilid.RoutingContext, int], Awaitable[tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]]], count: int, test_data: bytes):
rc = await api_connection.new_routing_context()
async with rc:
(key, owner, secret) = await open_record(rc, count)
print(f'{key} {owner}:{secret}')
(desc, writer) = await open_record(rc, count)
print(f'{desc.key} {writer}')
# write dht records on server 0
records = []
print(f'writing {count} subkeys')
for n in range(count):
await rc.set_dht_value(key, ValueSubkey(n), test_data)
await rc.set_dht_value(desc.key, ValueSubkey(n), test_data)
print(f' {n}')
print('syncing records to the network')
await sync(rc, [desc])
while True:
donerecords = set()
subkeysleft = 0
rr = await rc.inspect_dht_record(key, [])
left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys]
if left == 0:
break
print(f' {left} subkeys left')
time.sleep(1)
await rc.close_dht_record(key)
await api_connection.debug("record purge local")
await api_connection.debug("record purge remote")
await rc.close_dht_record(desc.key)
# read dht records on server 0
print(f'reading {count} subkeys')
desc1 = await rc.open_dht_record(key)
desc1 = await rc.open_dht_record(desc.key)
for n in range(count):
vd0 = await rc.get_dht_value(key, ValueSubkey(n), force_refresh=True)
vd0 = await rc.get_dht_value(desc1.key, ValueSubkey(n))
assert vd0.data == test_data
print(f' {n}')
@ -433,10 +419,10 @@ async def _run_test_schema_limit(api_connection: veilid.VeilidAPI, open_record:
@pytest.mark.asyncio
async def test_schema_limit_dflt(api_connection: veilid.VeilidAPI):
async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.TypedKey, veilid.PublicKey, veilid.SecretKey]:
async def open_record(rc: veilid.RoutingContext, count: int) -> tuple[veilid.DHTRecordDescriptor, Optional[veilid.KeyPair]]:
schema = veilid.DHTSchema.dflt(count)
desc = await rc.create_dht_record(schema)
return (desc.key, desc.owner, desc.owner_secret)
return (desc, desc.owner_key_pair())
print("Test with maximum number of subkeys before lower limit hit")
@ -474,7 +460,7 @@ async def test_schema_limit_smpl(api_connection: veilid.VeilidAPI):
desc = await rc.create_dht_record(schema)
await rc.open_dht_record(desc.key, writer_keypair)
return (desc.key, writer_keypair.key(), writer_keypair.secret())
return (desc, writer_keypair)
print("Test with maximum number of subkeys before lower limit hit")
TEST_DATA = b"A" * 32768
@ -545,18 +531,7 @@ async def test_dht_integration_writer_reader():
await rc0.set_dht_value(desc.key, ValueSubkey(0), TEST_DATA)
print('syncing records to the network')
recleft = len(records)
for desc in records:
while True:
rr = await rc0.inspect_dht_record(desc.key, [])
left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys]
if left == 0:
await rc0.close_dht_record(desc.key)
break
print(f' {recleft} records {left} subkeys left')
time.sleep(0.1)
recleft-=1
await sync(rc0, records)
# read dht records on server 1
print(f'reading {COUNT} records')
@ -636,6 +611,96 @@ async def test_dht_write_read_local():
print(f' {n}')
n += 1
@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time")
@pytest.mark.asyncio
async def test_dht_write_read_full_subkeys_local():
async def null_update_callback(update: veilid.VeilidUpdate):
pass
try:
api0 = await veilid.api_connector(null_update_callback, 0)
except veilid.VeilidConnectionError:
pytest.skip("Unable to connect to veilid-server 0.")
async with api0:
# purge local and remote record stores to ensure we start fresh
await api0.debug("record purge local")
await api0.debug("record purge remote")
# make routing contexts
rc0 = await api0.new_routing_context()
async with rc0:
# Number of records
COUNT = 8
# Number of subkeys per record
SUBKEY_COUNT = 32
# Nonce to encrypt test data
NONCE = veilid.Nonce.from_bytes(b"A"*24)
# Secret to encrypt test data
SECRET = veilid.SharedSecret.from_bytes(b"A"*32)
# Max subkey size
MAX_SUBKEY_SIZE = min(32768, 1024*1024/SUBKEY_COUNT)
# MAX_SUBKEY_SIZE = 256
# write dht records on server 0
records = []
subkey_data_list = []
schema = veilid.DHTSchema.dflt(SUBKEY_COUNT)
print(f'writing {COUNT} records with full subkeys')
init_futures = set()
for n in range(COUNT):
# Make encrypted data that is consistent and hard to compress
subkey_data = bytes(chr(ord("A")+n)*MAX_SUBKEY_SIZE, 'ascii')
print(f"subkey_data({n}):len={len(subkey_data)}")
cs = await api0.best_crypto_system()
async with cs:
subkey_data = await cs.crypt_no_auth(subkey_data, NONCE, SECRET)
subkey_data_list.append(subkey_data)
desc = await rc0.create_dht_record(schema)
records.append(desc)
for i in range(SUBKEY_COUNT):
init_futures.add(rc0.set_dht_value(desc.key, ValueSubkey(i), subkey_data))
print(f' {n}: {desc.key} {desc.owner}:{desc.owner_secret}')
# Wait for all records to synchronize, with progress bars
await sync_win(rc0, records, SUBKEY_COUNT, init_futures)
for desc0 in records:
await rc0.close_dht_record(desc0.key)
await api0.debug("record purge local")
await api0.debug("record purge remote")
# read dht records on server 0
print(f'reading {COUNT} records')
for n, desc0 in enumerate(records):
desc1 = await rc0.open_dht_record(desc0.key)
for i in range(SUBKEY_COUNT):
vd0 = None
while vd0 == None:
vd0 = await rc0.get_dht_value(desc1.key, ValueSubkey(i), force_refresh=True)
if vd0 != None:
assert vd0.data == subkey_data_list[n]
break
time.sleep(1)
print(f"retrying record {n} subkey {i}")
await rc0.close_dht_record(desc1.key)
print(f' {n}')
async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescriptor]):
print('syncing records to the network')
syncrecords = records.copy()
@ -646,9 +711,121 @@ async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescript
rr = await rc.inspect_dht_record(desc.key, [])
left = 0; [left := left + (x[1]-x[0]+1) for x in rr.offline_subkeys]
if left == 0:
donerecords.add(desc)
if veilid.ValueSeqNum.NONE not in rr.local_seqs:
donerecords.add(desc)
else:
subkeysleft += left
syncrecords = [x for x in syncrecords if x not in donerecords]
print(f' {len(syncrecords)} records {subkeysleft} subkeys left')
time.sleep(1)
async def sync_win(
rc: veilid.RoutingContext,
records: list[veilid.DHTRecordDescriptor],
subkey_count: int,
init_futures: set[Awaitable[Any]]
):
import curses
screen = curses.initscr()
curses.start_color()
curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_BLUE)
curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses.init_pair(3, curses.COLOR_BLACK, curses.COLOR_YELLOW)
curses.init_pair(4, curses.COLOR_BLACK, curses.COLOR_GREEN)
HEIGHT=len(records) + 3
GRAPHWIDTH = subkey_count
WIDTH=GRAPHWIDTH + 4 + 1 + 43 + 2
cur_lines = curses.LINES
cur_cols = curses.COLS
win = curses.newwin(HEIGHT, WIDTH,
max(0, int(cur_lines/2) - int(HEIGHT/2)),
max(0, int(cur_cols/2) - int(WIDTH/2)))
win.clear()
win.border(0,0,0,0)
win.nodelay(True)
# Record inspection and completion state
# Records we are done inspecting and have finished sync
donerecords: set[veilid.TypedKey] = set()
# Records we are currently inspecting that are in the futures set
futurerecords: set[veilid.TypedKey] = set()
# All the futures we are waiting for
futures = set()
# The record report state
recordreports: dict[veilid.TypedKey, veilid.DHTRecordReport] = dict()
# Add initial futures with None key
for fut in init_futures:
async def _do_init_fut(fut):
return (None, await fut)
futures.add(asyncio.create_task(_do_init_fut(fut)))
# Loop until all records are completed
while len(donerecords) != len(records):
# Update the futures with inspects for unfinished records
for n, desc in enumerate(records):
if desc.key in donerecords or desc.key in futurerecords:
continue
async def _do_inspect(key: veilid.TypedKey):
return (key, await rc.inspect_dht_record(key, []))
futures.add(asyncio.create_task(_do_inspect(desc.key)))
futurerecords.add(desc.key)
# Wait for some futures to finish
done, futures = await asyncio.wait(futures, return_when = asyncio.FIRST_COMPLETED)
# Process finished futures into the state
for rr_fut in done:
key: veilid.TypedKey
rr: veilid.DHTRecordReport
key, rr = await rr_fut
if key is not None:
futurerecords.remove(key)
if len(rr.subkeys) == 1 and rr.subkeys[0] == (0, subkey_count-1) and veilid.ValueSeqNum.NONE not in rr.local_seqs and len(rr.offline_subkeys) == 0:
if key in recordreports:
del recordreports[key]
donerecords.add(key)
else:
recordreports[key] = rr
# Re-render the state
if cur_lines != curses.LINES or cur_cols != curses.COLS:
cur_lines = curses.LINES
cur_cols = curses.COLS
win.move(
max(0, int(cur_lines/2) - int(HEIGHT/2)),
max(0, int(cur_cols/2) - int(WIDTH/2)))
win.border(0,0,0,0)
win.addstr(1, 1, "syncing records to the network", curses.color_pair(0))
for n, rr in enumerate(records):
key = rr.key
win.addstr(n+2, GRAPHWIDTH+1, key, curses.color_pair(0))
if key in donerecords:
win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(4))
elif key in recordreports:
rr = recordreports[key]
win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1))
for (a,b) in rr.subkeys:
for m in range(a, b+1):
if rr.local_seqs[m] != veilid.ValueSeqNum.NONE:
win.addstr(n+2, m+1, " ", curses.color_pair(2))
for (a,b) in rr.offline_subkeys:
win.addstr(n+2, a+1, " " * (b-a+1), curses.color_pair(3))
else:
win.addstr(n+2, 1, " " * subkey_count, curses.color_pair(1))
win.refresh()
time.sleep(.5)
curses.endwin()

View File

@ -338,6 +338,12 @@ class _JsonVeilidAPI(VeilidAPI):
async def new_custom_private_route(
self, kinds: list[CryptoKind], stability: Stability, sequencing: Sequencing
) -> tuple[RouteId, bytes]:
assert isinstance(kinds, list)
for k in kinds:
assert isinstance(k, CryptoKind)
assert isinstance(stability, Stability)
assert isinstance(sequencing, Sequencing)
return NewPrivateRouteResult.from_json(
raise_api_result(
await self.send_ndjson_request(
@ -350,6 +356,8 @@ class _JsonVeilidAPI(VeilidAPI):
).to_tuple()
async def import_remote_private_route(self, blob: bytes) -> RouteId:
assert isinstance(blob, bytes)
return RouteId(
raise_api_result(
await self.send_ndjson_request(Operation.IMPORT_REMOTE_PRIVATE_ROUTE, blob=blob)
@ -357,11 +365,16 @@ class _JsonVeilidAPI(VeilidAPI):
)
async def release_private_route(self, route_id: RouteId):
assert isinstance(route_id, RouteId)
raise_api_result(
await self.send_ndjson_request(Operation.RELEASE_PRIVATE_ROUTE, route_id=route_id)
)
async def app_call_reply(self, call_id: OperationId, message: bytes):
assert isinstance(call_id, OperationId)
assert isinstance(message, bytes)
raise_api_result(
await self.send_ndjson_request(
Operation.APP_CALL_REPLY, call_id=call_id, message=message
@ -373,6 +386,9 @@ class _JsonVeilidAPI(VeilidAPI):
return _JsonRoutingContext(self, rc_id)
async def open_table_db(self, name: str, column_count: int) -> TableDb:
assert isinstance(name, str)
assert isinstance(column_count, int)
db_id = raise_api_result(
await self.send_ndjson_request(
Operation.OPEN_TABLE_DB, name=name, column_count=column_count
@ -381,11 +397,15 @@ class _JsonVeilidAPI(VeilidAPI):
return _JsonTableDb(self, db_id)
async def delete_table_db(self, name: str) -> bool:
assert isinstance(name, str)
return raise_api_result(
await self.send_ndjson_request(Operation.DELETE_TABLE_DB, name=name)
)
async def get_crypto_system(self, kind: CryptoKind) -> CryptoSystem:
assert isinstance(kind, CryptoKind)
cs_id = raise_api_result(
await self.send_ndjson_request(Operation.GET_CRYPTO_SYSTEM, kind=kind)
)
@ -398,6 +418,13 @@ class _JsonVeilidAPI(VeilidAPI):
async def verify_signatures(
self, node_ids: list[TypedKey], data: bytes, signatures: list[TypedSignature]
) -> Optional[list[TypedKey]]:
assert isinstance(node_ids, list)
for ni in node_ids:
assert isinstance(ni, TypedKey)
assert isinstance(data, bytes)
for sig in signatures:
assert isinstance(sig, TypedSignature)
out = raise_api_result(
await self.send_ndjson_request(
Operation.VERIFY_SIGNATURES,
@ -418,6 +445,11 @@ class _JsonVeilidAPI(VeilidAPI):
async def generate_signatures(
self, data: bytes, key_pairs: list[TypedKeyPair]
) -> list[TypedSignature]:
assert isinstance(data, bytes)
assert isinstance(key_pairs, list)
for kp in key_pairs:
assert isinstance(kp, TypedKeyPair)
return list(
map(
lambda x: TypedSignature(x),
@ -430,6 +462,8 @@ class _JsonVeilidAPI(VeilidAPI):
)
async def generate_key_pair(self, kind: CryptoKind) -> list[TypedKeyPair]:
assert isinstance(kind, CryptoKind)
return list(
map(
lambda x: TypedKeyPair(x),
@ -443,6 +477,7 @@ class _JsonVeilidAPI(VeilidAPI):
return Timestamp(raise_api_result(await self.send_ndjson_request(Operation.NOW)))
async def debug(self, command: str) -> str:
assert isinstance(command, str)
return raise_api_result(await self.send_ndjson_request(Operation.DEBUG, command=command))
async def veilid_version_string(self) -> str:
@ -501,6 +536,8 @@ class _JsonRoutingContext(RoutingContext):
self.done = True
async def with_default_safety(self, release=True) -> Self:
assert isinstance(release, bool)
new_rc_id = raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -514,6 +551,9 @@ class _JsonRoutingContext(RoutingContext):
return self.__class__(self.api, new_rc_id)
async def with_safety(self, safety_selection: SafetySelection, release=True) -> Self:
assert isinstance(safety_selection, SafetySelection)
assert isinstance(release, bool)
new_rc_id = raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -528,6 +568,9 @@ class _JsonRoutingContext(RoutingContext):
return self.__class__(self.api, new_rc_id)
async def with_sequencing(self, sequencing: Sequencing, release=True) -> Self:
assert isinstance(sequencing, Sequencing)
assert isinstance(release, bool)
new_rc_id = raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -555,6 +598,9 @@ class _JsonRoutingContext(RoutingContext):
)
)
async def app_call(self, target: TypedKey | RouteId, message: bytes) -> bytes:
assert isinstance(target, TypedKey) or isinstance(target, RouteId)
assert isinstance(message, bytes)
return urlsafe_b64decode_no_pad(
raise_api_result(
await self.api.send_ndjson_request(
@ -569,6 +615,9 @@ class _JsonRoutingContext(RoutingContext):
)
async def app_message(self, target: TypedKey | RouteId, message: bytes):
assert isinstance(target, TypedKey) or isinstance(target, RouteId)
assert isinstance(message, bytes)
raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -583,6 +632,10 @@ class _JsonRoutingContext(RoutingContext):
async def create_dht_record(
self, schema: DHTSchema, owner: Optional[KeyPair] = None, kind: Optional[CryptoKind] = None
) -> DHTRecordDescriptor:
assert isinstance(schema, DHTSchema)
assert owner is None or isinstance(owner, KeyPair)
assert kind is None or isinstance(kind, CryptoKind)
return DHTRecordDescriptor.from_json(
raise_api_result(
await self.api.send_ndjson_request(
@ -600,6 +653,9 @@ class _JsonRoutingContext(RoutingContext):
async def open_dht_record(
self, key: TypedKey, writer: Optional[KeyPair] = None
) -> DHTRecordDescriptor:
assert isinstance(key, TypedKey)
assert writer is None or isinstance(writer, KeyPair)
return DHTRecordDescriptor.from_json(
raise_api_result(
await self.api.send_ndjson_request(
@ -614,6 +670,8 @@ class _JsonRoutingContext(RoutingContext):
)
async def close_dht_record(self, key: TypedKey):
assert isinstance(key, TypedKey)
raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -625,6 +683,8 @@ class _JsonRoutingContext(RoutingContext):
)
async def delete_dht_record(self, key: TypedKey):
assert isinstance(key, TypedKey)
raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -638,6 +698,10 @@ class _JsonRoutingContext(RoutingContext):
async def get_dht_value(
self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool = False
) -> Optional[ValueData]:
assert isinstance(key, TypedKey)
assert isinstance(subkey, ValueSubkey)
assert isinstance(force_refresh, bool)
ret = raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -654,6 +718,11 @@ class _JsonRoutingContext(RoutingContext):
async def set_dht_value(
self, key: TypedKey, subkey: ValueSubkey, data: bytes, writer: Optional[KeyPair] = None
) -> Optional[ValueData]:
assert isinstance(key, TypedKey)
assert isinstance(subkey, ValueSubkey)
assert isinstance(data, bytes)
assert writer is None or isinstance(writer, KeyPair)
ret = raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -675,6 +744,15 @@ class _JsonRoutingContext(RoutingContext):
expiration: Timestamp = 0,
count: int = 0xFFFFFFFF,
) -> Timestamp:
assert isinstance(key, TypedKey)
assert isinstance(subkeys, list)
for s in subkeys:
assert isinstance(s, tuple)
assert isinstance(s[0], ValueSubkey)
assert isinstance(s[1], ValueSubkey)
assert isinstance(expiration, Timestamp)
assert isinstance(count, int)
return Timestamp(
raise_api_result(
await self.api.send_ndjson_request(
@ -693,6 +771,13 @@ class _JsonRoutingContext(RoutingContext):
async def cancel_dht_watch(
self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]]
) -> bool:
assert isinstance(key, TypedKey)
assert isinstance(subkeys, list)
for s in subkeys:
assert isinstance(s, tuple)
assert isinstance(s[0], ValueSubkey)
assert isinstance(s[1], ValueSubkey)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
@ -710,6 +795,14 @@ class _JsonRoutingContext(RoutingContext):
subkeys: list[tuple[ValueSubkey, ValueSubkey]],
scope: DHTReportScope = DHTReportScope.LOCAL,
) -> DHTRecordReport:
assert isinstance(key, TypedKey)
assert isinstance(subkeys, list)
for s in subkeys:
assert isinstance(s, tuple)
assert isinstance(s[0], ValueSubkey)
assert isinstance(s[1], ValueSubkey)
assert isinstance(scope, DHTReportScope)
return DHTRecordReport.from_json(
raise_api_result(
await self.api.send_ndjson_request(
@ -790,6 +883,10 @@ class _JsonTableDbTransaction(TableDbTransaction):
self.done = True
async def store(self, key: bytes, value: bytes, col: int = 0):
assert isinstance(key, bytes)
assert isinstance(value, bytes)
assert isinstance(col, int)
await self.api.send_ndjson_request(
Operation.TABLE_DB_TRANSACTION,
validate=validate_tx_op,
@ -801,6 +898,9 @@ class _JsonTableDbTransaction(TableDbTransaction):
)
async def delete(self, key: bytes, col: int = 0):
assert isinstance(key, bytes)
assert isinstance(col, int)
await self.api.send_ndjson_request(
Operation.TABLE_DB_TRANSACTION,
validate=validate_tx_op,
@ -866,6 +966,8 @@ class _JsonTableDb(TableDb):
)
async def get_keys(self, col: int = 0) -> list[bytes]:
assert isinstance(col, int)
return list(
map(
lambda x: urlsafe_b64decode_no_pad(x),
@ -893,6 +995,10 @@ class _JsonTableDb(TableDb):
return _JsonTableDbTransaction(self.api, tx_id)
async def store(self, key: bytes, value: bytes, col: int = 0):
assert isinstance(key, bytes)
assert isinstance(value, bytes)
assert isinstance(col, int)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.TABLE_DB,
@ -906,6 +1012,9 @@ class _JsonTableDb(TableDb):
)
async def load(self, key: bytes, col: int = 0) -> Optional[bytes]:
assert isinstance(key, bytes)
assert isinstance(col, int)
res = raise_api_result(
await self.api.send_ndjson_request(
Operation.TABLE_DB,
@ -919,6 +1028,9 @@ class _JsonTableDb(TableDb):
return None if res is None else urlsafe_b64decode_no_pad(res)
async def delete(self, key: bytes, col: int = 0) -> Optional[bytes]:
assert isinstance(key, bytes)
assert isinstance(col, int)
res = raise_api_result(
await self.api.send_ndjson_request(
Operation.TABLE_DB,
@ -989,6 +1101,9 @@ class _JsonCryptoSystem(CryptoSystem):
self.done = True
async def cached_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret:
assert isinstance(key, PublicKey)
assert isinstance(secret, SecretKey)
return SharedSecret(
raise_api_result(
await self.api.send_ndjson_request(
@ -1003,6 +1118,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def compute_dh(self, key: PublicKey, secret: SecretKey) -> SharedSecret:
assert isinstance(key, PublicKey)
assert isinstance(secret, SecretKey)
return SharedSecret(
raise_api_result(
await self.api.send_ndjson_request(
@ -1017,6 +1135,10 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def generate_shared_secret(self, key: PublicKey, secret: SecretKey, domain: bytes) -> SharedSecret:
assert isinstance(key, PublicKey)
assert isinstance(secret, SecretKey)
assert isinstance(domain, bytes)
return SharedSecret(
raise_api_result(
await self.api.send_ndjson_request(
@ -1032,6 +1154,8 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def random_bytes(self, len: int) -> bytes:
assert isinstance(len, int)
return urlsafe_b64decode_no_pad(
raise_api_result(
await self.api.send_ndjson_request(
@ -1055,6 +1179,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def hash_password(self, password: bytes, salt: bytes) -> str:
assert isinstance(password, bytes)
assert isinstance(salt, bytes)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.CRYPTO_SYSTEM,
@ -1067,6 +1194,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def verify_password(self, password: bytes, password_hash: str) -> bool:
assert isinstance(password, bytes)
assert isinstance(password_hash, str)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.CRYPTO_SYSTEM,
@ -1079,6 +1209,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def derive_shared_secret(self, password: bytes, salt: bytes) -> SharedSecret:
assert isinstance(password, bytes)
assert isinstance(salt, bytes)
return SharedSecret(
raise_api_result(
await self.api.send_ndjson_request(
@ -1129,6 +1262,8 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def generate_hash(self, data: bytes) -> HashDigest:
assert isinstance(data, bytes)
return HashDigest(
raise_api_result(
await self.api.send_ndjson_request(
@ -1142,6 +1277,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def validate_key_pair(self, key: PublicKey, secret: SecretKey) -> bool:
assert isinstance(key, PublicKey)
assert isinstance(secret, SecretKey)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.CRYPTO_SYSTEM,
@ -1154,6 +1292,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def validate_hash(self, data: bytes, hash_digest: HashDigest) -> bool:
assert isinstance(data, bytes)
assert isinstance(hash_digest, HashDigest)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.CRYPTO_SYSTEM,
@ -1166,6 +1307,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def distance(self, key1: CryptoKey, key2: CryptoKey) -> CryptoKeyDistance:
assert isinstance(key1, CryptoKey)
assert isinstance(key2, CryptoKey)
return CryptoKeyDistance(
raise_api_result(
await self.api.send_ndjson_request(
@ -1180,6 +1324,10 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def sign(self, key: PublicKey, secret: SecretKey, data: bytes) -> Signature:
assert isinstance(key, PublicKey)
assert isinstance(secret, SecretKey)
assert isinstance(data, bytes)
return Signature(
raise_api_result(
await self.api.send_ndjson_request(
@ -1195,6 +1343,10 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def verify(self, key: PublicKey, data: bytes, signature: Signature):
assert isinstance(key, PublicKey)
assert isinstance(data, bytes)
assert isinstance(signature, Signature)
return raise_api_result(
await self.api.send_ndjson_request(
Operation.CRYPTO_SYSTEM,
@ -1224,6 +1376,11 @@ class _JsonCryptoSystem(CryptoSystem):
shared_secret: SharedSecret,
associated_data: Optional[bytes],
) -> bytes:
assert isinstance(body, bytes)
assert isinstance(nonce, Nonce)
assert isinstance(shared_secret, SharedSecret)
assert associated_data is None or isinstance(associated_data, bytes)
return urlsafe_b64decode_no_pad(
raise_api_result(
await self.api.send_ndjson_request(
@ -1246,6 +1403,11 @@ class _JsonCryptoSystem(CryptoSystem):
shared_secret: SharedSecret,
associated_data: Optional[bytes],
) -> bytes:
assert isinstance(body, bytes)
assert isinstance(nonce, Nonce)
assert isinstance(shared_secret, SharedSecret)
assert associated_data is None or isinstance(associated_data, bytes)
return urlsafe_b64decode_no_pad(
raise_api_result(
await self.api.send_ndjson_request(
@ -1262,6 +1424,9 @@ class _JsonCryptoSystem(CryptoSystem):
)
async def crypt_no_auth(self, body: bytes, nonce: Nonce, shared_secret: SharedSecret) -> bytes:
assert isinstance(body, bytes)
assert isinstance(nonce, Nonce)
assert isinstance(shared_secret, SharedSecret)
return urlsafe_b64decode_no_pad(
raise_api_result(
await self.api.send_ndjson_request(

View File

@ -2,7 +2,7 @@ import base64
import json
from enum import StrEnum
from functools import total_ordering
from typing import Any, Optional, Self, Tuple
from typing import Any, Optional, Self
####################################################################
@ -122,6 +122,7 @@ class EncodedString(str):
@classmethod
def from_bytes(cls, b: bytes) -> Self:
assert isinstance(b, bytes)
return cls(urlsafe_b64encode_no_pad(b))
@ -160,6 +161,8 @@ class Nonce(EncodedString):
class KeyPair(str):
@classmethod
def from_parts(cls, key: PublicKey, secret: SecretKey) -> Self:
assert isinstance(key, PublicKey)
assert isinstance(secret, SecretKey)
return cls(f"{key}:{secret}")
def key(self) -> PublicKey:
@ -168,7 +171,7 @@ class KeyPair(str):
def secret(self) -> SecretKey:
return SecretKey(self.split(":", 1)[1])
def to_parts(self) -> Tuple[PublicKey, SecretKey]:
def to_parts(self) -> tuple[PublicKey, SecretKey]:
public, secret = self.split(":", 1)
return (PublicKey(public), SecretKey(secret))
@ -188,6 +191,8 @@ class CryptoTyped(str):
class TypedKey(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: PublicKey) -> Self:
assert isinstance(kind, CryptoKind)
assert isinstance(value, PublicKey)
return cls(f"{kind}:{value}")
def value(self) -> PublicKey:
@ -197,6 +202,8 @@ class TypedKey(CryptoTyped):
class TypedSecret(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: SecretKey) -> Self:
assert isinstance(kind, CryptoKind)
assert isinstance(value, SecretKey)
return cls(f"{kind}:{value}")
def value(self) -> SecretKey:
@ -206,6 +213,8 @@ class TypedSecret(CryptoTyped):
class TypedKeyPair(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: KeyPair) -> Self:
assert isinstance(kind, CryptoKind)
assert isinstance(value, KeyPair)
return cls(f"{kind}:{value}")
def value(self) -> KeyPair:
@ -215,6 +224,8 @@ class TypedKeyPair(CryptoTyped):
class TypedSignature(CryptoTyped):
@classmethod
def from_value(cls, kind: CryptoKind, value: Signature) -> Self:
assert isinstance(kind, CryptoKind)
assert isinstance(value, Signature)
return cls(f"{kind}:{value}")
def value(self) -> Signature:
@ -226,7 +237,7 @@ class ValueSubkey(int):
class ValueSeqNum(int):
pass
NONE = 4294967295
####################################################################
@ -284,10 +295,13 @@ class NewPrivateRouteResult:
blob: bytes
def __init__(self, route_id: RouteId, blob: bytes):
assert isinstance(route_id, RouteId)
assert isinstance(blob, bytes)
self.route_id = route_id
self.blob = blob
def to_tuple(self) -> Tuple[RouteId, bytes]:
def to_tuple(self) -> tuple[RouteId, bytes]:
return (self.route_id, self.blob)
@classmethod
@ -300,6 +314,9 @@ class DHTSchemaSMPLMember:
m_cnt: int
def __init__(self, m_key: PublicKey, m_cnt: int):
assert isinstance(m_key, PublicKey)
assert isinstance(m_cnt, int)
self.m_key = m_key
self.m_cnt = m_cnt
@ -321,10 +338,15 @@ class DHTSchema:
@classmethod
def dflt(cls, o_cnt: int) -> Self:
assert isinstance(o_cnt, int)
return cls(DHTSchemaKind.DFLT, o_cnt=o_cnt)
@classmethod
def smpl(cls, o_cnt: int, members: list[DHTSchemaSMPLMember]) -> Self:
assert isinstance(o_cnt, int)
assert isinstance(members, list)
for m in members:
assert isinstance(m, DHTSchemaSMPLMember)
return cls(DHTSchemaKind.SMPL, o_cnt=o_cnt, members=members)
@classmethod
@ -404,8 +426,8 @@ class DHTRecordReport:
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(
[[p[0], p[1]] for p in j["subkeys"]],
[[p[0], p[1]] for p in j["offline_subkeys"]],
[(p[0], p[1]) for p in j["subkeys"]],
[(p[0], p[1]) for p in j["offline_subkeys"]],
[ValueSeqNum(s) for s in j["local_seqs"]],
[ValueSeqNum(s) for s in j["network_seqs"]],
)

View File

@ -185,7 +185,7 @@ serial_test = { version = "2.0.0", default-features = false, features = [
console_error_panic_hook = "0.1.7"
wasm-bindgen-test = "0.3.50"
wasm-logger = "0.2.0"
tracing-wasm = { version = "0.2.1" }
veilid-tracing-wasm = "^0"
### BUILD OPTIONS

View File

@ -18,8 +18,8 @@ pub fn setup() -> () {
let config = veilid_tracing_wasm::WASMLayerConfig::new()
.with_report_logs_in_timings(false);
.with_max_level(Level::TRACE);
.with_console_config(tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
tracing_wasm::set_as_global_default_with_config(config);
.with_console_config(veilid_tracing_wasm::ConsoleConfig::ReportWithoutConsoleColor);
veilid_tracing_wasm::set_as_global_default_with_config(config);
} else {
wasm_logger::init(wasm_logger::Config::default());
}