initial version of preempt for dht get_value

This commit is contained in:
Christien Rioux 2024-05-17 18:23:34 -04:00
parent 2bfb6c635c
commit c272c768fc
8 changed files with 529 additions and 168 deletions

View File

@ -10,10 +10,16 @@ where
#[derive(Debug, Copy, Clone)]
pub(crate) enum FanoutResultKind {
Partial,
Timeout,
Finished,
Exhausted,
}
impl FanoutResultKind {
pub fn is_partial(&self) -> bool {
matches!(self, Self::Partial)
}
}
#[derive(Debug, Clone)]
pub(crate) struct FanoutResult {
@ -23,6 +29,7 @@ pub(crate) struct FanoutResult {
pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String {
let kc = match result.kind {
FanoutResultKind::Partial => "P",
FanoutResultKind::Timeout => "T",
FanoutResultKind::Finished => "F",
FanoutResultKind::Exhausted => "E",

View File

@ -10,6 +10,8 @@ struct OutboundGetValueContext {
pub descriptor: Option<Arc<SignedValueDescriptor>>,
/// The parsed schema from the descriptor if we have one
pub schema: Option<DHTSchema>,
/// If we should send a partial update with the current contetx
pub send_partial_update: bool,
}
/// The result of the outbound_get_value operation
@ -29,7 +31,7 @@ impl StorageManager {
subkey: ValueSubkey,
safety_selection: SafetySelection,
last_get_result: GetResult,
) -> VeilidAPIResult<OutboundGetValueResult> {
) -> VeilidAPIResult<flume::Receiver<VeilidAPIResult<OutboundGetValueResult>>> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue'
@ -49,30 +51,40 @@ impl StorageManager {
inner.get_value_nodes(key)?.unwrap_or_default()
};
// Make do-get-value answer context
// Parse the schema
let schema = if let Some(d) = &last_get_result.opt_descriptor {
Some(d.schema()?)
} else {
None
};
// Make the return channel
let (out_tx, out_rx) = flume::unbounded::<VeilidAPIResult<OutboundGetValueResult>>();
// Make do-get-value answer context
let context = Arc::new(Mutex::new(OutboundGetValueContext {
value: last_get_result.opt_value,
value_nodes: vec![],
descriptor: last_get_result.opt_descriptor.clone(),
schema,
send_partial_update: false,
}));
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone();
let call_routine = {
let context = context.clone();
let rpc_processor = rpc_processor.clone();
move |next_node: NodeRef| {
let context = context.clone();
let rpc_processor = rpc_processor.clone();
let last_descriptor = last_get_result.opt_descriptor.clone();
async move {
let gva = network_result_try!(
rpc_processor
.clone()
.rpc_call_get_value(
Destination::direct(next_node.clone()).with_safety(safety_selection),
Destination::direct(next_node.clone())
.with_safety(safety_selection),
key,
subkey,
last_descriptor.map(|x| (*x).clone()),
@ -102,7 +114,8 @@ impl StorageManager {
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema)
else {
// Got a value but no descriptor for it
// Move to the next node
return Ok(NetworkResult::invalid_message(
@ -133,7 +146,9 @@ impl StorageManager {
// If sequence number is the same, the data should be the same
if prior_value.value_data() != value.value_data() {
// Move to the next node
return Ok(NetworkResult::invalid_message("value data mismatch"));
return Ok(NetworkResult::invalid_message(
"value data mismatch",
));
}
// Increase the consensus count for the existing value
ctx.value_nodes.push(next_node);
@ -142,6 +157,8 @@ impl StorageManager {
ctx.value = Some(Arc::new(value));
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
// Send an update since the value changed
ctx.send_partial_update = true;
} else {
// If the sequence number is older, ignore it
}
@ -150,6 +167,8 @@ impl StorageManager {
ctx.value = Some(Arc::new(value));
// One node has shown us this value so far
ctx.value_nodes = vec![next_node];
// Send an update since the value changed
ctx.send_partial_update = true;
}
}
@ -158,12 +177,37 @@ impl StorageManager {
Ok(NetworkResult::value(gva.answer.peers))
}
}
};
// Routine to call to check if we're done at each step
let check_done = |_closest_nodes: &[NodeRef]| {
let check_done = {
let context = context.clone();
let out_tx = out_tx.clone();
move |_closest_nodes: &[NodeRef]| {
let mut ctx = context.lock();
// send partial update if desired
if ctx.send_partial_update {
ctx.send_partial_update=false;
// return partial result
let fanout_result = FanoutResult {
kind: FanoutResultKind::Partial,
value_nodes: ctx.value_nodes.clone(),
};
if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
fanout_result,
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
})) {
log_dht!(debug "Sending partial GetValue result failed: {}", e);
}
}
// If we have reached sufficient consensus, return done
let ctx = context.lock();
if ctx.value.is_some()
&& ctx.descriptor.is_some()
&& ctx.value_nodes.len() >= consensus_count
@ -171,9 +215,11 @@ impl StorageManager {
return Some(());
}
None
}
};
// Call the fanout
// Call the fanout in a spawned task
spawn(Box::pin(async move {
let fanout_call = FanoutCall::new(
routing_table.clone(),
key,
@ -195,8 +241,11 @@ impl StorageManager {
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_dht!(debug "GetValue Fanout Error: {}", e);
return Err(e.into());
log_dht!(debug "GetValue fanout error: {}", e);
if let Err(e) = out_tx.send(Err(e.into())) {
log_dht!(debug "Sending GetValue fanout error failed: {}", e);
}
return;
}
};
@ -207,13 +256,96 @@ impl StorageManager {
};
log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
Ok(OutboundGetValueResult {
if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
fanout_result,
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
})) {
log_dht!(debug "Sending GetValue result failed: {}", e);
}
}))
.detach();
Ok(out_rx)
}
pub(super) fn process_deferred_outbound_get_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver<Result<get_value::OutboundGetValueResult, VeilidAPIError>>, key: TypedKey, subkey: ValueSubkey, last_seq: ValueSeqNum) {
let this = self.clone();
inner.process_deferred_results(
res_rx,
Box::new(
move |result: VeilidAPIResult<get_value::OutboundGetValueResult>| -> SendPinBoxFuture<bool> {
let this = this.clone();
Box::pin(async move {
let result = match result {
Ok(v) => v,
Err(e) => {
log_rtab!(debug "Deferred fanout error: {}", e);
return false;
}
};
let is_partial = result.fanout_result.kind.is_partial();
let value_data = match this.process_outbound_get_value_result(key, subkey, Some(last_seq), result).await {
Ok(Some(v)) => v,
Ok(None) => {
return is_partial;
}
Err(e) => {
log_rtab!(debug "Deferred fanout error: {}", e);
return false;
}
};
if is_partial {
// If more partial results show up, don't send an update until we're done
return true;
}
// If we processed the final result, possibly send an update
// if the sequence number changed since our first partial update
// Send with a max count as this is not attached to any watch
if last_seq != value_data.seq() {
if let Err(e) = this.update_callback_value_change(key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data)).await {
log_rtab!(debug "Failed sending deferred fanout value change: {}", e);
}
}
// Return done
false
})
},
),
);
}
pub(super) async fn process_outbound_get_value_result(&self, key: TypedKey, subkey: ValueSubkey, opt_last_seq: Option<u32>, result: get_value::OutboundGetValueResult) -> Result<Option<ValueData>, VeilidAPIError> {
// See if we got a value back
let Some(get_result_value) = result.get_result.opt_value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
};
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.process_fanout_results(
key,
core::iter::once((subkey, &result.fanout_result)),
false,
);
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
inner
.handle_set_local_value(
key,
subkey,
get_result_value.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
}
Ok(Some(get_result_value.value_data().clone()))
}
/// Handle a received 'Get Value' query

View File

@ -264,7 +264,7 @@ impl StorageManager {
// No last descriptor, no last value
// Use the safety selection we opened the record with
let subkey: ValueSubkey = 0;
let result = self
let res_rx = self
.outbound_get_value(
rpc_processor,
key,
@ -273,12 +273,24 @@ impl StorageManager {
GetResult::default(),
)
.await?;
// Wait for the first result
let Ok(result) = res_rx.recv_async().await else {
apibail_internal!("failed to receive results");
};
let result = result?;
// If we got nothing back, the key wasn't found
if result.get_result.opt_value.is_none() && result.get_result.opt_descriptor.is_none() {
// No result
apibail_key_not_found!(key);
};
let last_seq = result
.get_result
.opt_value
.as_ref()
.unwrap()
.value_data()
.seq();
// Reopen inner to store value we just got
let mut inner = self.lock().await?;
@ -295,9 +307,16 @@ impl StorageManager {
}
// Open the new record
inner
let out = inner
.open_new_record(key, writer, subkey, result.get_result, safety_selection)
.await
.await;
if out.is_ok() {
self.process_deferred_outbound_get_value_result_inner(
&mut inner, res_rx, key, subkey, last_seq,
);
}
out
}
/// Close an opened local record
@ -402,7 +421,7 @@ impl StorageManager {
.opt_value
.as_ref()
.map(|v| v.value_data().seq());
let result = self
let res_rx = self
.outbound_get_value(
rpc_processor,
key,
@ -412,32 +431,33 @@ impl StorageManager {
)
.await?;
// See if we got a value back
let Some(get_result_value) = result.get_result.opt_value else {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(None);
// Wait for the first result
let Ok(result) = res_rx.recv_async().await else {
apibail_internal!("failed to receive results");
};
let result = result?;
let partial = result.fanout_result.kind.is_partial();
// Keep the list of nodes that returned a value for later reference
// Process the returned result
let out = self
.process_outbound_get_value_result(key, subkey, opt_last_seq, result)
.await?;
if let Some(out) = &out {
// If there's more to process, do it in the background
if partial {
let mut inner = self.lock().await?;
inner.process_fanout_results(
key,
core::iter::once((subkey, &result.fanout_result)),
false,
);
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
inner
.handle_set_local_value(
self.process_deferred_outbound_get_value_result_inner(
&mut inner,
res_rx,
key,
subkey,
get_result_value.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
out.seq(),
);
}
Ok(Some(get_result_value.value_data().clone()))
}
Ok(out)
}
/// Set the value of a subkey on an opened local record
@ -920,6 +940,31 @@ impl StorageManager {
Ok(())
}
// Send a value change up through the callback
#[instrument(level = "trace", skip(self), err)]
async fn update_callback_value_change(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
count: u32,
value: Option<ValueData>,
) -> Result<(), VeilidAPIError> {
let opt_update_callback = {
let inner = self.lock().await?;
inner.update_callback.clone()
};
if let Some(update_callback) = opt_update_callback {
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key,
subkeys,
count,
value,
})));
}
Ok(())
}
fn check_fanout_set_offline(
&self,
key: TypedKey,
@ -927,6 +972,7 @@ impl StorageManager {
fanout_result: &FanoutResult,
) -> bool {
match fanout_result.kind {
FanoutResultKind::Partial => false,
FanoutResultKind::Timeout => {
log_stor!(debug "timeout in set_value, adding offline subkey: {}:{}", key, subkey);
true

View File

@ -32,6 +32,8 @@ pub(super) struct StorageManagerInner {
pub tick_future: Option<SendPinBoxFuture<()>>,
/// Update callback to send ValueChanged notification to
pub update_callback: Option<UpdateCallback>,
/// Deferred result processor
pub deferred_result_processor: DeferredStreamProcessor,
/// The maximum consensus count
set_consensus_count: usize,
@ -88,6 +90,7 @@ impl StorageManagerInner {
opt_routing_table: Default::default(),
tick_future: Default::default(),
update_callback: None,
deferred_result_processor: DeferredStreamProcessor::default(),
set_consensus_count,
}
}
@ -126,6 +129,9 @@ impl StorageManagerInner {
self.load_metadata().await?;
// Start deferred results processors
self.deferred_result_processor.init().await;
// Schedule tick
let tick_future = interval(1000, move || {
let this = outer_self.clone();
@ -151,6 +157,9 @@ impl StorageManagerInner {
f.await;
}
// Stop deferred result processor
self.deferred_result_processor.terminate().await;
// Final flush on record stores
if let Some(mut local_record_store) = self.local_record_store.take() {
if let Err(e) = local_record_store.flush().await {
@ -708,4 +717,12 @@ impl StorageManagerInner {
subkeys: ValueSubkeyRangeSet::single(subkey),
});
}
pub fn process_deferred_results<T: Send + 'static>(
&mut self,
receiver: flume::Receiver<T>,
handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
) -> bool {
self.deferred_result_processor.add(receiver, handler)
}
}

View File

@ -417,7 +417,7 @@ impl StorageManager {
watch_id: u64,
) -> VeilidAPIResult<NetworkResult<()>> {
// Update local record store with new value
let (is_value_seq_newer, opt_update_callback, value) = {
let (is_value_seq_newer, value) = {
let mut inner = self.lock().await?;
// Don't process update if the record is closed
@ -516,7 +516,7 @@ impl StorageManager {
}
}
(is_value_seq_newer, inner.update_callback.clone(), value)
(is_value_seq_newer, value)
};
// Announce ValueChanged VeilidUpdate
@ -526,18 +526,13 @@ impl StorageManager {
let do_update = is_value_seq_newer || subkeys.len() > 1 || count == 0;
if do_update {
if let Some(update_callback) = opt_update_callback {
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key,
subkeys,
count,
value: if is_value_seq_newer {
let value = if is_value_seq_newer {
Some(value.unwrap().value_data().clone())
} else {
None
},
})));
}
};
self.update_callback_value_change(key, subkeys, count, value)
.await?;
}
Ok(NetworkResult::value(()))

View File

@ -52,58 +52,93 @@ impl TryFrom<String> for AttachmentState {
}
}
/// Describe the attachment state of the Veilid node
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct VeilidStateAttachment {
/// The overall quality of the routing table if attached, or the current state the attachment state machine.
pub state: AttachmentState,
/// If attached and there are enough eachable nodes in the routing table to perform all the actions of the PublicInternet RoutingDomain,
/// including things like private/safety route allocation and DHT operations.
pub public_internet_ready: bool,
/// If attached and there are enough eachable nodes in the routing table to perform all the actions of the LocalNetwork RoutingDomain.
pub local_network_ready: bool,
}
/// Describe a recently accessed peer
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct PeerTableData {
/// The node ids used by this peer
#[schemars(with = "Vec<String>")]
#[cfg_attr(target_arch = "wasm32", tsify(type = "string[]"))]
pub node_ids: Vec<TypedKey>,
/// The peer's human readable address.
pub peer_address: String,
/// Statistics we have collected on this peer.
pub peer_stats: PeerStats,
}
/// Describe the current network state of the Veilid node
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct VeilidStateNetwork {
/// If the network has been started or not.
pub started: bool,
/// The total number of bytes per second used by Veilid currently in the download direction.
pub bps_down: ByteCount,
/// The total number of bytes per second used by Veilid currently in the upload direction.
pub bps_up: ByteCount,
/// The list of most recently accessed peers.
/// This is not an active connection table, nor is representative of the entire routing table.
pub peers: Vec<PeerTableData>,
}
/// Describe a private route change that has happened
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct VeilidRouteChange {
/// If a private route that was allocated has died, it is listed here.
#[schemars(with = "Vec<String>")]
pub dead_routes: Vec<RouteId>,
/// If a private route that was imported has died, it is listed here.
#[schemars(with = "Vec<String>")]
pub dead_remote_routes: Vec<RouteId>,
}
/// Describe changes to the Veilid node configuration
/// Currently this is only ever emitted once, however we reserve the right to
/// add the ability to change the configuration or have it changed by the Veilid node
/// itself during runtime.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct VeilidStateConfig {
/// If the Veilid node configuration has changed the full new config will be here.
pub config: VeilidConfigInner,
}
/// Describe when DHT records have subkey values changed
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
pub struct VeilidValueChange {
/// The DHT Record key that changed
#[schemars(with = "String")]
pub key: TypedKey,
/// The portion of the DHT Record's subkeys that have changed
/// If the subkey range is empty, any watch present on the value has died.
pub subkeys: ValueSubkeyRangeSet,
/// The count remaining on the watch that triggered this value change
/// If there is no watch and this is received, it will be set to u32::MAX
/// If this value is zero, any watch present on the value has died.
pub count: u32,
/// The (optional) value data for the first subkey in the subkeys range
/// If 'subkeys' is not a single value, other values than the first value
/// must be retrieved with RoutingContext::get_dht_value().
pub value: Option<ValueData>,
}
/// An update from the veilid-core to the host application describing a change
/// to the internal state of the Veilid node.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(into_wasm_abi))]
#[serde(tag = "kind")]
@ -120,6 +155,7 @@ pub enum VeilidUpdate {
}
from_impl_to_jsvalue!(VeilidUpdate);
/// A queriable state of the internals of veilid-core.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(into_wasm_abi))]
pub struct VeilidState {

View File

@ -0,0 +1,125 @@
use futures_util::{
future::{select, Either},
stream::FuturesUnordered,
StreamExt,
};
use stop_token::future::FutureExt as _;
use super::*;
/// Background processor for streams
/// Handles streams to completion, passing each item from the stream to a callback
pub struct DeferredStreamProcessor {
pub opt_deferred_stream_channel: Option<flume::Sender<SendPinBoxFuture<()>>>,
pub opt_stopper: Option<StopSource>,
pub opt_join_handle: Option<MustJoinHandle<()>>,
}
impl DeferredStreamProcessor {
/// Create a new DeferredStreamProcessor
pub fn new() -> Self {
Self {
opt_deferred_stream_channel: None,
opt_stopper: None,
opt_join_handle: None,
}
}
/// Initialize the processor before use
pub async fn init(&mut self) {
let stopper = StopSource::new();
let stop_token = stopper.token();
self.opt_stopper = Some(stopper);
let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
self.opt_deferred_stream_channel = Some(dsc_tx);
self.opt_join_handle = Some(spawn(Self::processor(stop_token, dsc_rx)));
}
/// Terminate the processor and ensure all streams are closed
pub async fn terminate(&mut self) {
drop(self.opt_deferred_stream_channel.take());
drop(self.opt_stopper.take());
if let Some(jh) = self.opt_join_handle.take() {
jh.await;
}
}
async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<SendPinBoxFuture<()>>) {
let mut unord = FuturesUnordered::<SendPinBoxFuture<()>>::new();
// Ensure the unord never finishes
unord.push(Box::pin(std::future::pending()));
// Processor loop
let mut unord_fut = unord.next();
let mut dsc_fut = dsc_rx.recv_async();
while let Ok(res) = select(unord_fut, dsc_fut)
.timeout_at(stop_token.clone())
.await
{
match res {
Either::Left((x, old_dsc_fut)) => {
// Unord future processor should never get empty
assert!(x.is_some());
// Make another unord future to process
unord_fut = unord.next();
// put back the other future and keep going
dsc_fut = old_dsc_fut;
}
Either::Right((new_proc, old_unord_fut)) => {
// Immediately drop the old unord future
// because we never care about it completing
drop(old_unord_fut);
let Ok(new_proc) = new_proc else {
break;
};
// Add a new stream to process
unord.push(new_proc);
// Make a new unord future because we don't care about the
// completion of the last unord future, they never return
// anything.
unord_fut = unord.next();
// Make a new receiver future
dsc_fut = dsc_rx.recv_async();
}
}
}
}
/// Queue a stream to process in the background
/// * 'receiver' is the stream to process
/// * 'handler' is the callback to handle each item from the stream
/// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized
pub fn add<T: Send + 'static>(
&mut self,
receiver: flume::Receiver<T>,
mut handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
) -> bool {
let Some(st) = self.opt_stopper.as_ref().map(|s| s.token()) else {
return false;
};
let Some(dsc_tx) = self.opt_deferred_stream_channel.clone() else {
return false;
};
let drp = Box::pin(async move {
while let Ok(Ok(res)) = receiver.recv_async().timeout_at(st.clone()).await {
if !handler(res).await {
break;
}
}
});
if dsc_tx.send(drp).is_err() {
return false;
}
true
}
}
impl Default for DeferredStreamProcessor {
fn default() -> Self {
Self::new()
}
}

View File

@ -29,6 +29,7 @@ pub mod assembly_buffer;
pub mod async_peek_stream;
pub mod async_tag_lock;
pub mod clone_stream;
pub mod deferred_stream_processor;
pub mod eventual;
pub mod eventual_base;
pub mod eventual_value;
@ -162,6 +163,8 @@ pub use async_tag_lock::*;
#[doc(inline)]
pub use clone_stream::*;
#[doc(inline)]
pub use deferred_stream_processor::*;
#[doc(inline)]
pub use eventual::*;
#[doc(inline)]
pub use eventual_base::{EventualCommon, EventualResolvedFuture};