From 1fe5004eef25500706b9fafc0c557b4d70a470a5 Mon Sep 17 00:00:00 2001 From: John Smith Date: Sat, 6 May 2023 21:09:40 -0400 Subject: [PATCH] checkpoint --- doc/config/sample.config | 14 +- doc/config/veilid-server-config.md | 14 +- .../src/routing_table/routing_table_inner.rs | 6 +- .../coders/operations/operation_get_value.rs | 1 + veilid-core/src/rpc_processor/fanout_call.rs | 29 ++- veilid-core/src/rpc_processor/mod.rs | 21 +- .../src/rpc_processor/operation_waiter.rs | 3 +- .../src/rpc_processor/rpc_find_node.rs | 7 +- .../src/rpc_processor/rpc_get_value.rs | 109 +++++++++- .../src/storage_manager/do_get_value.rs | 151 ++++++++++---- veilid-core/src/storage_manager/mod.rs | 190 ++++++++++++------ .../src/storage_manager/record_store.rs | 30 ++- .../src/storage_manager/types/record.rs | 4 +- .../src/tests/common/test_veilid_config.rs | 27 +-- veilid-core/src/veilid_api/error.rs | 16 +- .../src/veilid_api/types/dht/schema/dflt.rs | 23 +++ .../src/veilid_api/types/dht/schema/mod.rs | 13 ++ .../src/veilid_api/types/dht/schema/smpl.rs | 40 +++- veilid-core/src/veilid_config.rs | 4 +- veilid-flutter/lib/veilid.dart | 68 +++++-- veilid-server/src/settings.rs | 38 ++-- veilid-tools/src/tools.rs | 4 + veilid-wasm/tests/web.rs | 18 +- 23 files changed, 627 insertions(+), 203 deletions(-) diff --git a/doc/config/sample.config b/doc/config/sample.config index 60362d04..1d52f579 100644 --- a/doc/config/sample.config +++ b/doc/config/sample.config @@ -67,16 +67,16 @@ core: max_route_hop_count: 4 default_route_hop_count: 1 dht: - resolve_node_timeout_ms: 10000 - resolve_node_count: 20 - resolve_node_fanout: 3 max_find_node_count: 20 + resolve_node_timeout_ms: 10000 + resolve_node_count: 1 + resolve_node_fanout: 4 get_value_timeout_ms: 10000 - get_value_count: 20 - get_value_fanout: 3 + get_value_count: 3 + get_value_fanout: 4 set_value_timeout_ms: 10000 - set_value_count: 20 - set_value_fanout: 5 + set_value_count: 5 + set_value_fanout: 4 min_peer_count: 20 min_peer_refresh_time_ms: 2000 validate_dial_info_receipt_time_ms: 2000 diff --git a/doc/config/veilid-server-config.md b/doc/config/veilid-server-config.md index 13f556e9..e4a2ecc5 100644 --- a/doc/config/veilid-server-config.md +++ b/doc/config/veilid-server-config.md @@ -234,16 +234,16 @@ rpc: ```yaml dht: - resolve_node_timeout_ms: 10000 - resolve_node_count: 20 - resolve_node_fanout: 3 max_find_node_count: 20 + resolve_node_timeout_ms: 10000 + resolve_node_count: 1 + resolve_node_fanout: 4 get_value_timeout_ms: 10000 - get_value_count: 20 - get_value_fanout: 3 + get_value_count: 3 + get_value_fanout: 4 set_value_timeout_ms: 10000 - set_value_count: 20 - set_value_fanout: 5 + set_value_count: 5 + set_value_fanout: 4 min_peer_count: 20 min_peer_refresh_time_ms: 2000 validate_dial_info_receipt_time_ms: 2000 diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index e70e705d..aa08c08f 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -1253,7 +1253,7 @@ fn make_closest_noderef_sort( let cur_ts = get_aligned_timestamp(); let kind = node_id.kind; // Get cryptoversion to check distance with - let vcrypto = crypto.get(node_id.kind).unwrap(); + let vcrypto = crypto.get(kind).unwrap(); move |a: &NodeRefLocked, b: &NodeRefLocked| -> core::cmp::Ordering { // same nodes are always the same @@ -1275,8 +1275,8 @@ fn make_closest_noderef_sort( } // get keys - let a_key = a_entry.node_ids().get(node_id.kind).unwrap(); - let b_key = b_entry.node_ids().get(node_id.kind).unwrap(); + let a_key = a_entry.node_ids().get(kind).unwrap(); + let b_key = b_entry.node_ids().get(kind).unwrap(); // distance is the next metric, closer nodes first let da = vcrypto.distance(&a_key.value, &node_id.value); diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index f8f0a37f..e46bf5d3 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -102,6 +102,7 @@ impl RPCOperationGetValueA { descriptor, }) } + pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { let question_context = validate_context .question_context diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index ff0617e1..e55606f5 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -22,7 +22,7 @@ where crypto_kind: CryptoKind, node_id: TypedKey, context: Mutex>, - count: usize, + node_count: usize, fanout: usize, timeout_us: TimestampDuration, call_routine: C, @@ -39,14 +39,14 @@ where pub fn new( routing_table: RoutingTable, node_id: TypedKey, - count: usize, + node_count: usize, fanout: usize, timeout_us: TimestampDuration, call_routine: C, check_done: D, ) -> Arc { let context = Mutex::new(FanoutContext { - closest_nodes: Vec::with_capacity(count), + closest_nodes: Vec::with_capacity(node_count), called_nodes: TypedKeySet::new(), result: None, }); @@ -56,7 +56,7 @@ where node_id, crypto_kind: node_id.kind, context, - count, + node_count, fanout, timeout_us, call_routine, @@ -81,7 +81,7 @@ where self.routing_table .sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes); - ctx.closest_nodes.truncate(self.count); + ctx.closest_nodes.truncate(self.node_count); } fn remove_node(self: Arc, dead_node: NodeRef) { @@ -98,7 +98,7 @@ where fn get_next_node(self: Arc) -> Option { let mut next_node = None; let mut ctx = self.context.lock(); - for cn in &ctx.closest_nodes { + for cn in ctx.closest_nodes.clone() { if let Some(key) = cn.node_ids().get(self.crypto_kind) { if !ctx.called_nodes.contains(&key) { // New fanout call candidate found @@ -150,13 +150,16 @@ where } Err(e) => { // Error happened, abort everything and return the error + let mut ctx = self.context.lock(); + ctx.result = Some(Err(e)); + return; } }; } } fn init_closest_nodes(self: Arc) { - // Get the 'count' closest nodes to the key out of our routing table + // Get the 'node_count' closest nodes to the key out of our routing table let closest_nodes = { let routing_table = self.routing_table.clone(); @@ -181,7 +184,7 @@ where NodeRef::new(routing_table.clone(), v.unwrap().clone(), None) }; - routing_table.find_closest_nodes(self.count, self.node_id, filters, transform) + routing_table.find_closest_nodes(self.node_count, self.node_id, filters, transform) }; let mut ctx = self.context.lock(); @@ -189,6 +192,14 @@ where } pub async fn run(self: Arc) -> TimeoutOr, RPCError>> { + // Get timeout in milliseconds + let timeout_ms = match us_to_ms(self.timeout_us.as_u64()).map_err(RPCError::internal) { + Ok(v) => v, + Err(e) => { + return TimeoutOr::value(Err(e)); + } + }; + // Initialize closest nodes list self.clone().init_closest_nodes(); @@ -208,7 +219,7 @@ where } } // Wait for them to complete - timeout((self.timeout_us.as_u64() / 1000u64) as u32, async { + timeout(timeout_ms, async { while let Some(_) = unord.next().await {} }) .await diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 702fd73c..663f54b2 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -482,9 +482,10 @@ impl RPCProcessor { } // If nobody knows where this node is, ask the DHT for it - let (count, fanout, timeout) = { + let (node_count, _consensus_count, fanout, timeout) = { let c = this.config.get(); ( + c.network.dht.max_find_node_count as usize, c.network.dht.resolve_node_count as usize, c.network.dht.resolve_node_fanout as usize, TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)), @@ -493,7 +494,7 @@ impl RPCProcessor { // Search in preferred cryptosystem order let nr = match this - .search_dht_single_key(node_id, count, fanout, timeout, safety_selection) + .search_dht_single_key(node_id, node_count, fanout, timeout, safety_selection) .await { TimeoutOr::Timeout => None, @@ -1136,7 +1137,7 @@ impl RPCProcessor { })) } - // Issue a statement over the network, possibly using an anonymized route + /// Issue a statement over the network, possibly using an anonymized route #[instrument(level = "debug", skip(self, statement), err)] async fn statement( &self, @@ -1192,9 +1193,8 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } - - // Issue a reply over the network, possibly using an anonymized route - // The request must want a response, or this routine fails + /// Issue a reply over the network, possibly using an anonymized route + /// The request must want a response, or this routine fails #[instrument(level = "debug", skip(self, request, answer), err)] async fn answer( &self, @@ -1253,6 +1253,9 @@ impl RPCProcessor { Ok(NetworkResult::value(())) } + /// Decoding RPC from the wire + /// This performs a capnp decode on the data, and if it passes the capnp schema + /// it performs the cryptographic validation required to pass the operation up for processing fn decode_rpc_operation( &self, encoded_msg: &RPCMessageEncoded, @@ -1270,6 +1273,12 @@ impl RPCProcessor { Ok(operation) } + /// Cryptographic RPC validation + /// We do this as part of the RPC network layer to ensure that any RPC operations that are + /// processed have already been validated cryptographically and it is not the job of the + /// caller or receiver. This does not mean the operation is 'semantically correct'. For + /// complex operations that require stateful validation and a more robust context than + /// 'signatures', the caller must still perform whatever validation is necessary fn validate_rpc_operation(&self, operation: &mut RPCOperation) -> Result<(), RPCError> { // If this is an answer, get the question context for this answer // If we received an answer for a question we did not ask, this will return an error diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index e2da9e9c..2f39e200 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -139,8 +139,7 @@ where mut handle: OperationWaitHandle, timeout_us: TimestampDuration, ) -> Result, RPCError> { - let timeout_ms = u32::try_from(timeout_us.as_u64() / 1000u64) - .map_err(|e| RPCError::map_internal("invalid timeout")(e))?; + let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?; // Take the instance // After this, we must manually cancel since the cancel on handle drop is disabled diff --git a/veilid-core/src/rpc_processor/rpc_find_node.rs b/veilid-core/src/rpc_processor/rpc_find_node.rs index b96a7ee8..a910dc1b 100644 --- a/veilid-core/src/rpc_processor/rpc_find_node.rs +++ b/veilid-core/src/rpc_processor/rpc_find_node.rs @@ -84,13 +84,17 @@ impl RPCProcessor { let find_node_q = match kind { RPCOperationKind::Question(q) => match q.destructure() { (_, RPCQuestionDetail::FindNodeQ(q)) => q, - _ => panic!("not a status question"), + _ => panic!("not a findnode question"), }, _ => panic!("not a question"), }; + let node_id = find_node_q.destructure(); // add node information for the requesting node to our routing table let routing_table = self.routing_table(); + +xxx move this into routing table code, also do getvalue code + let Some(own_peer_info) = routing_table.get_own_peer_info(RoutingDomain::PublicInternet) else { // Our own node info is not yet available, drop this request. return Ok(NetworkResult::service_unavailable()); @@ -114,7 +118,6 @@ impl RPCProcessor { c.network.dht.max_find_node_count as usize }; - let node_id = find_node_q.destructure(); let closest_nodes = routing_table.find_closest_nodes( node_count, node_id, diff --git a/veilid-core/src/rpc_processor/rpc_get_value.rs b/veilid-core/src/rpc_processor/rpc_get_value.rs index 9fe2dcaa..f2cc5f7d 100644 --- a/veilid-core/src/rpc_processor/rpc_get_value.rs +++ b/veilid-core/src/rpc_processor/rpc_get_value.rs @@ -9,8 +9,12 @@ pub struct GetValueAnswer { } impl RPCProcessor { - // Sends a get value request and wait for response - // Can be sent via all methods including relays and routes + /// Sends a get value request and wait for response + /// Can be sent via all methods including relays + /// Safety routes may be used, but never private routes. + /// Because this leaks information about the identity of the node itself, + /// replying to this request received over a private route will leak + /// the identity of the node and defeat the private route. #[instrument(level = "trace", skip(self), ret, err)] pub async fn rpc_call_get_value( self, @@ -19,6 +23,19 @@ impl RPCProcessor { subkey: ValueSubkey, last_descriptor: Option, ) -> Result>, RPCError> { + // Ensure destination never has a private route + if matches!( + dest, + Destination::PrivateRoute { + private_route: _, + safety_selection: _ + } + ) { + return Err(RPCError::internal( + "Never send get value requests over private routes", + )); + } + let get_value_q = RPCOperationGetValueQ::new(key, subkey, last_descriptor.is_none()); let question = RPCQuestion::new( network_result_try!(self.get_destination_respond_to(&dest)?), @@ -73,6 +90,92 @@ impl RPCProcessor { &self, msg: RPCMessage, ) -> Result, RPCError> { - Err(RPCError::unimplemented("process_get_value_q")) + // Ensure this never came over a private route, safety route is okay though + match &msg.header.detail { + RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} + RPCMessageHeaderDetail::PrivateRouted(_) => { + return Ok(NetworkResult::invalid_message( + "not processing get value request over private route", + )) + } + } + + // Get the question + let kind = msg.operation.kind().clone(); + let get_value_q = match kind { + RPCOperationKind::Question(q) => match q.destructure() { + (_, RPCQuestionDetail::GetValueQ(q)) => q, + _ => panic!("not a getvalue question"), + }, + _ => panic!("not a question"), + }; + + // Destructure + let (key, subkey, want_descriptor) = get_value_q.destructure(); + + // add node information for the requesting node to our routing table + let crypto_kind = key.kind; + let routing_table = self.routing_table(); + let own_node_id = routing_table.node_id(crypto_kind); + + // find N nodes closest to the target node in our routing table + // ensure the nodes returned are only the ones closer to the target node than ourself + let Some(vcrypto) = self.crypto.get(crypto_kind) else { + return Ok(NetworkResult::invalid_message("unsupported cryptosystem")); + }; + let own_distance = vcrypto.distance(&own_node_id.value, &key.value); + + let filter = Box::new( + move |rti: &RoutingTableInner, opt_entry: Option>| { + // Exclude our own node + let Some(entry) = opt_entry else { + return false; + }; + // Ensure only things that are valid/signed in the PublicInternet domain are returned + if !rti.filter_has_valid_signed_node_info( + RoutingDomain::PublicInternet, + true, + Some(entry.clone()), + ) { + return false; + } + // Ensure things further from the key than our own node are not included + let Some(entry_node_id) = entry.with(rti, |_rti, e| e.node_ids().get(crypto_kind)) else { + return false; + }; + let entry_distance = vcrypto.distance(&entry_node_id.value, &key.value); + if entry_distance >= own_distance { + return false; + } + + true + }, + ) as RoutingTableEntryFilter; + let filters = VecDeque::from([filter]); + + let node_count = { + let c = self.config.get(); + c.network.dht.max_find_node_count as usize + }; + + // + let closest_nodes = routing_table.find_closest_nodes( + node_count, + key, + filters, + // transform + |rti, entry| { + entry.unwrap().with(rti, |_rti, e| { + e.make_peer_info(RoutingDomain::PublicInternet).unwrap() + }) + }, + ); + + // Make status answer + let find_node_a = RPCOperationFindNodeA::new(closest_nodes)?; + + // Send status answer + self.answer(msg, RPCAnswer::new(RPCAnswerDetail::FindNodeA(find_node_a))) + .await } } diff --git a/veilid-core/src/storage_manager/do_get_value.rs b/veilid-core/src/storage_manager/do_get_value.rs index 5d9fd846..c6567d7b 100644 --- a/veilid-core/src/storage_manager/do_get_value.rs +++ b/veilid-core/src/storage_manager/do_get_value.rs @@ -1,46 +1,75 @@ use super::*; +/// The result of the do_get_value_operation pub struct DoGetValueResult { + /// The subkey value if we got one pub value: Option, + /// The descriptor if we got a fresh one or empty if no descriptor was needed pub descriptor: Option, } +/// The context of the do_get_value operation +struct DoGetValueContext { + /// The latest value of the subkey, may be the value passed in + pub value: Option, + /// The consensus count for the value we have received + pub value_count: usize, + /// The descriptor if we got a fresh one or empty if no descriptor was needed + pub descriptor: Option, + /// The parsed schema from the descriptor if we have one + pub schema: Option, +} + impl StorageManager { pub async fn do_get_value( &self, - mut inner: AsyncMutexGuardArc, + rpc_processor: RPCProcessor, key: TypedKey, subkey: ValueSubkey, - min_seq: ValueSeqNum, + last_value: Option, last_descriptor: Option, safety_selection: SafetySelection, - ) -> Result, VeilidAPIError> { - let Some(rpc_processor) = inner.rpc_processor.clone() else { - apibail_not_initialized!(); - }; - + ) -> Result { let routing_table = rpc_processor.routing_table(); // Get the DHT parameters for 'GetValue' - let (count, fanout, timeout) = { + let (key_count, consensus_count, fanout, timeout_us) = { let c = self.unlocked_inner.config.get(); ( + c.network.dht.max_find_node_count as usize, c.network.dht.get_value_count as usize, c.network.dht.get_value_fanout as usize, TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)), ) }; + // Make do-get-value answer context + let schema = if let Some(d) = &last_descriptor { + Some(d.schema()?) + } else { + None + }; + let context = Arc::new(Mutex::new(DoGetValueContext { + value: last_value, + value_count: 0, + descriptor: last_descriptor.clone(), + schema, + })); + // Routine to call to generate fanout let call_routine = |next_node: NodeRef| { let rpc_processor = rpc_processor.clone(); + let context = context.clone(); + let last_descriptor = last_descriptor.clone(); async move { match rpc_processor .clone() .rpc_call_get_value( Destination::direct(next_node).with_safety(safety_selection), - key, subkey, last_descriptor + key, + subkey, + last_descriptor, ) .await { @@ -49,12 +78,61 @@ impl StorageManager { // Any other failures, just try the next node return Ok(None); }); - + + // Keep the descriptor if we got one. If we had a last_descriptor it will + // already be validated by rpc_call_get_value + if let Some(descriptor) = v.answer.descriptor { + let mut ctx = context.lock(); + if ctx.descriptor.is_none() && ctx.schema.is_none() { + ctx.schema = + Some(descriptor.schema().map_err(RPCError::invalid_format)?); + ctx.descriptor = Some(descriptor); + } + } + // Keep the value if we got one and it is newer and it passes schema validation if let Some(value) = v.answer.value { - // See if this is even a candidate - if value.value_data(). xxx apply min_seq and also to OperationGetValueQ - // Validate with scheam + let mut ctx = context.lock(); + + // Ensure we have a schema and descriptor + let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { + // Got a value but no descriptor for it + // Move to the next node + return Ok(None); + }; + + // Validate with schema + if !schema.check_subkey_value_data( + descriptor.owner(), + subkey, + value.value_data(), + ) { + // Validation failed, ignore this value + // Move to the next node + return Ok(None); + } + + // If we have a prior value, see if this is a newer sequence number + if let Some(prior_value) = &ctx.value { + let prior_seq = prior_value.value_data().seq(); + let new_seq = value.value_data().seq(); + + if new_seq == prior_seq { + // If sequence number is the same, the data should be the same + if prior_value.value_data() != value.value_data() { + // Move to the next node + return Ok(None); + } + // Increase the consensus count for the existing value + ctx.value_count += 1; + } else if new_seq > prior_seq { + // If the sequence number is greater, go with it + ctx.value = Some(value); + ctx.value_count = 1; + } else { + // If the sequence number is older, ignore it + } + } } // Return peers if we have some @@ -66,13 +144,11 @@ impl StorageManager { }; // Routine to call to check if we're done at each step - let check_done = |closest_nodes: &[NodeRef]| { - // If the node we want to locate is one of the closest nodes, return it immediately - if let Some(out) = closest_nodes - .iter() - .find(|x| x.node_ids().contains(&node_id)) - { - return Some(out.clone()); + let check_done = |_closest_nodes: &[NodeRef]| { + // If we have reached sufficient consensus, return done + let ctx = context.lock(); + if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count { + return Some(()); } None }; @@ -80,28 +156,33 @@ impl StorageManager { // Call the fanout let fanout_call = FanoutCall::new( routing_table.clone(), - node_id, - count, + key, + key_count, fanout, timeout_us, call_routine, check_done, ); - fanout_call.run().await - - // Search in preferred cryptosystem order - let nr = this - .search_dht_single_key(node_id, count, fanout, timeout, safety_selection) - .await?; - - if let Some(nr) = &nr { - if nr.node_ids().contains(&node_id) { - // found a close node, but not exact within our configured resolve_node timeout - return Ok(None); + match fanout_call.run().await { + // If we don't finish in the timeout (too much time passed checking for consensus) + TimeoutOr::Timeout | + // If we finished with consensus (enough nodes returning the same value) + TimeoutOr::Value(Ok(Some(()))) | + // If we finished without consensus (ran out of nodes before getting consensus) + TimeoutOr::Value(Ok(None)) => { + // Return the best answer we've got + let ctx = context.lock(); + Ok(DoGetValueResult{ + value: ctx.value.clone(), + descriptor: ctx.descriptor.clone(), + }) + } + // Failed + TimeoutOr::Value(Err(e)) => { + // If we finished with an error, return that + Err(e.into()) } } - - Ok(nr) } } diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index ef03e04c..86f7301c 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -203,7 +203,7 @@ impl StorageManager { /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey where - D: RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -237,6 +237,11 @@ impl StorageManager { apibail_generic!("unsupported cryptosystem"); }; + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + // Compile the dht schema let schema_data = schema.compile(); @@ -257,7 +262,6 @@ impl StorageManager { let record = Record::::new(cur_ts, signed_value_descriptor, local_record_detail)?; - let local_record_store = inner.local_record_store.as_mut().unwrap(); let dht_key = Self::get_key(vcrypto.clone(), &record); local_record_store.new_record(dht_key, record).await?; @@ -266,23 +270,16 @@ impl StorageManager { .await } - async fn open_record_inner( + fn open_record_inner_check_existing( &self, mut inner: AsyncMutexGuardArc, key: TypedKey, writer: Option, safety_selection: SafetySelection, - ) -> Result { - // Ensure the record is closed - if inner.opened_records.contains_key(&key) { - return Err(VeilidAPIError::generic( - "record is already open and should be closed first", - )); - } - - // Get cryptosystem - let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else { - apibail_generic!("unsupported cryptosystem"); + ) -> Option> { + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + return Some(Err(VeilidAPIError::not_initialized())); }; // See if we have a local record already or not @@ -295,45 +292,124 @@ impl StorageManager { // Return record details (r.owner().clone(), r.schema()) }; - if let Some((owner, schema)) = inner - .local_record_store - .as_mut() - .unwrap() - .with_record_mut(key, cb) - { - // Had local record + let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else { + return None; + }; + // Had local record - // If the writer we chose is also the owner, we have the owner secret - // Otherwise this is just another subkey writer - let owner_secret = if let Some(writer) = writer { - if writer.key == owner { - Some(writer.secret) - } else { - None - } + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) } else { None - }; - - // Write open record - inner.opened_records.insert(key, OpenedRecord::new(writer)); - - // Make DHT Record Descriptor to return - let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); - Ok(descriptor) + } } else { - // No record yet, try to get it from the network - self.do_get_value(inner, key, 0, safety_selection).await + None + }; - // Make DHT Record Descriptor to return - // let descriptor = DHTRecordDescriptor { - // key, - // owner, - // owner_secret, - // schema, - // }; - // Ok(descriptor) + // Write open record + inner.opened_records.insert(key, OpenedRecord::new(writer)); + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); + Some(Ok(descriptor)) + } + + async fn open_record_inner( + &self, + inner: AsyncMutexGuardArc, + key: TypedKey, + writer: Option, + safety_selection: SafetySelection, + ) -> Result { + // Ensure the record is closed + if inner.opened_records.contains_key(&key) { + apibail_generic!("record is already open and should be closed first"); } + + // See if we have a local record already or not + if let Some(res) = + self.open_record_inner_check_existing(inner, key, writer, safety_selection) + { + return res; + } + + // No record yet, try to get it from the network + + // Get rpc processor and drop mutex so we don't block while getting the value from the network + let rpc_processor = { + let inner = self.lock().await?; + let Some(rpc_processor) = inner.rpc_processor.clone() else { + // Offline, try again later + apibail_try_again!(); + }; + rpc_processor + }; + + // No last descriptor, no last value + let subkey: ValueSubkey = 0; + let result = self + .do_get_value(rpc_processor, key, subkey, None, None, safety_selection) + .await?; + + // If we got nothing back, the key wasn't found + if result.value.is_none() && result.descriptor.is_none() { + // No result + apibail_key_not_found!(key); + }; + + // Must have descriptor + let Some(signed_value_descriptor) = result.descriptor else { + // No descriptor for new record, can't store this + apibail_generic!("no descriptor"); + }; + + let owner = signed_value_descriptor.owner().clone(); + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) + } else { + None + } + } else { + None + }; + let schema = signed_value_descriptor.schema()?; + + // Reopen inner to store value we just got + let mut inner = self.lock().await?; + + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Make and store a new record for this descriptor + let record = Record::::new( + get_aligned_timestamp(), + signed_value_descriptor, + LocalRecordDetail { safety_selection }, + )?; + local_record_store.new_record(key, record).await?; + + // If we got a subkey with the getvalue, it has already been validated against the schema, so store it + if let Some(signed_value_data) = result.value { + // Write subkey to local store + local_record_store + .set_subkey(key, subkey, signed_value_data) + .await?; + } + + // Write open record + inner.opened_records.insert(key, OpenedRecord::new(writer)); + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); + Ok(descriptor) } pub async fn open_record( @@ -349,32 +425,34 @@ impl StorageManager { async fn close_record_inner( &self, - mut inner: AsyncMutexGuardArc, + inner: &mut AsyncMutexGuardArc, key: TypedKey, ) -> Result<(), VeilidAPIError> { - let Some(opened_record) = inner.opened_records.remove(&key) else { + let Some(_opened_record) = inner.opened_records.remove(&key) else { apibail_generic!("record not open"); }; Ok(()) } pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { - let inner = self.lock().await?; - self.close_record_inner(inner, key).await + let mut inner = self.lock().await?; + self.close_record_inner(&mut inner, key).await } pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { - let inner = self.lock().await?; + let mut inner = self.lock().await?; // Ensure the record is closed if inner.opened_records.contains_key(&key) { - self.close_record_inner(inner, key).await?; + self.close_record_inner(&mut inner, key).await?; } - // Remove the record from the local store - //inner.local_record_store.unwrap().de + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; - unimplemented!(); + // Remove the record from the local store + local_record_store.delete_record(key).await } pub async fn get_value( diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 966de820..dc30b35b 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -9,7 +9,7 @@ use hashlink::LruCache; pub struct RecordStore where - D: RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -32,7 +32,7 @@ where impl RecordStore where - D: RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -169,6 +169,11 @@ where let st_xact = subkey_table.transact(); let dead_records = mem::take(&mut self.dead_records); for (k, v) in dead_records { + // Record should already be gone from index + if self.record_index.contains_key(&k) { + log_stor!(error "dead record found in index: {:?}", k); + } + // Delete record rt_xact.delete(0, &k.bytes()); @@ -205,7 +210,6 @@ where } let record_table = self.record_table.clone().unwrap(); - let subkey_table = self.subkey_table.clone().unwrap(); let rt_xact = record_table.transact(); let changed_records = mem::take(&mut self.changed_records); @@ -277,6 +281,22 @@ where Ok(()) } + pub async fn delete_record(&mut self, key: TypedKey) -> Result<(), VeilidAPIError> { + // Get the record table key + let rtk = RecordTableKey { key }; + + // Remove record from the index + let Some(record) = self.record_index.remove(&rtk) else { + apibail_key_not_found!(key); + }; + + self.add_dead_record(rtk, record); + + self.purge_dead_records(false).await; + + Ok(()) + } + pub fn with_record(&mut self, key: TypedKey, f: F) -> Option where F: FnOnce(&Record) -> R, @@ -319,7 +339,7 @@ where out } - pub async fn get_subkey( + pub async fn get_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, @@ -371,7 +391,7 @@ where return Ok(None); } - pub async fn set_subkey( + pub async fn set_subkey( &mut self, key: TypedKey, subkey: ValueSubkey, diff --git a/veilid-core/src/storage_manager/types/record.rs b/veilid-core/src/storage_manager/types/record.rs index 51a2edd0..eab24579 100644 --- a/veilid-core/src/storage_manager/types/record.rs +++ b/veilid-core/src/storage_manager/types/record.rs @@ -6,7 +6,7 @@ use super::*; #[archive_attr(repr(C), derive(CheckBytes))] pub struct Record where - D: RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { @@ -19,7 +19,7 @@ where impl Record where - D: RkyvArchive + RkyvSerialize, + D: Clone + RkyvArchive + RkyvSerialize, for<'t> ::Archived: CheckBytes>, ::Archived: RkyvDeserialize, { diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs index 43a08534..8fa4a634 100644 --- a/veilid-core/src/tests/common/test_veilid_config.rs +++ b/veilid-core/src/tests/common/test_veilid_config.rs @@ -207,16 +207,16 @@ fn config_callback(key: String) -> ConfigCallbackReturn { "network.rpc.timeout_ms" => Ok(Box::new(5_000u32)), "network.rpc.max_route_hop_count" => Ok(Box::new(4u8)), "network.rpc.default_route_hop_count" => Ok(Box::new(1u8)), - "network.dht.resolve_node_timeout_ms" => Ok(Box::new(10_000u32)), - "network.dht.resolve_node_count" => Ok(Box::new(20u32)), - "network.dht.resolve_node_fanout" => Ok(Box::new(3u32)), "network.dht.max_find_node_count" => Ok(Box::new(20u32)), + "network.dht.resolve_node_timeout_ms" => Ok(Box::new(10_000u32)), + "network.dht.resolve_node_count" => Ok(Box::new(1u32)), + "network.dht.resolve_node_fanout" => Ok(Box::new(4u32)), "network.dht.get_value_timeout_ms" => Ok(Box::new(10_000u32)), - "network.dht.get_value_count" => Ok(Box::new(20u32)), - "network.dht.get_value_fanout" => Ok(Box::new(3u32)), + "network.dht.get_value_count" => Ok(Box::new(3u32)), + "network.dht.get_value_fanout" => Ok(Box::new(4u32)), "network.dht.set_value_timeout_ms" => Ok(Box::new(10_000u32)), - "network.dht.set_value_count" => Ok(Box::new(20u32)), - "network.dht.set_value_fanout" => Ok(Box::new(5u32)), + "network.dht.set_value_count" => Ok(Box::new(5u32)), + "network.dht.set_value_fanout" => Ok(Box::new(4u32)), "network.dht.min_peer_count" => Ok(Box::new(20u32)), "network.dht.min_peer_refresh_time_ms" => Ok(Box::new(2_000u32)), "network.dht.validate_dial_info_receipt_time_ms" => Ok(Box::new(5_000u32)), @@ -335,15 +335,16 @@ pub async fn test_config() { assert_eq!(inner.network.routing_table.limit_attached_good, 8u32); assert_eq!(inner.network.routing_table.limit_attached_weak, 4u32); + assert_eq!(inner.network.dht.max_find_node_count, 20u32); assert_eq!(inner.network.dht.resolve_node_timeout_ms, 10_000u32); - assert_eq!(inner.network.dht.resolve_node_count, 20u32); - assert_eq!(inner.network.dht.resolve_node_fanout, 3u32); + assert_eq!(inner.network.dht.resolve_node_count, 1u32); + assert_eq!(inner.network.dht.resolve_node_fanout, 4u32); assert_eq!(inner.network.dht.get_value_timeout_ms, 10_000u32); - assert_eq!(inner.network.dht.get_value_count, 20u32); - assert_eq!(inner.network.dht.get_value_fanout, 3u32); + assert_eq!(inner.network.dht.get_value_count, 3u32); + assert_eq!(inner.network.dht.get_value_fanout, 4u32); assert_eq!(inner.network.dht.set_value_timeout_ms, 10_000u32); - assert_eq!(inner.network.dht.set_value_count, 20u32); - assert_eq!(inner.network.dht.set_value_fanout, 5u32); + assert_eq!(inner.network.dht.set_value_count, 5u32); + assert_eq!(inner.network.dht.set_value_fanout, 4u32); assert_eq!(inner.network.dht.min_peer_count, 20u32); assert_eq!(inner.network.dht.min_peer_refresh_time_ms, 2_000u32); assert_eq!( diff --git a/veilid-core/src/veilid_api/error.rs b/veilid-core/src/veilid_api/error.rs index f2ae12e4..6978cb87 100644 --- a/veilid-core/src/veilid_api/error.rs +++ b/veilid-core/src/veilid_api/error.rs @@ -72,6 +72,14 @@ macro_rules! apibail_no_connection { }; } +#[allow(unused_macros)] +#[macro_export] +macro_rules! apibail_key_not_found { + ($x:expr) => { + return Err(VeilidAPIError::key_not_found($x)) + }; +} + #[allow(unused_macros)] #[macro_export] macro_rules! apibail_invalid_target { @@ -127,8 +135,8 @@ pub enum VeilidAPIError { InvalidTarget, #[error("No connection: {message}")] NoConnection { message: String }, - #[error("No peer info: {node_id}")] - NoPeerInfo { node_id: TypedKey }, + #[error("Key not found: {key}")] + KeyNotFound { key: TypedKey }, #[error("Internal: {message}")] Internal { message: String }, #[error("Unimplemented: {message}")] @@ -171,8 +179,8 @@ impl VeilidAPIError { message: msg.to_string(), } } - pub fn no_peer_info(node_id: TypedKey) -> Self { - Self::NoPeerInfo { node_id } + pub fn key_not_found(key: TypedKey) -> Self { + Self::KeyNotFound { key } } pub fn internal(msg: T) -> Self { Self::Internal { diff --git a/veilid-core/src/veilid_api/types/dht/schema/dflt.rs b/veilid-core/src/veilid_api/types/dht/schema/dflt.rs index 332ecd90..bd9c0858 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/dflt.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/dflt.rs @@ -42,6 +42,29 @@ impl DHTSchemaDFLT { pub fn data_size(&self) -> usize { 0 } + + /// Check a subkey value data against the schema + pub fn check_subkey_value_data( + &self, + owner: &PublicKey, + subkey: ValueSubkey, + value_data: &ValueData, + ) -> bool { + let subkey = subkey as usize; + + // Check is subkey is in owner range + if subkey < (self.o_cnt as usize) { + // Check value data has valid writer + if value_data.writer() == owner { + return true; + } + // Wrong writer + return false; + } + + // Subkey out of range + false + } } impl TryFrom<&[u8]> for DHTSchemaDFLT { diff --git a/veilid-core/src/veilid_api/types/dht/schema/mod.rs b/veilid-core/src/veilid_api/types/dht/schema/mod.rs index 160f1422..b043afcc 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/mod.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/mod.rs @@ -58,6 +58,19 @@ impl DHTSchema { DHTSchema::SMPL(s) => s.data_size(), } } + + /// Check a subkey value data against the schema + pub fn check_subkey_value_data( + &self, + owner: &PublicKey, + subkey: ValueSubkey, + value_data: &ValueData, + ) -> bool { + match self { + DHTSchema::DFLT(d) => d.check_subkey_value_data(owner, subkey, value_data), + DHTSchema::SMPL(s) => s.check_subkey_value_data(owner, subkey, value_data), + } + } } impl Default for DHTSchema { diff --git a/veilid-core/src/veilid_api/types/dht/schema/smpl.rs b/veilid-core/src/veilid_api/types/dht/schema/smpl.rs index 1ac4a17b..90b20b86 100644 --- a/veilid-core/src/veilid_api/types/dht/schema/smpl.rs +++ b/veilid-core/src/veilid_api/types/dht/schema/smpl.rs @@ -18,7 +18,7 @@ use super::*; pub struct DHTSchemaSMPLMember { /// Member key pub m_key: PublicKey, - /// Member subkey countanyway, + /// Member subkey count pub m_cnt: u16, } @@ -78,6 +78,44 @@ impl DHTSchemaSMPL { pub fn data_size(&self) -> usize { self.members.len() * mem::size_of::() } + + /// Check a subkey value data against the schema + pub fn check_subkey_value_data( + &self, + owner: &PublicKey, + subkey: ValueSubkey, + value_data: &ValueData, + ) -> bool { + let mut cur_subkey = subkey as usize; + + // Check is subkey is in owner range + if cur_subkey < (self.o_cnt as usize) { + // Check value data has valid writer + if value_data.writer() == owner { + return true; + } + // Wrong writer + return false; + } + cur_subkey -= self.o_cnt as usize; + + // Check all member ranges + for m in &self.members { + // Check if subkey is in member range + if cur_subkey < (m.m_cnt as usize) { + // Check value data has valid writer + if value_data.writer() == &m.m_key { + return true; + } + // Wrong writer + return false; + } + cur_subkey -= m.m_cnt as usize; + } + + // Subkey out of range + false + } } impl TryFrom<&[u8]> for DHTSchemaSMPL { diff --git a/veilid-core/src/veilid_config.rs b/veilid-core/src/veilid_config.rs index c6dc1ddf..46b14245 100644 --- a/veilid-core/src/veilid_config.rs +++ b/veilid-core/src/veilid_config.rs @@ -275,10 +275,10 @@ pub struct VeilidConfigTLS { RkyvDeserialize, )] pub struct VeilidConfigDHT { + pub max_find_node_count: u32, pub resolve_node_timeout_ms: u32, pub resolve_node_count: u32, pub resolve_node_fanout: u32, - pub max_find_node_count: u32, pub get_value_timeout_ms: u32, pub get_value_count: u32, pub get_value_fanout: u32, @@ -653,10 +653,10 @@ impl VeilidConfig { get_config!(inner.network.routing_table.limit_attached_strong); get_config!(inner.network.routing_table.limit_attached_good); get_config!(inner.network.routing_table.limit_attached_weak); + get_config!(inner.network.dht.max_find_node_count); get_config!(inner.network.dht.resolve_node_timeout_ms); get_config!(inner.network.dht.resolve_node_count); get_config!(inner.network.dht.resolve_node_fanout); - get_config!(inner.network.dht.max_find_node_count); get_config!(inner.network.dht.get_value_timeout_ms); get_config!(inner.network.dht.get_value_count); get_config!(inner.network.dht.get_value_fanout); diff --git a/veilid-flutter/lib/veilid.dart b/veilid-flutter/lib/veilid.dart index 8ab3b705..387d0bba 100644 --- a/veilid-flutter/lib/veilid.dart +++ b/veilid-flutter/lib/veilid.dart @@ -631,10 +631,10 @@ class VeilidConfigDHT { Map get json { return { + 'max_find_node_count': maxFindNodeCount, 'resolve_node_timeout_ms': resolveNodeTimeoutMs, 'resolve_node_count': resolveNodeCount, 'resolve_node_fanout': resolveNodeFanout, - 'max_find_node_count': maxFindNodeCount, 'get_value_timeout_ms': getValueTimeoutMs, 'get_value_count': getValueCount, 'get_value_fanout': getValueFanout, @@ -1557,17 +1557,25 @@ abstract class VeilidAPIException implements Exception { { return VeilidAPIExceptionTimeout(); } + case "TryAgain": + { + return VeilidAPIExceptionTryAgain(); + } case "Shutdown": { return VeilidAPIExceptionShutdown(); } - case "NodeNotFound": + case "InvalidTarget": { - return VeilidAPIExceptionNodeNotFound(json["node_id"]); + return VeilidAPIExceptionInvalidTarget(); } - case "NoDialInfo": + case "NoConnection": { - return VeilidAPIExceptionNoDialInfo(json["node_id"]); + return VeilidAPIExceptionNoConnection(json["message"]); + } + case "KeyNotFound": + { + return VeilidAPIExceptionKeyNotFound(json["key"]); } case "Internal": { @@ -1642,6 +1650,18 @@ class VeilidAPIExceptionTimeout implements VeilidAPIException { } } +class VeilidAPIExceptionTryAgain implements VeilidAPIException { + @override + String toString() { + return "VeilidAPIException: TryAgain"; + } + + @override + String toDisplayError() { + return "Try again"; + } +} + class VeilidAPIExceptionShutdown implements VeilidAPIException { @override String toString() { @@ -1654,38 +1674,50 @@ class VeilidAPIExceptionShutdown implements VeilidAPIException { } } -class VeilidAPIExceptionNodeNotFound implements VeilidAPIException { - final String nodeId; - +class VeilidAPIExceptionInvalidTarget implements VeilidAPIException { @override String toString() { - return "VeilidAPIException: NodeNotFound (nodeId: $nodeId)"; + return "VeilidAPIException: InvalidTarget"; } @override String toDisplayError() { - return "Node node found: $nodeId"; + return "Invalid target"; } - - // - VeilidAPIExceptionNodeNotFound(this.nodeId); } -class VeilidAPIExceptionNoDialInfo implements VeilidAPIException { - final String nodeId; +class VeilidAPIExceptionNoConnection implements VeilidAPIException { + final String message; @override String toString() { - return "VeilidAPIException: NoDialInfo (nodeId: $nodeId)"; + return "VeilidAPIException: NoConnection (message: $message)"; } @override String toDisplayError() { - return "No dial info: $nodeId"; + return "No connection: $message"; } // - VeilidAPIExceptionNoDialInfo(this.nodeId); + VeilidAPIExceptionNoConnection(this.message); +} + +class VeilidAPIExceptionKeyNotFound implements VeilidAPIException { + final String key; + + @override + String toString() { + return "VeilidAPIException: KeyNotFound (key: $key)"; + } + + @override + String toDisplayError() { + return "Key not found: $key"; + } + + // + VeilidAPIExceptionKeyNotFound(this.key); } class VeilidAPIExceptionInternal implements VeilidAPIException { diff --git a/veilid-server/src/settings.rs b/veilid-server/src/settings.rs index da8a1d7c..a53c1a31 100644 --- a/veilid-server/src/settings.rs +++ b/veilid-server/src/settings.rs @@ -83,16 +83,16 @@ core: max_route_hop_count: 4 default_route_hop_count: 1 dht: - resolve_node_timeout_ms: 10000 - resolve_node_count: 20 - resolve_node_fanout: 3 max_find_node_count: 20 + resolve_node_timeout_ms: 10000 + resolve_node_count: 1 + resolve_node_fanout: 4 get_value_timeout_ms: 10000 - get_value_count: 20 - get_value_fanout: 3 + get_value_count: 3 + get_value_fanout: 4 set_value_timeout_ms: 10000 - set_value_count: 20 - set_value_fanout: 5 + set_value_count: 5 + set_value_fanout: 4 min_peer_count: 20 min_peer_refresh_time_ms: 2000 validate_dial_info_receipt_time_ms: 2000 @@ -510,10 +510,10 @@ pub struct Rpc { #[derive(Debug, Deserialize, Serialize)] pub struct Dht { + pub max_find_node_count: u32, pub resolve_node_timeout_ms: u32, pub resolve_node_count: u32, pub resolve_node_fanout: u32, - pub max_find_node_count: u32, pub get_value_timeout_ms: u32, pub get_value_count: u32, pub get_value_fanout: u32, @@ -974,10 +974,10 @@ impl Settings { set_config_value!(inner.core.network.rpc.timeout_ms, value); set_config_value!(inner.core.network.rpc.max_route_hop_count, value); set_config_value!(inner.core.network.rpc.default_route_hop_count, value); + set_config_value!(inner.core.network.dht.max_find_node_count, value); set_config_value!(inner.core.network.dht.resolve_node_timeout_ms, value); set_config_value!(inner.core.network.dht.resolve_node_count, value); set_config_value!(inner.core.network.dht.resolve_node_fanout, value); - set_config_value!(inner.core.network.dht.max_find_node_count, value); set_config_value!(inner.core.network.dht.get_value_timeout_ms, value); set_config_value!(inner.core.network.dht.get_value_count, value); set_config_value!(inner.core.network.dht.get_value_fanout, value); @@ -1173,6 +1173,9 @@ impl Settings { "network.rpc.default_route_hop_count" => { Ok(Box::new(inner.core.network.rpc.default_route_hop_count)) } + "network.dht.max_find_node_count" => { + Ok(Box::new(inner.core.network.dht.max_find_node_count)) + } "network.dht.resolve_node_timeout_ms" => { Ok(Box::new(inner.core.network.dht.resolve_node_timeout_ms)) } @@ -1182,9 +1185,6 @@ impl Settings { "network.dht.resolve_node_fanout" => { Ok(Box::new(inner.core.network.dht.resolve_node_fanout)) } - "network.dht.max_find_node_count" => { - Ok(Box::new(inner.core.network.dht.max_find_node_count)) - } "network.dht.get_value_timeout_ms" => { Ok(Box::new(inner.core.network.dht.get_value_timeout_ms)) } @@ -1534,16 +1534,16 @@ mod tests { assert_eq!(s.core.network.rpc.max_route_hop_count, 4); assert_eq!(s.core.network.rpc.default_route_hop_count, 1); // - assert_eq!(s.core.network.dht.resolve_node_timeout_ms, 10_000u32); - assert_eq!(s.core.network.dht.resolve_node_count, 20u32); - assert_eq!(s.core.network.dht.resolve_node_fanout, 3u32); assert_eq!(s.core.network.dht.max_find_node_count, 20u32); + assert_eq!(s.core.network.dht.resolve_node_timeout_ms, 10_000u32); + assert_eq!(s.core.network.dht.resolve_node_count, 1u32); + assert_eq!(s.core.network.dht.resolve_node_fanout, 4u32); assert_eq!(s.core.network.dht.get_value_timeout_ms, 10_000u32); - assert_eq!(s.core.network.dht.get_value_count, 20u32); - assert_eq!(s.core.network.dht.get_value_fanout, 3u32); + assert_eq!(s.core.network.dht.get_value_count, 3u32); + assert_eq!(s.core.network.dht.get_value_fanout, 4u32); assert_eq!(s.core.network.dht.set_value_timeout_ms, 10_000u32); - assert_eq!(s.core.network.dht.set_value_count, 20u32); - assert_eq!(s.core.network.dht.set_value_fanout, 5u32); + assert_eq!(s.core.network.dht.set_value_count, 5u32); + assert_eq!(s.core.network.dht.set_value_fanout, 4u32); assert_eq!(s.core.network.dht.min_peer_count, 20u32); assert_eq!(s.core.network.dht.min_peer_refresh_time_ms, 2_000u32); assert_eq!( diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index a2ecb0fa..4aba1f00 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -130,6 +130,10 @@ pub fn ms_to_us(ms: u32) -> u64 { (ms as u64) * 1000u64 } +pub fn us_to_ms(us: u64) -> EyreResult { + u32::try_from(us / 1000u64).wrap_err("could not convert microseconds") +} + // Calculate retry attempt with logarhythmic falloff pub fn retry_falloff_log( last_us: u64, diff --git a/veilid-wasm/tests/web.rs b/veilid-wasm/tests/web.rs index 2ae8ca70..4ef8a5bc 100644 --- a/veilid-wasm/tests/web.rs +++ b/veilid-wasm/tests/web.rs @@ -52,16 +52,16 @@ fn init_callbacks() { case "network.rpc.timeout": return 10000000; case "network.rpc.max_route_hop_count": return 4; case "network.rpc.default_route_hop_count": return 1; - case "network.dht.resolve_node_timeout": return null; - case "network.dht.resolve_node_count": return 20; - case "network.dht.resolve_node_fanout": return 3; case "network.dht.max_find_node_count": return 20; - case "network.dht.get_value_timeout": return null; - case "network.dht.get_value_count": return 20; - case "network.dht.get_value_fanout": return 3; - case "network.dht.set_value_timeout": return null; - case "network.dht.set_value_count": return 20; - case "network.dht.set_value_fanout": return 5; + case "network.dht.resolve_node_timeout": return 10000; + case "network.dht.resolve_node_count": return 1; + case "network.dht.resolve_node_fanout": return 4; + case "network.dht.get_value_timeout": return 10000; + case "network.dht.get_value_count": return 3; + case "network.dht.get_value_fanout": return 4; + case "network.dht.set_value_timeout": return 10000; + case "network.dht.set_value_count": return 5; + case "network.dht.set_value_fanout": return 4; case "network.dht.min_peer_count": return 20; case "network.dht.min_peer_refresh_time": return 2000000; case "network.dht.validate_dial_info_receipt_time": return 5000000;