diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 9399a171..c64c688a 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -10,10 +10,16 @@ where #[derive(Debug, Copy, Clone)] pub(crate) enum FanoutResultKind { + Partial, Timeout, Finished, Exhausted, } +impl FanoutResultKind { + pub fn is_partial(&self) -> bool { + matches!(self, Self::Partial) + } +} #[derive(Debug, Clone)] pub(crate) struct FanoutResult { @@ -23,6 +29,7 @@ pub(crate) struct FanoutResult { pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String { let kc = match result.kind { + FanoutResultKind::Partial => "P", FanoutResultKind::Timeout => "T", FanoutResultKind::Finished => "F", FanoutResultKind::Exhausted => "E", diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 3cdf3210..9e0d6fee 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -10,6 +10,8 @@ struct OutboundGetValueContext { pub descriptor: Option>, /// The parsed schema from the descriptor if we have one pub schema: Option, + /// If we should send a partial update with the current contetx + pub send_partial_update: bool, } /// The result of the outbound_get_value operation @@ -29,7 +31,7 @@ impl StorageManager { subkey: ValueSubkey, safety_selection: SafetySelection, last_get_result: GetResult, - ) -> VeilidAPIResult { + ) -> VeilidAPIResult>> { let routing_table = rpc_processor.routing_table(); // Get the DHT parameters for 'GetValue' @@ -49,171 +51,301 @@ impl StorageManager { inner.get_value_nodes(key)?.unwrap_or_default() }; - // Make do-get-value answer context + // Parse the schema let schema = if let Some(d) = &last_get_result.opt_descriptor { Some(d.schema()?) } else { None }; + + // Make the return channel + let (out_tx, out_rx) = flume::unbounded::>(); + + // Make do-get-value answer context let context = Arc::new(Mutex::new(OutboundGetValueContext { value: last_get_result.opt_value, value_nodes: vec![], descriptor: last_get_result.opt_descriptor.clone(), schema, + send_partial_update: false, })); // Routine to call to generate fanout - let call_routine = |next_node: NodeRef| { - let rpc_processor = rpc_processor.clone(); + let call_routine = { let context = context.clone(); - let last_descriptor = last_get_result.opt_descriptor.clone(); - async move { - let gva = network_result_try!( - rpc_processor - .clone() - .rpc_call_get_value( - Destination::direct(next_node.clone()).with_safety(safety_selection), - key, - subkey, - last_descriptor.map(|x| (*x).clone()), - ) - .await? - ); + let rpc_processor = rpc_processor.clone(); + move |next_node: NodeRef| { + let context = context.clone(); + let rpc_processor = rpc_processor.clone(); + let last_descriptor = last_get_result.opt_descriptor.clone(); + async move { + let gva = network_result_try!( + rpc_processor + .clone() + .rpc_call_get_value( + Destination::direct(next_node.clone()) + .with_safety(safety_selection), + key, + subkey, + last_descriptor.map(|x| (*x).clone()), + ) + .await? + ); - // Keep the descriptor if we got one. 
If we had a last_descriptor it will - // already be validated by rpc_call_get_value - if let Some(descriptor) = gva.answer.descriptor { - let mut ctx = context.lock(); - if ctx.descriptor.is_none() && ctx.schema.is_none() { - let schema = match descriptor.schema() { - Ok(v) => v, - Err(e) => { - return Ok(NetworkResult::invalid_message(e)); - } + // Keep the descriptor if we got one. If we had a last_descriptor it will + // already be validated by rpc_call_get_value + if let Some(descriptor) = gva.answer.descriptor { + let mut ctx = context.lock(); + if ctx.descriptor.is_none() && ctx.schema.is_none() { + let schema = match descriptor.schema() { + Ok(v) => v, + Err(e) => { + return Ok(NetworkResult::invalid_message(e)); + } + }; + ctx.schema = Some(schema); + ctx.descriptor = Some(Arc::new(descriptor)); + } + } + + // Keep the value if we got one and it is newer and it passes schema validation + if let Some(value) = gva.answer.value { + log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); + let mut ctx = context.lock(); + + // Ensure we have a schema and descriptor + let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) + else { + // Got a value but no descriptor for it + // Move to the next node + return Ok(NetworkResult::invalid_message( + "Got value with no descriptor", + )); }; - ctx.schema = Some(schema); - ctx.descriptor = Some(Arc::new(descriptor)); - } - } - // Keep the value if we got one and it is newer and it passes schema validation - if let Some(value) = gva.answer.value { - log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); - let mut ctx = context.lock(); + // Validate with schema + if !schema.check_subkey_value_data( + descriptor.owner(), + subkey, + value.value_data(), + ) { + // Validation failed, ignore this value + // Move to the next node + return Ok(NetworkResult::invalid_message(format!( + "Schema validation failed on subkey {}", + subkey + ))); + } - // Ensure we have a schema and descriptor - let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else { - // Got a value but no descriptor for it - // Move to the next node - return Ok(NetworkResult::invalid_message( - "Got value with no descriptor", - )); - }; + // If we have a prior value, see if this is a newer sequence number + if let Some(prior_value) = &ctx.value { + let prior_seq = prior_value.value_data().seq(); + let new_seq = value.value_data().seq(); - // Validate with schema - if !schema.check_subkey_value_data( - descriptor.owner(), - subkey, - value.value_data(), - ) { - // Validation failed, ignore this value - // Move to the next node - return Ok(NetworkResult::invalid_message(format!( - "Schema validation failed on subkey {}", - subkey - ))); - } - - // If we have a prior value, see if this is a newer sequence number - if let Some(prior_value) = &ctx.value { - let prior_seq = prior_value.value_data().seq(); - let new_seq = value.value_data().seq(); - - if new_seq == prior_seq { - // If sequence number is the same, the data should be the same - if prior_value.value_data() != value.value_data() { - // Move to the next node - return Ok(NetworkResult::invalid_message("value data mismatch")); + if new_seq == prior_seq { + // If sequence number is the same, the data should be the same + if prior_value.value_data() != value.value_data() { + // Move to the next node + return Ok(NetworkResult::invalid_message( + "value data mismatch", + )); + } + // Increase the 
consensus count for the existing value + ctx.value_nodes.push(next_node); + } else if new_seq > prior_seq { + // If the sequence number is greater, start over with the new value + ctx.value = Some(Arc::new(value)); + // One node has shown us this value so far + ctx.value_nodes = vec![next_node]; + // Send an update since the value changed + ctx.send_partial_update = true; + } else { + // If the sequence number is older, ignore it } - // Increase the consensus count for the existing value - ctx.value_nodes.push(next_node); - } else if new_seq > prior_seq { - // If the sequence number is greater, start over with the new value + } else { + // If we have no prior value, keep it ctx.value = Some(Arc::new(value)); // One node has shown us this value so far ctx.value_nodes = vec![next_node]; - } else { - // If the sequence number is older, ignore it + // Send an update since the value changed + ctx.send_partial_update = true; } - } else { - // If we have no prior value, keep it - ctx.value = Some(Arc::new(value)); - // One node has shown us this value so far - ctx.value_nodes = vec![next_node]; } + + // Return peers if we have some + log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); + + Ok(NetworkResult::value(gva.answer.peers)) } - - // Return peers if we have some - log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); - - Ok(NetworkResult::value(gva.answer.peers)) } }; // Routine to call to check if we're done at each step - let check_done = |_closest_nodes: &[NodeRef]| { - // If we have reached sufficient consensus, return done - let ctx = context.lock(); - if ctx.value.is_some() - && ctx.descriptor.is_some() - && ctx.value_nodes.len() >= consensus_count - { - return Some(()); + let check_done = { + let context = context.clone(); + let out_tx = out_tx.clone(); + move |_closest_nodes: &[NodeRef]| { + let mut ctx = context.lock(); + + // send partial update if desired + if ctx.send_partial_update { + ctx.send_partial_update=false; + + // return partial result + let fanout_result = FanoutResult { + kind: FanoutResultKind::Partial, + value_nodes: ctx.value_nodes.clone(), + }; + if let Err(e) = out_tx.send(Ok(OutboundGetValueResult { + fanout_result, + get_result: GetResult { + opt_value: ctx.value.clone(), + opt_descriptor: ctx.descriptor.clone(), + }, + })) { + log_dht!(debug "Sending partial GetValue result failed: {}", e); + } + } + + // If we have reached sufficient consensus, return done + if ctx.value.is_some() + && ctx.descriptor.is_some() + && ctx.value_nodes.len() >= consensus_count + { + return Some(()); + } + None } - None }; - // Call the fanout - let fanout_call = FanoutCall::new( - routing_table.clone(), + // Call the fanout in a spawned task + spawn(Box::pin(async move { + let fanout_call = FanoutCall::new( + routing_table.clone(), + key, + key_count, + fanout, + timeout_us, + capability_fanout_node_info_filter(vec![CAP_DHT]), + call_routine, + check_done, + ); + + let kind = match fanout_call.run(init_fanout_queue).await { + // If we don't finish in the timeout (too much time passed checking for consensus) + TimeoutOr::Timeout => FanoutResultKind::Timeout, + // If we finished with or without consensus (enough nodes returning the same value) + TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished, + // If we ran out of nodes before getting consensus) + TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted, + // Failed + TimeoutOr::Value(Err(e)) => { + // If we finished with an error, return 
that + log_dht!(debug "GetValue fanout error: {}", e); + if let Err(e) = out_tx.send(Err(e.into())) { + log_dht!(debug "Sending GetValue fanout error failed: {}", e); + } + return; + } + }; + + let ctx = context.lock(); + let fanout_result = FanoutResult { + kind, + value_nodes: ctx.value_nodes.clone(), + }; + log_network_result!(debug "GetValue Fanout: {:?}", fanout_result); + + if let Err(e) = out_tx.send(Ok(OutboundGetValueResult { + fanout_result, + get_result: GetResult { + opt_value: ctx.value.clone(), + opt_descriptor: ctx.descriptor.clone(), + }, + })) { + log_dht!(debug "Sending GetValue result failed: {}", e); + } + })) + .detach(); + + Ok(out_rx) + } + + pub(super) fn process_deferred_outbound_get_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver>, key: TypedKey, subkey: ValueSubkey, last_seq: ValueSeqNum) { + let this = self.clone(); + inner.process_deferred_results( + res_rx, + Box::new( + move |result: VeilidAPIResult| -> SendPinBoxFuture { + let this = this.clone(); + Box::pin(async move { + let result = match result { + Ok(v) => v, + Err(e) => { + log_rtab!(debug "Deferred fanout error: {}", e); + return false; + } + }; + let is_partial = result.fanout_result.kind.is_partial(); + let value_data = match this.process_outbound_get_value_result(key, subkey, Some(last_seq), result).await { + Ok(Some(v)) => v, + Ok(None) => { + return is_partial; + } + Err(e) => { + log_rtab!(debug "Deferred fanout error: {}", e); + return false; + } + }; + if is_partial { + // If more partial results show up, don't send an update until we're done + return true; + } + // If we processed the final result, possibly send an update + // if the sequence number changed since our first partial update + // Send with a max count as this is not attached to any watch + if last_seq != value_data.seq() { + if let Err(e) = this.update_callback_value_change(key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data)).await { + log_rtab!(debug "Failed sending deferred fanout value change: {}", e); + } + } + + // Return done + false + }) + }, + ), + ); + } + + pub(super) async fn process_outbound_get_value_result(&self, key: TypedKey, subkey: ValueSubkey, opt_last_seq: Option, result: get_value::OutboundGetValueResult) -> Result, VeilidAPIError> { + // See if we got a value back + let Some(get_result_value) = result.get_result.opt_value else { + // If we got nothing back then we also had nothing beforehand, return nothing + return Ok(None); + }; + + // Keep the list of nodes that returned a value for later reference + let mut inner = self.lock().await?; + + inner.process_fanout_results( key, - key_count, - fanout, - timeout_us, - capability_fanout_node_info_filter(vec![CAP_DHT]), - call_routine, - check_done, + core::iter::once((subkey, &result.fanout_result)), + false, ); - let kind = match fanout_call.run(init_fanout_queue).await { - // If we don't finish in the timeout (too much time passed checking for consensus) - TimeoutOr::Timeout => FanoutResultKind::Timeout, - // If we finished with or without consensus (enough nodes returning the same value) - TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished, - // If we ran out of nodes before getting consensus) - TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted, - // Failed - TimeoutOr::Value(Err(e)) => { - // If we finished with an error, return that - log_dht!(debug "GetValue Fanout Error: {}", e); - return Err(e.into()); - } - }; - - let ctx = context.lock(); - let fanout_result = FanoutResult { - kind, - 
value_nodes: ctx.value_nodes.clone(), - }; - log_network_result!(debug "GetValue Fanout: {:?}", fanout_result); - - Ok(OutboundGetValueResult { - fanout_result, - get_result: GetResult { - opt_value: ctx.value.clone(), - opt_descriptor: ctx.descriptor.clone(), - }, - }) + // If we got a new value back then write it to the opened record + if Some(get_result_value.value_data().seq()) != opt_last_seq { + inner + .handle_set_local_value( + key, + subkey, + get_result_value.clone(), + WatchUpdateMode::UpdateAll, + ) + .await?; + } + Ok(Some(get_result_value.value_data().clone())) } /// Handle a received 'Get Value' query diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 7aa52ad9..b8088ba8 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -264,7 +264,7 @@ impl StorageManager { // No last descriptor, no last value // Use the safety selection we opened the record with let subkey: ValueSubkey = 0; - let result = self + let res_rx = self .outbound_get_value( rpc_processor, key, @@ -273,12 +273,24 @@ impl StorageManager { GetResult::default(), ) .await?; + // Wait for the first result + let Ok(result) = res_rx.recv_async().await else { + apibail_internal!("failed to receive results"); + }; + let result = result?; // If we got nothing back, the key wasn't found if result.get_result.opt_value.is_none() && result.get_result.opt_descriptor.is_none() { // No result apibail_key_not_found!(key); }; + let last_seq = result + .get_result + .opt_value + .as_ref() + .unwrap() + .value_data() + .seq(); // Reopen inner to store value we just got let mut inner = self.lock().await?; @@ -295,9 +307,16 @@ impl StorageManager { } // Open the new record - inner + let out = inner .open_new_record(key, writer, subkey, result.get_result, safety_selection) - .await + .await; + + if out.is_ok() { + self.process_deferred_outbound_get_value_result_inner( + &mut inner, res_rx, key, subkey, last_seq, + ); + } + out } /// Close an opened local record @@ -402,7 +421,7 @@ impl StorageManager { .opt_value .as_ref() .map(|v| v.value_data().seq()); - let result = self + let res_rx = self .outbound_get_value( rpc_processor, key, @@ -412,32 +431,33 @@ impl StorageManager { ) .await?; - // See if we got a value back - let Some(get_result_value) = result.get_result.opt_value else { - // If we got nothing back then we also had nothing beforehand, return nothing - return Ok(None); + // Wait for the first result + let Ok(result) = res_rx.recv_async().await else { + apibail_internal!("failed to receive results"); }; + let result = result?; + let partial = result.fanout_result.kind.is_partial(); - // Keep the list of nodes that returned a value for later reference - let mut inner = self.lock().await?; - inner.process_fanout_results( - key, - core::iter::once((subkey, &result.fanout_result)), - false, - ); + // Process the returned result + let out = self + .process_outbound_get_value_result(key, subkey, opt_last_seq, result) + .await?; - // If we got a new value back then write it to the opened record - if Some(get_result_value.value_data().seq()) != opt_last_seq { - inner - .handle_set_local_value( + if let Some(out) = &out { + // If there's more to process, do it in the background + if partial { + let mut inner = self.lock().await?; + self.process_deferred_outbound_get_value_result_inner( + &mut inner, + res_rx, key, subkey, - get_result_value.clone(), - WatchUpdateMode::UpdateAll, - ) - .await?; + out.seq(), + ); + } } - 
Ok(Some(get_result_value.value_data().clone())) + + Ok(out) } /// Set the value of a subkey on an opened local record @@ -920,6 +940,31 @@ impl StorageManager { Ok(()) } + // Send a value change up through the callback + #[instrument(level = "trace", skip(self), err)] + async fn update_callback_value_change( + &self, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + count: u32, + value: Option, + ) -> Result<(), VeilidAPIError> { + let opt_update_callback = { + let inner = self.lock().await?; + inner.update_callback.clone() + }; + + if let Some(update_callback) = opt_update_callback { + update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { + key, + subkeys, + count, + value, + }))); + } + Ok(()) + } + fn check_fanout_set_offline( &self, key: TypedKey, @@ -927,6 +972,7 @@ impl StorageManager { fanout_result: &FanoutResult, ) -> bool { match fanout_result.kind { + FanoutResultKind::Partial => false, FanoutResultKind::Timeout => { log_stor!(debug "timeout in set_value, adding offline subkey: {}:{}", key, subkey); true diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index 57ffdb1a..b43c78a1 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -32,6 +32,8 @@ pub(super) struct StorageManagerInner { pub tick_future: Option>, /// Update callback to send ValueChanged notification to pub update_callback: Option, + /// Deferred result processor + pub deferred_result_processor: DeferredStreamProcessor, /// The maximum consensus count set_consensus_count: usize, @@ -88,6 +90,7 @@ impl StorageManagerInner { opt_routing_table: Default::default(), tick_future: Default::default(), update_callback: None, + deferred_result_processor: DeferredStreamProcessor::default(), set_consensus_count, } } @@ -126,6 +129,9 @@ impl StorageManagerInner { self.load_metadata().await?; + // Start deferred results processors + self.deferred_result_processor.init().await; + // Schedule tick let tick_future = interval(1000, move || { let this = outer_self.clone(); @@ -151,6 +157,9 @@ impl StorageManagerInner { f.await; } + // Stop deferred result processor + self.deferred_result_processor.terminate().await; + // Final flush on record stores if let Some(mut local_record_store) = self.local_record_store.take() { if let Err(e) = local_record_store.flush().await { @@ -708,4 +717,12 @@ impl StorageManagerInner { subkeys: ValueSubkeyRangeSet::single(subkey), }); } + + pub fn process_deferred_results( + &mut self, + receiver: flume::Receiver, + handler: impl FnMut(T) -> SendPinBoxFuture + Send + 'static, + ) -> bool { + self.deferred_result_processor.add(receiver, handler) + } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index c8b3a77e..2de238e2 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -417,7 +417,7 @@ impl StorageManager { watch_id: u64, ) -> VeilidAPIResult> { // Update local record store with new value - let (is_value_seq_newer, opt_update_callback, value) = { + let (is_value_seq_newer, value) = { let mut inner = self.lock().await?; // Don't process update if the record is closed @@ -516,7 +516,7 @@ impl StorageManager { } } - (is_value_seq_newer, inner.update_callback.clone(), value) + (is_value_seq_newer, value) }; // Announce ValueChanged VeilidUpdate @@ -526,18 +526,13 @@ impl StorageManager { let do_update = 
is_value_seq_newer || subkeys.len() > 1 || count == 0; if do_update { - if let Some(update_callback) = opt_update_callback { - update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { - key, - subkeys, - count, - value: if is_value_seq_newer { - Some(value.unwrap().value_data().clone()) - } else { - None - }, - }))); - } + let value = if is_value_seq_newer { + Some(value.unwrap().value_data().clone()) + } else { + None + }; + self.update_callback_value_change(key, subkeys, count, value) + .await?; } Ok(NetworkResult::value(())) diff --git a/veilid-core/src/veilid_api/types/veilid_state.rs b/veilid-core/src/veilid_api/types/veilid_state.rs index 41c3bc04..c751d096 100644 --- a/veilid-core/src/veilid_api/types/veilid_state.rs +++ b/veilid-core/src/veilid_api/types/veilid_state.rs @@ -52,58 +52,93 @@ impl TryFrom for AttachmentState { } } +/// Describe the attachment state of the Veilid node #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify))] pub struct VeilidStateAttachment { + /// The overall quality of the routing table if attached, or the current state the attachment state machine. pub state: AttachmentState, + /// If attached and there are enough eachable nodes in the routing table to perform all the actions of the PublicInternet RoutingDomain, + /// including things like private/safety route allocation and DHT operations. pub public_internet_ready: bool, + /// If attached and there are enough eachable nodes in the routing table to perform all the actions of the LocalNetwork RoutingDomain. pub local_network_ready: bool, } +/// Describe a recently accessed peer #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify))] pub struct PeerTableData { + /// The node ids used by this peer #[schemars(with = "Vec")] #[cfg_attr(target_arch = "wasm32", tsify(type = "string[]"))] pub node_ids: Vec, + /// The peer's human readable address. pub peer_address: String, + /// Statistics we have collected on this peer. pub peer_stats: PeerStats, } +/// Describe the current network state of the Veilid node #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify))] pub struct VeilidStateNetwork { + /// If the network has been started or not. pub started: bool, + /// The total number of bytes per second used by Veilid currently in the download direction. pub bps_down: ByteCount, + /// The total number of bytes per second used by Veilid currently in the upload direction. pub bps_up: ByteCount, + /// The list of most recently accessed peers. + /// This is not an active connection table, nor is representative of the entire routing table. pub peers: Vec, } +/// Describe a private route change that has happened #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify))] pub struct VeilidRouteChange { + /// If a private route that was allocated has died, it is listed here. #[schemars(with = "Vec")] pub dead_routes: Vec, + /// If a private route that was imported has died, it is listed here. #[schemars(with = "Vec")] pub dead_remote_routes: Vec, } +/// Describe changes to the Veilid node configuration +/// Currently this is only ever emitted once, however we reserve the right to +/// add the ability to change the configuration or have it changed by the Veilid node +/// itself during runtime. 
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify))] pub struct VeilidStateConfig { + /// If the Veilid node configuration has changed the full new config will be here. pub config: VeilidConfigInner, } +/// Describe when DHT records have subkey values changed #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify))] pub struct VeilidValueChange { + /// The DHT Record key that changed #[schemars(with = "String")] pub key: TypedKey, + /// The portion of the DHT Record's subkeys that have changed + /// If the subkey range is empty, any watch present on the value has died. pub subkeys: ValueSubkeyRangeSet, + /// The count remaining on the watch that triggered this value change + /// If there is no watch and this is received, it will be set to u32::MAX + /// If this value is zero, any watch present on the value has died. pub count: u32, + /// The (optional) value data for the first subkey in the subkeys range + /// If 'subkeys' is not a single value, other values than the first value + /// must be retrieved with RoutingContext::get_dht_value(). pub value: Option, } +/// An update from the veilid-core to the host application describing a change +/// to the internal state of the Veilid node. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(into_wasm_abi))] #[serde(tag = "kind")] @@ -120,6 +155,7 @@ pub enum VeilidUpdate { } from_impl_to_jsvalue!(VeilidUpdate); +/// A queriable state of the internals of veilid-core. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] #[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(into_wasm_abi))] pub struct VeilidState { diff --git a/veilid-tools/src/deferred_stream_processor.rs b/veilid-tools/src/deferred_stream_processor.rs new file mode 100644 index 00000000..de9577f7 --- /dev/null +++ b/veilid-tools/src/deferred_stream_processor.rs @@ -0,0 +1,125 @@ +use futures_util::{ + future::{select, Either}, + stream::FuturesUnordered, + StreamExt, +}; +use stop_token::future::FutureExt as _; + +use super::*; + +/// Background processor for streams +/// Handles streams to completion, passing each item from the stream to a callback +pub struct DeferredStreamProcessor { + pub opt_deferred_stream_channel: Option>>, + pub opt_stopper: Option, + pub opt_join_handle: Option>, +} + +impl DeferredStreamProcessor { + /// Create a new DeferredStreamProcessor + pub fn new() -> Self { + Self { + opt_deferred_stream_channel: None, + opt_stopper: None, + opt_join_handle: None, + } + } + + /// Initialize the processor before use + pub async fn init(&mut self) { + let stopper = StopSource::new(); + let stop_token = stopper.token(); + self.opt_stopper = Some(stopper); + let (dsc_tx, dsc_rx) = flume::unbounded::>(); + self.opt_deferred_stream_channel = Some(dsc_tx); + self.opt_join_handle = Some(spawn(Self::processor(stop_token, dsc_rx))); + } + + /// Terminate the processor and ensure all streams are closed + pub async fn terminate(&mut self) { + drop(self.opt_deferred_stream_channel.take()); + drop(self.opt_stopper.take()); + if let Some(jh) = self.opt_join_handle.take() { + jh.await; + } + } + + async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver>) { + let mut unord = FuturesUnordered::>::new(); + + // Ensure the unord never finishes + unord.push(Box::pin(std::future::pending())); + + // Processor loop + let mut unord_fut 
= unord.next(); + let mut dsc_fut = dsc_rx.recv_async(); + while let Ok(res) = select(unord_fut, dsc_fut) + .timeout_at(stop_token.clone()) + .await + { + match res { + Either::Left((x, old_dsc_fut)) => { + // Unord future processor should never get empty + assert!(x.is_some()); + + // Make another unord future to process + unord_fut = unord.next(); + // put back the other future and keep going + dsc_fut = old_dsc_fut; + } + Either::Right((new_proc, old_unord_fut)) => { + // Immediately drop the old unord future + // because we never care about it completing + drop(old_unord_fut); + let Ok(new_proc) = new_proc else { + break; + }; + + // Add a new stream to process + unord.push(new_proc); + + // Make a new unord future because we don't care about the + // completion of the last unord future, they never return + // anything. + unord_fut = unord.next(); + // Make a new receiver future + dsc_fut = dsc_rx.recv_async(); + } + } + } + } + + /// Queue a stream to process in the background + /// * 'receiver' is the stream to process + /// * 'handler' is the callback to handle each item from the stream + /// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized + pub fn add( + &mut self, + receiver: flume::Receiver, + mut handler: impl FnMut(T) -> SendPinBoxFuture + Send + 'static, + ) -> bool { + let Some(st) = self.opt_stopper.as_ref().map(|s| s.token()) else { + return false; + }; + let Some(dsc_tx) = self.opt_deferred_stream_channel.clone() else { + return false; + }; + let drp = Box::pin(async move { + while let Ok(Ok(res)) = receiver.recv_async().timeout_at(st.clone()).await { + if !handler(res).await { + break; + } + } + }); + if dsc_tx.send(drp).is_err() { + return false; + } + true + } +} + +impl Default for DeferredStreamProcessor { + fn default() -> Self { + Self::new() + } +} diff --git a/veilid-tools/src/lib.rs b/veilid-tools/src/lib.rs index 57845fae..27c3969b 100644 --- a/veilid-tools/src/lib.rs +++ b/veilid-tools/src/lib.rs @@ -29,6 +29,7 @@ pub mod assembly_buffer; pub mod async_peek_stream; pub mod async_tag_lock; pub mod clone_stream; +pub mod deferred_stream_processor; pub mod eventual; pub mod eventual_base; pub mod eventual_value; @@ -162,6 +163,8 @@ pub use async_tag_lock::*; #[doc(inline)] pub use clone_stream::*; #[doc(inline)] +pub use deferred_stream_processor::*; +#[doc(inline)] pub use eventual::*; #[doc(inline)] pub use eventual_base::{EventualCommon, EventualResolvedFuture};
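
Editor's note on the `VeilidValueChange` semantics documented in `veilid_state.rs` above: `count == u32::MAX` now marks a change that is not attached to any watch (the deferred GetValue path added in `mod.rs` sends exactly that), while a zero count or an empty subkey range means the watch has died. Below is a minimal sketch of a host-application callback that reads these fields; the handler name is illustrative only, and the crate-root re-exports of `VeilidUpdate`/`VeilidValueChange` are assumed.

```rust
use veilid_core::{VeilidUpdate, VeilidValueChange};

// Hypothetical host-application update handler; only the ValueChange arm is shown.
fn on_veilid_update(update: VeilidUpdate) {
    if let VeilidUpdate::ValueChange(change) = update {
        let VeilidValueChange { key, subkeys, count, value } = *change;
        if count == 0 {
            // Per the new doc comments: a zero count (or an empty subkey range)
            // means any watch on this record has died.
            println!("watch on {} has died", key);
            return;
        }
        if count == u32::MAX {
            // Not attached to any watch, e.g. a deferred outbound_get_value result.
            println!("background value update for {}", key);
        }
        if let Some(data) = value {
            // Only the first changed subkey's data is delivered here; if `subkeys`
            // spans more than one subkey, fetch the rest with
            // RoutingContext::get_dht_value().
            println!("subkeys {:?} changed, first value is {} bytes", subkeys, data.data().len());
        }
    }
}
```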
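
The new `DeferredStreamProcessor` in `veilid-tools` is what lets `outbound_get_value` hand back a `flume` receiver immediately while partial fanout results keep arriving in the background. Below is a standalone sketch of the same select-over-`FuturesUnordered` pattern, written against tokio and flume only; the `BoxFut` alias, the tokio runtime, and shutdown purely by channel close are simplifications for this sketch, not part of the change (the real processor also threads a `StopToken` through `timeout_at`).

```rust
// Assumed deps: tokio = { version = "1", features = ["rt-multi-thread", "macros"] },
// flume = "0.11", futures-util = "0.3".
use futures_util::{
    future::{select, Either},
    stream::FuturesUnordered,
    StreamExt,
};
use std::{future::Future, pin::Pin};

type BoxFut = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Drive every future received on `work_rx` to completion in the background.
/// The task ends when all senders are dropped (the real implementation can also
/// be stopped early via its StopSource).
fn spawn_deferred_processor(work_rx: flume::Receiver<BoxFut>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut unord = FuturesUnordered::<BoxFut>::new();
        // Seed with a never-ready future so `unord.next()` never yields None.
        unord.push(Box::pin(std::future::pending()));

        let mut unord_fut = unord.next();
        let mut recv_fut = work_rx.recv_async();
        loop {
            match select(unord_fut, recv_fut).await {
                Either::Left((_, old_recv_fut)) => {
                    // A deferred future finished; keep polling the rest.
                    unord_fut = unord.next();
                    recv_fut = old_recv_fut;
                }
                Either::Right((msg, old_unord_fut)) => {
                    // Drop the old `next()` future before touching `unord` again.
                    drop(old_unord_fut);
                    let Ok(fut) = msg else {
                        break; // all senders gone: shut down
                    };
                    unord.push(fut);
                    unord_fut = unord.next();
                    recv_fut = work_rx.recv_async();
                }
            }
        }
    })
}

#[tokio::main]
async fn main() {
    let (work_tx, work_rx) = flume::unbounded::<BoxFut>();
    let processor = spawn_deferred_processor(work_rx);

    // Mirrors process_deferred_results(): forward items from a result channel
    // to a handler until the stream ends.
    let (res_tx, res_rx) = flume::unbounded::<u32>();
    work_tx
        .send(Box::pin(async move {
            while let Ok(item) = res_rx.recv_async().await {
                println!("deferred result: {item}");
            }
        }))
        .unwrap();

    for i in 0..3 {
        res_tx.send(i).unwrap();
    }
    drop(res_tx); // ends the deferred stream
    drop(work_tx); // closes the work channel, letting the processor exit
    processor.await.unwrap();
}
```

The seeded `std::future::pending()` mirrors the trick in the original `processor()` loop: it keeps `unord.next()` from ever resolving to `None`, so the select loop never has to special-case an empty `FuturesUnordered`.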