mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-12-28 00:39:25 -05:00
initial version of preempt for dht get_value
This commit is contained in:
parent
048dbe476b
commit
ddb889e4cf
@ -10,10 +10,16 @@ where
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub(crate) enum FanoutResultKind {
|
||||
Partial,
|
||||
Timeout,
|
||||
Finished,
|
||||
Exhausted,
|
||||
}
|
||||
impl FanoutResultKind {
|
||||
pub fn is_partial(&self) -> bool {
|
||||
matches!(self, Self::Partial)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct FanoutResult {
|
||||
@ -23,6 +29,7 @@ pub(crate) struct FanoutResult {
|
||||
|
||||
pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String {
|
||||
let kc = match result.kind {
|
||||
FanoutResultKind::Partial => "P",
|
||||
FanoutResultKind::Timeout => "T",
|
||||
FanoutResultKind::Finished => "F",
|
||||
FanoutResultKind::Exhausted => "E",
|
||||
|
@ -10,6 +10,8 @@ struct OutboundGetValueContext {
|
||||
pub descriptor: Option<Arc<SignedValueDescriptor>>,
|
||||
/// The parsed schema from the descriptor if we have one
|
||||
pub schema: Option<DHTSchema>,
|
||||
/// If we should send a partial update with the current contetx
|
||||
pub send_partial_update: bool,
|
||||
}
|
||||
|
||||
/// The result of the outbound_get_value operation
|
||||
@ -29,7 +31,7 @@ impl StorageManager {
|
||||
subkey: ValueSubkey,
|
||||
safety_selection: SafetySelection,
|
||||
last_get_result: GetResult,
|
||||
) -> VeilidAPIResult<OutboundGetValueResult> {
|
||||
) -> VeilidAPIResult<flume::Receiver<VeilidAPIResult<OutboundGetValueResult>>> {
|
||||
let routing_table = rpc_processor.routing_table();
|
||||
|
||||
// Get the DHT parameters for 'GetValue'
|
||||
@ -49,171 +51,301 @@ impl StorageManager {
|
||||
inner.get_value_nodes(key)?.unwrap_or_default()
|
||||
};
|
||||
|
||||
// Make do-get-value answer context
|
||||
// Parse the schema
|
||||
let schema = if let Some(d) = &last_get_result.opt_descriptor {
|
||||
Some(d.schema()?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Make the return channel
|
||||
let (out_tx, out_rx) = flume::unbounded::<VeilidAPIResult<OutboundGetValueResult>>();
|
||||
|
||||
// Make do-get-value answer context
|
||||
let context = Arc::new(Mutex::new(OutboundGetValueContext {
|
||||
value: last_get_result.opt_value,
|
||||
value_nodes: vec![],
|
||||
descriptor: last_get_result.opt_descriptor.clone(),
|
||||
schema,
|
||||
send_partial_update: false,
|
||||
}));
|
||||
|
||||
// Routine to call to generate fanout
|
||||
let call_routine = |next_node: NodeRef| {
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
let call_routine = {
|
||||
let context = context.clone();
|
||||
let last_descriptor = last_get_result.opt_descriptor.clone();
|
||||
async move {
|
||||
let gva = network_result_try!(
|
||||
rpc_processor
|
||||
.clone()
|
||||
.rpc_call_get_value(
|
||||
Destination::direct(next_node.clone()).with_safety(safety_selection),
|
||||
key,
|
||||
subkey,
|
||||
last_descriptor.map(|x| (*x).clone()),
|
||||
)
|
||||
.await?
|
||||
);
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
move |next_node: NodeRef| {
|
||||
let context = context.clone();
|
||||
let rpc_processor = rpc_processor.clone();
|
||||
let last_descriptor = last_get_result.opt_descriptor.clone();
|
||||
async move {
|
||||
let gva = network_result_try!(
|
||||
rpc_processor
|
||||
.clone()
|
||||
.rpc_call_get_value(
|
||||
Destination::direct(next_node.clone())
|
||||
.with_safety(safety_selection),
|
||||
key,
|
||||
subkey,
|
||||
last_descriptor.map(|x| (*x).clone()),
|
||||
)
|
||||
.await?
|
||||
);
|
||||
|
||||
// Keep the descriptor if we got one. If we had a last_descriptor it will
|
||||
// already be validated by rpc_call_get_value
|
||||
if let Some(descriptor) = gva.answer.descriptor {
|
||||
let mut ctx = context.lock();
|
||||
if ctx.descriptor.is_none() && ctx.schema.is_none() {
|
||||
let schema = match descriptor.schema() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
return Ok(NetworkResult::invalid_message(e));
|
||||
}
|
||||
// Keep the descriptor if we got one. If we had a last_descriptor it will
|
||||
// already be validated by rpc_call_get_value
|
||||
if let Some(descriptor) = gva.answer.descriptor {
|
||||
let mut ctx = context.lock();
|
||||
if ctx.descriptor.is_none() && ctx.schema.is_none() {
|
||||
let schema = match descriptor.schema() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
return Ok(NetworkResult::invalid_message(e));
|
||||
}
|
||||
};
|
||||
ctx.schema = Some(schema);
|
||||
ctx.descriptor = Some(Arc::new(descriptor));
|
||||
}
|
||||
}
|
||||
|
||||
// Keep the value if we got one and it is newer and it passes schema validation
|
||||
if let Some(value) = gva.answer.value {
|
||||
log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
|
||||
let mut ctx = context.lock();
|
||||
|
||||
// Ensure we have a schema and descriptor
|
||||
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema)
|
||||
else {
|
||||
// Got a value but no descriptor for it
|
||||
// Move to the next node
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
"Got value with no descriptor",
|
||||
));
|
||||
};
|
||||
ctx.schema = Some(schema);
|
||||
ctx.descriptor = Some(Arc::new(descriptor));
|
||||
}
|
||||
}
|
||||
|
||||
// Keep the value if we got one and it is newer and it passes schema validation
|
||||
if let Some(value) = gva.answer.value {
|
||||
log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
|
||||
let mut ctx = context.lock();
|
||||
// Validate with schema
|
||||
if !schema.check_subkey_value_data(
|
||||
descriptor.owner(),
|
||||
subkey,
|
||||
value.value_data(),
|
||||
) {
|
||||
// Validation failed, ignore this value
|
||||
// Move to the next node
|
||||
return Ok(NetworkResult::invalid_message(format!(
|
||||
"Schema validation failed on subkey {}",
|
||||
subkey
|
||||
)));
|
||||
}
|
||||
|
||||
// Ensure we have a schema and descriptor
|
||||
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
|
||||
// Got a value but no descriptor for it
|
||||
// Move to the next node
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
"Got value with no descriptor",
|
||||
));
|
||||
};
|
||||
// If we have a prior value, see if this is a newer sequence number
|
||||
if let Some(prior_value) = &ctx.value {
|
||||
let prior_seq = prior_value.value_data().seq();
|
||||
let new_seq = value.value_data().seq();
|
||||
|
||||
// Validate with schema
|
||||
if !schema.check_subkey_value_data(
|
||||
descriptor.owner(),
|
||||
subkey,
|
||||
value.value_data(),
|
||||
) {
|
||||
// Validation failed, ignore this value
|
||||
// Move to the next node
|
||||
return Ok(NetworkResult::invalid_message(format!(
|
||||
"Schema validation failed on subkey {}",
|
||||
subkey
|
||||
)));
|
||||
}
|
||||
|
||||
// If we have a prior value, see if this is a newer sequence number
|
||||
if let Some(prior_value) = &ctx.value {
|
||||
let prior_seq = prior_value.value_data().seq();
|
||||
let new_seq = value.value_data().seq();
|
||||
|
||||
if new_seq == prior_seq {
|
||||
// If sequence number is the same, the data should be the same
|
||||
if prior_value.value_data() != value.value_data() {
|
||||
// Move to the next node
|
||||
return Ok(NetworkResult::invalid_message("value data mismatch"));
|
||||
if new_seq == prior_seq {
|
||||
// If sequence number is the same, the data should be the same
|
||||
if prior_value.value_data() != value.value_data() {
|
||||
// Move to the next node
|
||||
return Ok(NetworkResult::invalid_message(
|
||||
"value data mismatch",
|
||||
));
|
||||
}
|
||||
// Increase the consensus count for the existing value
|
||||
ctx.value_nodes.push(next_node);
|
||||
} else if new_seq > prior_seq {
|
||||
// If the sequence number is greater, start over with the new value
|
||||
ctx.value = Some(Arc::new(value));
|
||||
// One node has shown us this value so far
|
||||
ctx.value_nodes = vec![next_node];
|
||||
// Send an update since the value changed
|
||||
ctx.send_partial_update = true;
|
||||
} else {
|
||||
// If the sequence number is older, ignore it
|
||||
}
|
||||
// Increase the consensus count for the existing value
|
||||
ctx.value_nodes.push(next_node);
|
||||
} else if new_seq > prior_seq {
|
||||
// If the sequence number is greater, start over with the new value
|
||||
} else {
|
||||
// If we have no prior value, keep it
|
||||
ctx.value = Some(Arc::new(value));
|
||||
// One node has shown us this value so far
|
||||
ctx.value_nodes = vec![next_node];
|
||||
} else {
|
||||
// If the sequence number is older, ignore it
|
||||
// Send an update since the value changed
|
||||
ctx.send_partial_update = true;
|
||||
}
|
||||
} else {
|
||||
// If we have no prior value, keep it
|
||||
ctx.value = Some(Arc::new(value));
|
||||
// One node has shown us this value so far
|
||||
ctx.value_nodes = vec![next_node];
|
||||
}
|
||||
|
||||
// Return peers if we have some
|
||||
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
|
||||
|
||||
Ok(NetworkResult::value(gva.answer.peers))
|
||||
}
|
||||
|
||||
// Return peers if we have some
|
||||
log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
|
||||
|
||||
Ok(NetworkResult::value(gva.answer.peers))
|
||||
}
|
||||
};
|
||||
|
||||
// Routine to call to check if we're done at each step
|
||||
let check_done = |_closest_nodes: &[NodeRef]| {
|
||||
// If we have reached sufficient consensus, return done
|
||||
let ctx = context.lock();
|
||||
if ctx.value.is_some()
|
||||
&& ctx.descriptor.is_some()
|
||||
&& ctx.value_nodes.len() >= consensus_count
|
||||
{
|
||||
return Some(());
|
||||
let check_done = {
|
||||
let context = context.clone();
|
||||
let out_tx = out_tx.clone();
|
||||
move |_closest_nodes: &[NodeRef]| {
|
||||
let mut ctx = context.lock();
|
||||
|
||||
// send partial update if desired
|
||||
if ctx.send_partial_update {
|
||||
ctx.send_partial_update=false;
|
||||
|
||||
// return partial result
|
||||
let fanout_result = FanoutResult {
|
||||
kind: FanoutResultKind::Partial,
|
||||
value_nodes: ctx.value_nodes.clone(),
|
||||
};
|
||||
if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
|
||||
fanout_result,
|
||||
get_result: GetResult {
|
||||
opt_value: ctx.value.clone(),
|
||||
opt_descriptor: ctx.descriptor.clone(),
|
||||
},
|
||||
})) {
|
||||
log_dht!(debug "Sending partial GetValue result failed: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// If we have reached sufficient consensus, return done
|
||||
if ctx.value.is_some()
|
||||
&& ctx.descriptor.is_some()
|
||||
&& ctx.value_nodes.len() >= consensus_count
|
||||
{
|
||||
return Some(());
|
||||
}
|
||||
None
|
||||
}
|
||||
None
|
||||
};
|
||||
|
||||
// Call the fanout
|
||||
let fanout_call = FanoutCall::new(
|
||||
routing_table.clone(),
|
||||
// Call the fanout in a spawned task
|
||||
spawn(Box::pin(async move {
|
||||
let fanout_call = FanoutCall::new(
|
||||
routing_table.clone(),
|
||||
key,
|
||||
key_count,
|
||||
fanout,
|
||||
timeout_us,
|
||||
capability_fanout_node_info_filter(vec![CAP_DHT]),
|
||||
call_routine,
|
||||
check_done,
|
||||
);
|
||||
|
||||
let kind = match fanout_call.run(init_fanout_queue).await {
|
||||
// If we don't finish in the timeout (too much time passed checking for consensus)
|
||||
TimeoutOr::Timeout => FanoutResultKind::Timeout,
|
||||
// If we finished with or without consensus (enough nodes returning the same value)
|
||||
TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
|
||||
// If we ran out of nodes before getting consensus)
|
||||
TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
|
||||
// Failed
|
||||
TimeoutOr::Value(Err(e)) => {
|
||||
// If we finished with an error, return that
|
||||
log_dht!(debug "GetValue fanout error: {}", e);
|
||||
if let Err(e) = out_tx.send(Err(e.into())) {
|
||||
log_dht!(debug "Sending GetValue fanout error failed: {}", e);
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = context.lock();
|
||||
let fanout_result = FanoutResult {
|
||||
kind,
|
||||
value_nodes: ctx.value_nodes.clone(),
|
||||
};
|
||||
log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
|
||||
|
||||
if let Err(e) = out_tx.send(Ok(OutboundGetValueResult {
|
||||
fanout_result,
|
||||
get_result: GetResult {
|
||||
opt_value: ctx.value.clone(),
|
||||
opt_descriptor: ctx.descriptor.clone(),
|
||||
},
|
||||
})) {
|
||||
log_dht!(debug "Sending GetValue result failed: {}", e);
|
||||
}
|
||||
}))
|
||||
.detach();
|
||||
|
||||
Ok(out_rx)
|
||||
}
|
||||
|
||||
pub(super) fn process_deferred_outbound_get_value_result_inner(&self, inner: &mut StorageManagerInner, res_rx: flume::Receiver<Result<get_value::OutboundGetValueResult, VeilidAPIError>>, key: TypedKey, subkey: ValueSubkey, last_seq: ValueSeqNum) {
|
||||
let this = self.clone();
|
||||
inner.process_deferred_results(
|
||||
res_rx,
|
||||
Box::new(
|
||||
move |result: VeilidAPIResult<get_value::OutboundGetValueResult>| -> SendPinBoxFuture<bool> {
|
||||
let this = this.clone();
|
||||
Box::pin(async move {
|
||||
let result = match result {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
log_rtab!(debug "Deferred fanout error: {}", e);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
let is_partial = result.fanout_result.kind.is_partial();
|
||||
let value_data = match this.process_outbound_get_value_result(key, subkey, Some(last_seq), result).await {
|
||||
Ok(Some(v)) => v,
|
||||
Ok(None) => {
|
||||
return is_partial;
|
||||
}
|
||||
Err(e) => {
|
||||
log_rtab!(debug "Deferred fanout error: {}", e);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
if is_partial {
|
||||
// If more partial results show up, don't send an update until we're done
|
||||
return true;
|
||||
}
|
||||
// If we processed the final result, possibly send an update
|
||||
// if the sequence number changed since our first partial update
|
||||
// Send with a max count as this is not attached to any watch
|
||||
if last_seq != value_data.seq() {
|
||||
if let Err(e) = this.update_callback_value_change(key,ValueSubkeyRangeSet::single(subkey), u32::MAX, Some(value_data)).await {
|
||||
log_rtab!(debug "Failed sending deferred fanout value change: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Return done
|
||||
false
|
||||
})
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
pub(super) async fn process_outbound_get_value_result(&self, key: TypedKey, subkey: ValueSubkey, opt_last_seq: Option<u32>, result: get_value::OutboundGetValueResult) -> Result<Option<ValueData>, VeilidAPIError> {
|
||||
// See if we got a value back
|
||||
let Some(get_result_value) = result.get_result.opt_value else {
|
||||
// If we got nothing back then we also had nothing beforehand, return nothing
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Keep the list of nodes that returned a value for later reference
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
inner.process_fanout_results(
|
||||
key,
|
||||
key_count,
|
||||
fanout,
|
||||
timeout_us,
|
||||
capability_fanout_node_info_filter(vec![CAP_DHT]),
|
||||
call_routine,
|
||||
check_done,
|
||||
core::iter::once((subkey, &result.fanout_result)),
|
||||
false,
|
||||
);
|
||||
|
||||
let kind = match fanout_call.run(init_fanout_queue).await {
|
||||
// If we don't finish in the timeout (too much time passed checking for consensus)
|
||||
TimeoutOr::Timeout => FanoutResultKind::Timeout,
|
||||
// If we finished with or without consensus (enough nodes returning the same value)
|
||||
TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
|
||||
// If we ran out of nodes before getting consensus)
|
||||
TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
|
||||
// Failed
|
||||
TimeoutOr::Value(Err(e)) => {
|
||||
// If we finished with an error, return that
|
||||
log_dht!(debug "GetValue Fanout Error: {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = context.lock();
|
||||
let fanout_result = FanoutResult {
|
||||
kind,
|
||||
value_nodes: ctx.value_nodes.clone(),
|
||||
};
|
||||
log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
|
||||
|
||||
Ok(OutboundGetValueResult {
|
||||
fanout_result,
|
||||
get_result: GetResult {
|
||||
opt_value: ctx.value.clone(),
|
||||
opt_descriptor: ctx.descriptor.clone(),
|
||||
},
|
||||
})
|
||||
// If we got a new value back then write it to the opened record
|
||||
if Some(get_result_value.value_data().seq()) != opt_last_seq {
|
||||
inner
|
||||
.handle_set_local_value(
|
||||
key,
|
||||
subkey,
|
||||
get_result_value.clone(),
|
||||
WatchUpdateMode::UpdateAll,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Ok(Some(get_result_value.value_data().clone()))
|
||||
}
|
||||
|
||||
/// Handle a received 'Get Value' query
|
||||
|
@ -264,7 +264,7 @@ impl StorageManager {
|
||||
// No last descriptor, no last value
|
||||
// Use the safety selection we opened the record with
|
||||
let subkey: ValueSubkey = 0;
|
||||
let result = self
|
||||
let res_rx = self
|
||||
.outbound_get_value(
|
||||
rpc_processor,
|
||||
key,
|
||||
@ -273,12 +273,24 @@ impl StorageManager {
|
||||
GetResult::default(),
|
||||
)
|
||||
.await?;
|
||||
// Wait for the first result
|
||||
let Ok(result) = res_rx.recv_async().await else {
|
||||
apibail_internal!("failed to receive results");
|
||||
};
|
||||
let result = result?;
|
||||
|
||||
// If we got nothing back, the key wasn't found
|
||||
if result.get_result.opt_value.is_none() && result.get_result.opt_descriptor.is_none() {
|
||||
// No result
|
||||
apibail_key_not_found!(key);
|
||||
};
|
||||
let last_seq = result
|
||||
.get_result
|
||||
.opt_value
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.value_data()
|
||||
.seq();
|
||||
|
||||
// Reopen inner to store value we just got
|
||||
let mut inner = self.lock().await?;
|
||||
@ -295,9 +307,16 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
// Open the new record
|
||||
inner
|
||||
let out = inner
|
||||
.open_new_record(key, writer, subkey, result.get_result, safety_selection)
|
||||
.await
|
||||
.await;
|
||||
|
||||
if out.is_ok() {
|
||||
self.process_deferred_outbound_get_value_result_inner(
|
||||
&mut inner, res_rx, key, subkey, last_seq,
|
||||
);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// Close an opened local record
|
||||
@ -402,7 +421,7 @@ impl StorageManager {
|
||||
.opt_value
|
||||
.as_ref()
|
||||
.map(|v| v.value_data().seq());
|
||||
let result = self
|
||||
let res_rx = self
|
||||
.outbound_get_value(
|
||||
rpc_processor,
|
||||
key,
|
||||
@ -412,32 +431,33 @@ impl StorageManager {
|
||||
)
|
||||
.await?;
|
||||
|
||||
// See if we got a value back
|
||||
let Some(get_result_value) = result.get_result.opt_value else {
|
||||
// If we got nothing back then we also had nothing beforehand, return nothing
|
||||
return Ok(None);
|
||||
// Wait for the first result
|
||||
let Ok(result) = res_rx.recv_async().await else {
|
||||
apibail_internal!("failed to receive results");
|
||||
};
|
||||
let result = result?;
|
||||
let partial = result.fanout_result.kind.is_partial();
|
||||
|
||||
// Keep the list of nodes that returned a value for later reference
|
||||
let mut inner = self.lock().await?;
|
||||
inner.process_fanout_results(
|
||||
key,
|
||||
core::iter::once((subkey, &result.fanout_result)),
|
||||
false,
|
||||
);
|
||||
// Process the returned result
|
||||
let out = self
|
||||
.process_outbound_get_value_result(key, subkey, opt_last_seq, result)
|
||||
.await?;
|
||||
|
||||
// If we got a new value back then write it to the opened record
|
||||
if Some(get_result_value.value_data().seq()) != opt_last_seq {
|
||||
inner
|
||||
.handle_set_local_value(
|
||||
if let Some(out) = &out {
|
||||
// If there's more to process, do it in the background
|
||||
if partial {
|
||||
let mut inner = self.lock().await?;
|
||||
self.process_deferred_outbound_get_value_result_inner(
|
||||
&mut inner,
|
||||
res_rx,
|
||||
key,
|
||||
subkey,
|
||||
get_result_value.clone(),
|
||||
WatchUpdateMode::UpdateAll,
|
||||
)
|
||||
.await?;
|
||||
out.seq(),
|
||||
);
|
||||
}
|
||||
}
|
||||
Ok(Some(get_result_value.value_data().clone()))
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Set the value of a subkey on an opened local record
|
||||
@ -920,6 +940,31 @@ impl StorageManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Send a value change up through the callback
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
async fn update_callback_value_change(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
subkeys: ValueSubkeyRangeSet,
|
||||
count: u32,
|
||||
value: Option<ValueData>,
|
||||
) -> Result<(), VeilidAPIError> {
|
||||
let opt_update_callback = {
|
||||
let inner = self.lock().await?;
|
||||
inner.update_callback.clone()
|
||||
};
|
||||
|
||||
if let Some(update_callback) = opt_update_callback {
|
||||
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
|
||||
key,
|
||||
subkeys,
|
||||
count,
|
||||
value,
|
||||
})));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_fanout_set_offline(
|
||||
&self,
|
||||
key: TypedKey,
|
||||
@ -927,6 +972,7 @@ impl StorageManager {
|
||||
fanout_result: &FanoutResult,
|
||||
) -> bool {
|
||||
match fanout_result.kind {
|
||||
FanoutResultKind::Partial => false,
|
||||
FanoutResultKind::Timeout => {
|
||||
log_stor!(debug "timeout in set_value, adding offline subkey: {}:{}", key, subkey);
|
||||
true
|
||||
|
@ -32,6 +32,8 @@ pub(super) struct StorageManagerInner {
|
||||
pub tick_future: Option<SendPinBoxFuture<()>>,
|
||||
/// Update callback to send ValueChanged notification to
|
||||
pub update_callback: Option<UpdateCallback>,
|
||||
/// Deferred result processor
|
||||
pub deferred_result_processor: DeferredStreamProcessor,
|
||||
|
||||
/// The maximum consensus count
|
||||
set_consensus_count: usize,
|
||||
@ -88,6 +90,7 @@ impl StorageManagerInner {
|
||||
opt_routing_table: Default::default(),
|
||||
tick_future: Default::default(),
|
||||
update_callback: None,
|
||||
deferred_result_processor: DeferredStreamProcessor::default(),
|
||||
set_consensus_count,
|
||||
}
|
||||
}
|
||||
@ -126,6 +129,9 @@ impl StorageManagerInner {
|
||||
|
||||
self.load_metadata().await?;
|
||||
|
||||
// Start deferred results processors
|
||||
self.deferred_result_processor.init().await;
|
||||
|
||||
// Schedule tick
|
||||
let tick_future = interval(1000, move || {
|
||||
let this = outer_self.clone();
|
||||
@ -151,6 +157,9 @@ impl StorageManagerInner {
|
||||
f.await;
|
||||
}
|
||||
|
||||
// Stop deferred result processor
|
||||
self.deferred_result_processor.terminate().await;
|
||||
|
||||
// Final flush on record stores
|
||||
if let Some(mut local_record_store) = self.local_record_store.take() {
|
||||
if let Err(e) = local_record_store.flush().await {
|
||||
@ -708,4 +717,12 @@ impl StorageManagerInner {
|
||||
subkeys: ValueSubkeyRangeSet::single(subkey),
|
||||
});
|
||||
}
|
||||
|
||||
pub fn process_deferred_results<T: Send + 'static>(
|
||||
&mut self,
|
||||
receiver: flume::Receiver<T>,
|
||||
handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
|
||||
) -> bool {
|
||||
self.deferred_result_processor.add(receiver, handler)
|
||||
}
|
||||
}
|
||||
|
@ -417,7 +417,7 @@ impl StorageManager {
|
||||
watch_id: u64,
|
||||
) -> VeilidAPIResult<NetworkResult<()>> {
|
||||
// Update local record store with new value
|
||||
let (is_value_seq_newer, opt_update_callback, value) = {
|
||||
let (is_value_seq_newer, value) = {
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
// Don't process update if the record is closed
|
||||
@ -516,7 +516,7 @@ impl StorageManager {
|
||||
}
|
||||
}
|
||||
|
||||
(is_value_seq_newer, inner.update_callback.clone(), value)
|
||||
(is_value_seq_newer, value)
|
||||
};
|
||||
|
||||
// Announce ValueChanged VeilidUpdate
|
||||
@ -526,18 +526,13 @@ impl StorageManager {
|
||||
|
||||
let do_update = is_value_seq_newer || subkeys.len() > 1 || count == 0;
|
||||
if do_update {
|
||||
if let Some(update_callback) = opt_update_callback {
|
||||
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
|
||||
key,
|
||||
subkeys,
|
||||
count,
|
||||
value: if is_value_seq_newer {
|
||||
Some(value.unwrap().value_data().clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
})));
|
||||
}
|
||||
let value = if is_value_seq_newer {
|
||||
Some(value.unwrap().value_data().clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
self.update_callback_value_change(key, subkeys, count, value)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(NetworkResult::value(()))
|
||||
|
@ -52,58 +52,93 @@ impl TryFrom<String> for AttachmentState {
|
||||
}
|
||||
}
|
||||
|
||||
/// Describe the attachment state of the Veilid node
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct VeilidStateAttachment {
|
||||
/// The overall quality of the routing table if attached, or the current state the attachment state machine.
|
||||
pub state: AttachmentState,
|
||||
/// If attached and there are enough eachable nodes in the routing table to perform all the actions of the PublicInternet RoutingDomain,
|
||||
/// including things like private/safety route allocation and DHT operations.
|
||||
pub public_internet_ready: bool,
|
||||
/// If attached and there are enough eachable nodes in the routing table to perform all the actions of the LocalNetwork RoutingDomain.
|
||||
pub local_network_ready: bool,
|
||||
}
|
||||
|
||||
/// Describe a recently accessed peer
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct PeerTableData {
|
||||
/// The node ids used by this peer
|
||||
#[schemars(with = "Vec<String>")]
|
||||
#[cfg_attr(target_arch = "wasm32", tsify(type = "string[]"))]
|
||||
pub node_ids: Vec<TypedKey>,
|
||||
/// The peer's human readable address.
|
||||
pub peer_address: String,
|
||||
/// Statistics we have collected on this peer.
|
||||
pub peer_stats: PeerStats,
|
||||
}
|
||||
|
||||
/// Describe the current network state of the Veilid node
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct VeilidStateNetwork {
|
||||
/// If the network has been started or not.
|
||||
pub started: bool,
|
||||
/// The total number of bytes per second used by Veilid currently in the download direction.
|
||||
pub bps_down: ByteCount,
|
||||
/// The total number of bytes per second used by Veilid currently in the upload direction.
|
||||
pub bps_up: ByteCount,
|
||||
/// The list of most recently accessed peers.
|
||||
/// This is not an active connection table, nor is representative of the entire routing table.
|
||||
pub peers: Vec<PeerTableData>,
|
||||
}
|
||||
|
||||
/// Describe a private route change that has happened
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct VeilidRouteChange {
|
||||
/// If a private route that was allocated has died, it is listed here.
|
||||
#[schemars(with = "Vec<String>")]
|
||||
pub dead_routes: Vec<RouteId>,
|
||||
/// If a private route that was imported has died, it is listed here.
|
||||
#[schemars(with = "Vec<String>")]
|
||||
pub dead_remote_routes: Vec<RouteId>,
|
||||
}
|
||||
|
||||
/// Describe changes to the Veilid node configuration
|
||||
/// Currently this is only ever emitted once, however we reserve the right to
|
||||
/// add the ability to change the configuration or have it changed by the Veilid node
|
||||
/// itself during runtime.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct VeilidStateConfig {
|
||||
/// If the Veilid node configuration has changed the full new config will be here.
|
||||
pub config: VeilidConfigInner,
|
||||
}
|
||||
|
||||
/// Describe when DHT records have subkey values changed
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify))]
|
||||
pub struct VeilidValueChange {
|
||||
/// The DHT Record key that changed
|
||||
#[schemars(with = "String")]
|
||||
pub key: TypedKey,
|
||||
/// The portion of the DHT Record's subkeys that have changed
|
||||
/// If the subkey range is empty, any watch present on the value has died.
|
||||
pub subkeys: ValueSubkeyRangeSet,
|
||||
/// The count remaining on the watch that triggered this value change
|
||||
/// If there is no watch and this is received, it will be set to u32::MAX
|
||||
/// If this value is zero, any watch present on the value has died.
|
||||
pub count: u32,
|
||||
/// The (optional) value data for the first subkey in the subkeys range
|
||||
/// If 'subkeys' is not a single value, other values than the first value
|
||||
/// must be retrieved with RoutingContext::get_dht_value().
|
||||
pub value: Option<ValueData>,
|
||||
}
|
||||
|
||||
/// An update from the veilid-core to the host application describing a change
|
||||
/// to the internal state of the Veilid node.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(into_wasm_abi))]
|
||||
#[serde(tag = "kind")]
|
||||
@ -120,6 +155,7 @@ pub enum VeilidUpdate {
|
||||
}
|
||||
from_impl_to_jsvalue!(VeilidUpdate);
|
||||
|
||||
/// A queriable state of the internals of veilid-core.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(into_wasm_abi))]
|
||||
pub struct VeilidState {
|
||||
|
125
veilid-tools/src/deferred_stream_processor.rs
Normal file
125
veilid-tools/src/deferred_stream_processor.rs
Normal file
@ -0,0 +1,125 @@
|
||||
use futures_util::{
|
||||
future::{select, Either},
|
||||
stream::FuturesUnordered,
|
||||
StreamExt,
|
||||
};
|
||||
use stop_token::future::FutureExt as _;
|
||||
|
||||
use super::*;
|
||||
|
||||
/// Background processor for streams
|
||||
/// Handles streams to completion, passing each item from the stream to a callback
|
||||
pub struct DeferredStreamProcessor {
|
||||
pub opt_deferred_stream_channel: Option<flume::Sender<SendPinBoxFuture<()>>>,
|
||||
pub opt_stopper: Option<StopSource>,
|
||||
pub opt_join_handle: Option<MustJoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl DeferredStreamProcessor {
|
||||
/// Create a new DeferredStreamProcessor
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
opt_deferred_stream_channel: None,
|
||||
opt_stopper: None,
|
||||
opt_join_handle: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize the processor before use
|
||||
pub async fn init(&mut self) {
|
||||
let stopper = StopSource::new();
|
||||
let stop_token = stopper.token();
|
||||
self.opt_stopper = Some(stopper);
|
||||
let (dsc_tx, dsc_rx) = flume::unbounded::<SendPinBoxFuture<()>>();
|
||||
self.opt_deferred_stream_channel = Some(dsc_tx);
|
||||
self.opt_join_handle = Some(spawn(Self::processor(stop_token, dsc_rx)));
|
||||
}
|
||||
|
||||
/// Terminate the processor and ensure all streams are closed
|
||||
pub async fn terminate(&mut self) {
|
||||
drop(self.opt_deferred_stream_channel.take());
|
||||
drop(self.opt_stopper.take());
|
||||
if let Some(jh) = self.opt_join_handle.take() {
|
||||
jh.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn processor(stop_token: StopToken, dsc_rx: flume::Receiver<SendPinBoxFuture<()>>) {
|
||||
let mut unord = FuturesUnordered::<SendPinBoxFuture<()>>::new();
|
||||
|
||||
// Ensure the unord never finishes
|
||||
unord.push(Box::pin(std::future::pending()));
|
||||
|
||||
// Processor loop
|
||||
let mut unord_fut = unord.next();
|
||||
let mut dsc_fut = dsc_rx.recv_async();
|
||||
while let Ok(res) = select(unord_fut, dsc_fut)
|
||||
.timeout_at(stop_token.clone())
|
||||
.await
|
||||
{
|
||||
match res {
|
||||
Either::Left((x, old_dsc_fut)) => {
|
||||
// Unord future processor should never get empty
|
||||
assert!(x.is_some());
|
||||
|
||||
// Make another unord future to process
|
||||
unord_fut = unord.next();
|
||||
// put back the other future and keep going
|
||||
dsc_fut = old_dsc_fut;
|
||||
}
|
||||
Either::Right((new_proc, old_unord_fut)) => {
|
||||
// Immediately drop the old unord future
|
||||
// because we never care about it completing
|
||||
drop(old_unord_fut);
|
||||
let Ok(new_proc) = new_proc else {
|
||||
break;
|
||||
};
|
||||
|
||||
// Add a new stream to process
|
||||
unord.push(new_proc);
|
||||
|
||||
// Make a new unord future because we don't care about the
|
||||
// completion of the last unord future, they never return
|
||||
// anything.
|
||||
unord_fut = unord.next();
|
||||
// Make a new receiver future
|
||||
dsc_fut = dsc_rx.recv_async();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Queue a stream to process in the background
|
||||
/// * 'receiver' is the stream to process
|
||||
/// * 'handler' is the callback to handle each item from the stream
|
||||
/// Returns 'true' if the stream was added for processing, and 'false' if the stream could not be added, possibly due to not being initialized
|
||||
pub fn add<T: Send + 'static>(
|
||||
&mut self,
|
||||
receiver: flume::Receiver<T>,
|
||||
mut handler: impl FnMut(T) -> SendPinBoxFuture<bool> + Send + 'static,
|
||||
) -> bool {
|
||||
let Some(st) = self.opt_stopper.as_ref().map(|s| s.token()) else {
|
||||
return false;
|
||||
};
|
||||
let Some(dsc_tx) = self.opt_deferred_stream_channel.clone() else {
|
||||
return false;
|
||||
};
|
||||
let drp = Box::pin(async move {
|
||||
while let Ok(Ok(res)) = receiver.recv_async().timeout_at(st.clone()).await {
|
||||
if !handler(res).await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
if dsc_tx.send(drp).is_err() {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for DeferredStreamProcessor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
@ -29,6 +29,7 @@ pub mod assembly_buffer;
|
||||
pub mod async_peek_stream;
|
||||
pub mod async_tag_lock;
|
||||
pub mod clone_stream;
|
||||
pub mod deferred_stream_processor;
|
||||
pub mod eventual;
|
||||
pub mod eventual_base;
|
||||
pub mod eventual_value;
|
||||
@ -162,6 +163,8 @@ pub use async_tag_lock::*;
|
||||
#[doc(inline)]
|
||||
pub use clone_stream::*;
|
||||
#[doc(inline)]
|
||||
pub use deferred_stream_processor::*;
|
||||
#[doc(inline)]
|
||||
pub use eventual::*;
|
||||
#[doc(inline)]
|
||||
pub use eventual_base::{EventualCommon, EventualResolvedFuture};
|
||||
|
Loading…
Reference in New Issue
Block a user