[ci skip] work in progress

Christien Rioux 2025-03-28 07:33:07 -04:00
parent f002711ab7
commit e636237b2d
9 changed files with 259 additions and 135 deletions

View File

@ -29,8 +29,9 @@ impl Default for FanoutResultKind {
pub struct FanoutResult {
/// How the fanout completed
pub kind: FanoutResultKind,
/// How many nodes counted toward consensus
pub count: usize,
/// The set of nodes that counted toward consensus
/// (for example, had the most recent value for this subkey)
pub consensus_nodes: Vec<NodeRef>,
/// Which nodes accepted the request
pub value_nodes: Vec<NodeRef>,
}
@ -38,7 +39,7 @@ pub struct FanoutResult {
impl fmt::Display for FanoutResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let kc = match self.kind {
FanoutResultKind::Incomplete => "P",
FanoutResultKind::Incomplete => "I",
FanoutResultKind::Timeout => "T",
FanoutResultKind::Consensus => "C",
FanoutResultKind::Exhausted => "E",
@ -48,15 +49,15 @@ impl fmt::Display for FanoutResult {
f,
"{}:{}[{}]",
kc,
self.value_nodes.len(),
self.value_nodes
self.consensus_nodes.len(),
self.consensus_nodes
.iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(","),
)
} else {
write!(f, "{}:{}", kc, self.value_nodes.len())
write!(f, "{}:{}", kc, self.consensus_nodes.len())
}
}
}
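
For reference, a minimal standalone sketch of the reworked Display output, with plain strings standing in for NodeRef and the long-form condition assumed to be the formatter's alternate flag (that condition is cut off in the hunk above):

use std::fmt;

#[allow(dead_code)]
enum FanoutResultKind {
    Incomplete,
    Timeout,
    Consensus,
    Exhausted,
}

struct FanoutResult {
    kind: FanoutResultKind,
    consensus_nodes: Vec<String>, // stand-in for Vec<NodeRef>
}

impl fmt::Display for FanoutResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let kc = match self.kind {
            FanoutResultKind::Incomplete => "I",
            FanoutResultKind::Timeout => "T",
            FanoutResultKind::Consensus => "C",
            FanoutResultKind::Exhausted => "E",
        };
        if f.alternate() {
            // Long form: kind, consensus count, and the consensus node list
            write!(f, "{}:{}[{}]", kc, self.consensus_nodes.len(), self.consensus_nodes.join(","))
        } else {
            // Short form: kind and consensus count only
            write!(f, "{}:{}", kc, self.consensus_nodes.len())
        }
    }
}

fn main() {
    let r = FanoutResult {
        kind: FanoutResultKind::Consensus,
        consensus_nodes: vec!["node_a".into(), "node_b".into()],
    };
    println!("{:#}", r); // C:2[node_a,node_b]
    println!("{}", r);   // C:2
}
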
@ -194,8 +195,8 @@ impl<'a> FanoutCall<'a> {
let fanout_result = ctx.fanout_queue.with_nodes(|nodes, sorted_nodes| {
// Count up nodes we have seen in order and see if our closest nodes have a consensus
let mut consensus: Option<bool> = None;
let mut consensus_nodes: Vec<NodeRef> = vec![];
let mut value_nodes: Vec<NodeRef> = vec![];
let mut count: usize = 0;
for sn in sorted_nodes {
let node = nodes.get(sn).unwrap();
match node.status {
@ -216,8 +217,9 @@ impl<'a> FanoutCall<'a> {
FanoutNodeStatus::Accepted => {
// Node counts toward consensus and value node list
value_nodes.push(node.node_ref.clone());
count += 1;
if consensus.is_none() && count >= self.consensus_count {
consensus_nodes.push(node.node_ref.clone());
if consensus.is_none() && consensus_nodes.len() >= self.consensus_count {
consensus = Some(true);
}
}
@ -228,17 +230,17 @@ impl<'a> FanoutCall<'a> {
match consensus {
Some(true) => FanoutResult {
kind: FanoutResultKind::Consensus,
count,
consensus_nodes,
value_nodes,
},
Some(false) => FanoutResult {
kind: FanoutResultKind::Incomplete,
count,
consensus_nodes,
value_nodes,
},
None => FanoutResult {
kind: FanoutResultKind::Exhausted,
count,
consensus_nodes,
value_nodes,
},
}
@ -419,7 +421,7 @@ impl<'a> FanoutCall<'a> {
fanout_queue: FanoutQueue::new(self.node_id.kind, node_sort),
result: FanoutResult {
kind: FanoutResultKind::Incomplete,
count: 0,
consensus_nodes: vec![],
value_nodes: vec![],
},
done: false,

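Summarizing this file's change, a standalone sketch of how the final result kind now falls out of the consensus tri-state; nodes_remaining is a stand-in for whatever condition sets consensus to Some(false) in the real loop, which is not visible in these hunks:

#[derive(Debug)]
enum Kind {
    Consensus,
    Incomplete,
    Exhausted,
}

fn classify(accepted: &[&str], consensus_count: usize, nodes_remaining: bool) -> (Kind, usize) {
    // Accepted nodes are the consensus set; consensus is declared once it reaches the threshold
    let consensus_nodes: Vec<&str> = accepted.to_vec();
    let consensus: Option<bool> = if consensus_nodes.len() >= consensus_count {
        Some(true)
    } else if nodes_remaining {
        Some(false)
    } else {
        None
    };
    let kind = match consensus {
        Some(true) => Kind::Consensus,
        Some(false) => Kind::Incomplete,
        None => Kind::Exhausted,
    };
    (kind, consensus_nodes.len())
}

fn main() {
    println!("{:?}", classify(&["a", "b", "c"], 3, false)); // (Consensus, 3)
    println!("{:?}", classify(&["a"], 3, true));            // (Incomplete, 1)
    println!("{:?}", classify(&["a"], 3, false));           // (Exhausted, 1)
}
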
View File

@ -210,9 +210,8 @@ impl StorageManager {
match fanout_result.kind {
FanoutResultKind::Incomplete => {
// Send partial update if desired,
// if we've gotten at least one value node before consensus
if ctx.send_partial_update && fanout_result.value_nodes.len() >= 1 {
// Send partial update if desired, if we've gotten at least one consensus node
if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 {
ctx.send_partial_update = false;
// Return partial result
@ -370,11 +369,18 @@ impl StorageManager {
return Ok(None);
};
// Get cryptosystem
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(key.kind) else {
apibail_generic!("unsupported cryptosystem");
};
// Keep the list of nodes that returned a value for later reference
let mut inner = self.inner.lock().await;
Self::process_fanout_results_inner(
&mut inner,
&vcrypto,
key,
core::iter::once((subkey, &result.fanout_result)),
false,

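The cryptosystem lookup added here (and repeated in the set-value and inspect paths below) follows the same let-else bail shape; a small sketch with stand-in Crypto types and apibail_generic! modeled as a plain error return:

use std::collections::HashMap;

// Stand-ins for the project's Crypto / CryptoSystemGuard types; kinds are u32 here.
struct CryptoSystem;
struct Crypto {
    systems: HashMap<u32, CryptoSystem>,
}

impl Crypto {
    fn get(&self, kind: u32) -> Option<&CryptoSystem> {
        self.systems.get(&kind)
    }
}

// Mirrors the let-else bail: `let Some(vcrypto) = crypto.get(key.kind) else { apibail_generic!(...) };`
fn select_vcrypto(crypto: &Crypto, key_kind: u32) -> Result<&CryptoSystem, String> {
    let Some(vcrypto) = crypto.get(key_kind) else {
        return Err("unsupported cryptosystem".to_string());
    };
    Ok(vcrypto)
}

fn main() {
    let crypto = Crypto {
        systems: HashMap::from([(0u32, CryptoSystem)]),
    };
    assert!(select_vcrypto(&crypto, 0).is_ok());
    assert!(select_vcrypto(&crypto, 1).is_err());
}
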
View File

@ -29,13 +29,10 @@ impl DescriptorInfo {
struct SubkeySeqCount {
/// The newest sequence number found for a subkey
pub seq: ValueSeqNum,
/// The consensus count for the subkey
pub count: usize,
// Maybe add per-subkey value nodes someday? Is this even useful?
// /// The set of nodes that had the most recent value for this subkey
// pub value_nodes: Vec<NodeRef>,
// /// The set of nodes that had any value for this subkey
// pub all_nodes: Vec<NodeRef>,
/// The set of nodes that had the most recent value for this subkey
pub consensus_nodes: Vec<NodeRef>,
/// The set of nodes that had any value for this subkey
pub value_nodes: Vec<NodeRef>,
}
/// The context of the outbound_get_value operation
@ -49,7 +46,7 @@ struct OutboundInspectValueContext {
/// The result of the outbound_get_value operation
pub(super) struct OutboundInspectValueResult {
/// Fanout results for each subkey
pub fanout_results: Vec<FanoutResult>,
pub subkey_fanout_results: Vec<FanoutResult>,
/// The inspection that was retrieved
pub inspect_result: InspectResult,
}
@ -113,7 +110,11 @@ impl StorageManager {
seqcounts: local_inspect_result
.seqs
.iter()
.map(|s| SubkeySeqCount { seq: *s, count: 0 })
.map(|s| SubkeySeqCount {
seq: *s,
consensus_nodes: vec![],
value_nodes: vec![],
})
.collect(),
opt_descriptor_info,
}));
@ -209,7 +210,8 @@ impl StorageManager {
.map(|s| SubkeySeqCount {
seq: *s,
// One node has shown us the newest sequence numbers so far
count: 1,
consensus_nodes: vec![next_node.clone()],
value_nodes: vec![next_node.clone()],
})
.collect();
} else {
@ -227,7 +229,7 @@ impl StorageManager {
// If we already have consensus for this subkey, don't bother updating it any more
// While we may find a better sequence number if we keep looking, this does not mimic the behavior
// of get and set unless we stop here
if ctx_seqcnt.count >= consensus_count {
if ctx_seqcnt.consensus_nodes.len() >= consensus_count {
continue;
}
@ -240,12 +242,13 @@ impl StorageManager {
{
// One node has shown us the latest sequence numbers so far
ctx_seqcnt.seq = answer_seq;
ctx_seqcnt.count = 1;
ctx_seqcnt.consensus_nodes = vec![next_node.clone()];
} else if answer_seq == ctx_seqcnt.seq {
// Keep the nodes that showed us the latest values
ctx_seqcnt.count += 1;
ctx_seqcnt.consensus_nodes.push(next_node.clone());
}
}
ctx_seqcnt.value_nodes.push(next_node.clone());
}
}
@ -261,15 +264,17 @@ impl StorageManager {
};
// Routine to call to check if we're done at each step
// For inspect, we track consensus per subkey, externally from the FanoutCall,
// rather than as a single consensus, so the single FanoutResult passed in here
// is ignored in favor of our own per-subkey tracking
let check_done = {
let context = context.clone();
Arc::new(move |fanout_result: &FanoutResult| {
Arc::new(move |_: &FanoutResult| {
// If we have reached sufficient consensus on all subkeys, return done
let ctx = context.lock();
let mut has_consensus = true;
for cs in ctx.seqcounts.iter() {
if cs.count < consensus_count {
if cs.consensus_nodes.len() < consensus_count {
has_consensus = false;
break;
}
@ -295,27 +300,26 @@ impl StorageManager {
let fanout_result = fanout_call.run(init_fanout_queue).await?;
// TODO: continue applying array of count/value_nodes to inspect output, make sure it lines up with fanout behavior
let ctx = context.lock();
let mut fanout_results = vec![];
let mut subkey_fanout_results = vec![];
for cs in &ctx.seqcounts {
let has_consensus = cs.count >= consensus_count;
let fanout_result = FanoutResult {
let has_consensus = cs.consensus_nodes.len() >= consensus_count;
let subkey_fanout_result = FanoutResult {
kind: if has_consensus {
FanoutResultKind::Consensus
} else {
kind
fanout_result.kind
},
consensus_nodes: cs.consensus_nodes.clone(),
value_nodes: cs.value_nodes.clone(),
};
fanout_results.push(fanout_result);
subkey_fanout_results.push(subkey_fanout_result);
}
veilid_log!(self debug "InspectValue Fanout ({:?}):\n{}", kind, debug_fanout_results(&fanout_results));
veilid_log!(self debug "InspectValue Fanout ({:?}):\n{}", fanout_result.kind, debug_fanout_results(&subkey_fanout_results));
Ok(OutboundInspectValueResult {
fanout_results,
subkey_fanout_results,
inspect_result: InspectResult {
subkeys: ctx
.opt_descriptor_info

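A standalone sketch of the per-subkey tracking this file moves to: every answering node is recorded as a value node, and it joins or resets the consensus set depending on how its reported sequence number compares with the newest one seen so far. Types are simplified and the exact "newer answer" condition is an assumption, since it is cut off in the hunk:

type ValueSeqNum = u32;

#[derive(Debug, Default)]
struct SubkeySeqCount {
    seq: ValueSeqNum,
    consensus_nodes: Vec<String>, // nodes that reported the newest seq
    value_nodes: Vec<String>,     // nodes that reported any seq
}

impl SubkeySeqCount {
    fn record_answer(&mut self, node: &str, answer_seq: ValueSeqNum, consensus_count: usize) {
        // Once consensus is reached for this subkey, stop updating it
        if self.consensus_nodes.len() >= consensus_count {
            return;
        }
        if answer_seq > self.seq || self.consensus_nodes.is_empty() {
            // A newer sequence number restarts the consensus set
            self.seq = answer_seq;
            self.consensus_nodes = vec![node.to_string()];
        } else if answer_seq == self.seq {
            // Same newest sequence number: this node joins the consensus set
            self.consensus_nodes.push(node.to_string());
        }
        // Every answering node counts as a value node for this subkey
        self.value_nodes.push(node.to_string());
    }
}

fn main() {
    let mut cs = SubkeySeqCount::default();
    cs.record_answer("n1", 5, 2);
    cs.record_answer("n2", 7, 2); // newer seq resets consensus to [n2]
    cs.record_answer("n3", 7, 2); // same newest seq, joins consensus
    println!("{:?}", cs); // seq: 7, consensus_nodes: [n2, n3], value_nodes: [n1, n2, n3]
}
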
View File

@ -364,19 +364,21 @@ impl StorageManager {
/// Get the set of nodes in our active watches
pub async fn get_active_watch_nodes(&self) -> Vec<Destination> {
let inner = self.inner.lock().await;
inner
.opened_records
.values()
.filter_map(|v| {
v.active_watch().map(|aw| {
Destination::direct(
aw.watch_node
.routing_domain_filtered(RoutingDomain::PublicInternet),
)
.with_safety(v.safety_selection())
})
})
.collect()
let mut out = vec![];
for opened_record in inner.opened_records.values() {
if let Some(aw) = opened_record.active_watch() {
for pn in &aw.per_node {
out.push(
Destination::direct(
pn.watch_node
.routing_domain_filtered(RoutingDomain::PublicInternet),
)
.with_safety(opened_record.safety_selection()),
);
}
}
}
out
}
/// Builds the record key for a given schema and owner
@ -857,16 +859,17 @@ impl StorageManager {
let inner = self.inner.lock().await;
// Get the safety selection and the writer we opened this record
// and whatever active watch id and watch node we may have in case this is a watch update
let (safety_selection, opt_writer, opt_watch_id, opt_watch_node) = {
// and whatever active watch we may have in case this is a watch update
let (safety_selection, opt_writer, opt_watch_nodes) = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
(
opened_record.safety_selection(),
opened_record.writer().cloned(),
opened_record.active_watch().map(|aw| aw.id),
opened_record.active_watch().map(|aw| aw.watch_node.clone()),
opened_record
.active_watch()
.map(|aw| aw.watch_nodes.clone()),
)
};
@ -1038,6 +1041,12 @@ impl StorageManager {
subkeys
};
// Get cryptosystem
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(key.kind) else {
apibail_generic!("unsupported cryptosystem");
};
let mut inner = self.inner.lock().await;
let safety_selection = {
let Some(opened_record) = inner.opened_records.get(&key) else {
@ -1122,7 +1131,7 @@ impl StorageManager {
{
assert_eq!(
result.inspect_result.subkeys.len() as u64,
result.fanout_results.len() as u64,
result.subkey_fanout_results.len() as u64,
"mismatch between subkeys returned and fanout results returned"
);
}
@ -1140,10 +1149,11 @@ impl StorageManager {
.inspect_result
.subkeys
.iter()
.zip(result.fanout_results.iter());
.zip(result.subkey_fanout_results.iter());
Self::process_fanout_results_inner(
&mut inner,
&vcrypto,
key,
results_iter,
false,
@ -1215,7 +1225,7 @@ impl StorageManager {
let get_consensus = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
let value_node_count = fanout_result.value_nodes.len();
let value_node_count = fanout_result.consensus_nodes.len();
if value_node_count < get_consensus {
veilid_log!(self debug "timeout with insufficient consensus ({}<{}), adding offline subkey: {}:{}",
value_node_count, get_consensus,
@ -1232,7 +1242,7 @@ impl StorageManager {
let get_consensus = self
.config()
.with(|c| c.network.dht.get_value_count as usize);
let value_node_count = fanout_result.value_nodes.len();
let value_node_count = fanout_result.consensus_nodes.len();
if value_node_count < get_consensus {
veilid_log!(self debug "exhausted with insufficient consensus ({}<{}), adding offline subkey: {}:{}",
value_node_count, get_consensus,
@ -1543,6 +1553,7 @@ impl StorageManager {
I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>,
>(
inner: &mut StorageManagerInner,
vcrypto: &CryptoSystemGuard<'_>,
key: TypedKey,
subkey_results_iter: I,
is_set: bool,
@ -1576,7 +1587,17 @@ impl StorageManager {
.iter()
.map(|kv| (*kv.0, kv.1.last_seen))
.collect::<Vec<_>>();
nodes_ts.sort_by(|a, b| b.1.cmp(&a.1));
nodes_ts.sort_by(|a, b| {
// Timestamp is first metric
let res = b.1.cmp(&a.1);
if res != cmp::Ordering::Equal {
return res;
}
// Distance is the next metric, closer nodes first
let da = vcrypto.distance(&a.0, &key.value);
let db = vcrypto.distance(&b.0, &key.value);
da.cmp(&db)
});
for dead_node_key in nodes_ts.iter().skip(consensus_count) {
d.nodes.remove(&dead_node_key.0);

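The new tiebreak when trimming stored nodes sorts by most recent last_seen first and, on ties, by closeness to the record key, then drops everything past consensus_count. A standalone sketch with u64 stand-ins for keys and timestamps, assuming vcrypto.distance is an XOR-style metric:

use std::cmp;

// (node_key, last_seen_timestamp) pairs; u64 stands in for the real key and timestamp types.
fn trim_to_consensus(
    mut nodes_ts: Vec<(u64, u64)>,
    record_key: u64,
    consensus_count: usize,
) -> Vec<(u64, u64)> {
    nodes_ts.sort_by(|a, b| {
        // Timestamp is the first metric: newest first
        let res = b.1.cmp(&a.1);
        if res != cmp::Ordering::Equal {
            return res;
        }
        // Distance to the record key is the next metric: closer nodes first
        let da = a.0 ^ record_key;
        let db = b.0 ^ record_key;
        da.cmp(&db)
    });
    // Keep only the closest/freshest consensus_count entries (the rest are removed in the original)
    nodes_ts.truncate(consensus_count);
    nodes_ts
}

fn main() {
    let nodes = vec![(0b100, 10), (0b101, 10), (0b111, 9)];
    // With record_key 0b101, the two newest nodes tie on timestamp and 0b101 sorts first as the closer one
    println!("{:?}", trim_to_consensus(nodes, 0b101, 2)); // [(5, 10), (4, 10)]
}
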
View File

@ -1,19 +1,40 @@
use super::*;
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct ActiveWatch {
pub(in crate::storage_manager) struct PerNodeActiveWatch {
/// The watch id returned from the watch node
pub id: u64,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// Which node accepted the watch
pub watch_node: NodeRef,
/// How many value change notifications are left
pub count: u32,
}
/// Requested parameters for watch
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct ActiveWatchParameters {
/// Requested expiration timestamp
pub expiration_ts: Timestamp,
/// How many notifications the requestor asked for
pub count: u32,
/// Subkeys requested for this watch
pub subkeys: ValueSubkeyRangeSet,
}
#[derive(Clone, Debug)]
pub(in crate::storage_manager) struct ActiveWatch {
/// Requested parameters
pub params: ActiveWatchParameters,
/// Active watches per node
pub per_node: Vec<PerNodeActiveWatch>,
/// Minimum expiration time for all our nodes
pub min_expiration_ts: Timestamp,
/// How many value change updates remain
pub remaining_count: u32,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
/// Which subkeys we are watching
pub subkeys: ValueSubkeyRangeSet,
/// How many notifications are left
pub count: u32,
}
/// The state associated with a local record when it is opened
@ -28,7 +49,7 @@ pub(in crate::storage_manager) struct OpenedRecord {
/// The safety selection in current use
safety_selection: SafetySelection,
/// Active watch we have on this record
/// Active watches we have on this record
active_watch: Option<ActiveWatch>,
}
@ -55,15 +76,76 @@ impl OpenedRecord {
self.safety_selection = safety_selection;
}
pub fn set_active_watch(&mut self, active_watch: ActiveWatch) {
self.active_watch = Some(active_watch);
fn calculate_min_expiration_ts(per_node: &[PerNodeActiveWatch]) -> Option<Timestamp> {
per_node
.iter()
.map(|x| x.expiration_ts)
.reduce(|a, b| a.min(b))
}
pub fn new_active_watch(
&mut self,
params: ActiveWatchParameters,
opt_value_changed_route: Option<CryptoKey>,
per_node: Vec<PerNodeActiveWatch>,
) {
assert!(
self.active_watch.is_none(),
"should have cleared active watch first"
);
assert!(!per_node.is_empty(), "must have at least one watch node");
let min_expiration_ts = Self::calculate_min_expiration_ts(&per_node).unwrap();
let remaining_count = params.count;
self.active_watch = Some(ActiveWatch {
params,
per_node,
min_expiration_ts,
remaining_count,
opt_value_changed_route,
});
}
pub fn clear_active_watch(&mut self) {
self.active_watch = None;
}
pub fn active_watch(&self) -> Option<ActiveWatch> {
self.active_watch.clone()
pub fn active_watch(&self) -> Option<&ActiveWatch> {
self.active_watch.as_ref()
}
pub fn active_watch_mut(&mut self) -> Option<&mut ActiveWatch> {
self.active_watch.as_mut()
}
pub fn per_node_active_watch_by_id(&self, watch_id: u64) -> Option<&PerNodeActiveWatch> {
self.active_watch
.as_ref()
.map(|x| x.per_node.iter().find(|x| x.id == watch_id))
.flatten()
}
pub fn per_node_active_watch_by_id_mut(
&mut self,
watch_id: u64,
) -> Option<&mut PerNodeActiveWatch> {
self.active_watch
.as_mut()
.map(|x| x.per_node.iter_mut().find(|x| x.id == watch_id))
.flatten()
}
pub fn remove_per_node_active_watch_by_id(&mut self, watch_id: u64) {
let Some(active_watch) = self.active_watch.as_mut() else {
return;
};
let Some(n) = active_watch.per_node.iter().position(|x| x.id == watch_id) else {
return;
};
active_watch.per_node.remove(n);
active_watch.min_expiration_ts =
Self::calculate_min_expiration_ts(&active_watch.per_node).unwrap();
}
}

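A standalone sketch of the per-node watch bookkeeping introduced here: the minimum expiration is recomputed from the remaining per-node watches whenever one is removed (types reduced to the fields involved; the sketch returns Option where the original unwraps, which assumes at least one per-node watch remains):

type Timestamp = u64;

#[derive(Debug)]
struct PerNodeActiveWatch {
    id: u64,
    expiration_ts: Timestamp,
}

fn calculate_min_expiration_ts(per_node: &[PerNodeActiveWatch]) -> Option<Timestamp> {
    per_node.iter().map(|x| x.expiration_ts).reduce(|a, b| a.min(b))
}

fn remove_per_node_watch_by_id(
    per_node: &mut Vec<PerNodeActiveWatch>,
    watch_id: u64,
) -> Option<Timestamp> {
    let n = per_node.iter().position(|x| x.id == watch_id)?;
    per_node.remove(n);
    // Recompute the minimum expiration over the remaining per-node watches
    calculate_min_expiration_ts(per_node)
}

fn main() {
    let mut per_node = vec![
        PerNodeActiveWatch { id: 1, expiration_ts: 100 },
        PerNodeActiveWatch { id: 2, expiration_ts: 50 },
    ];
    assert_eq!(calculate_min_expiration_ts(&per_node), Some(50));
    assert_eq!(remove_per_node_watch_by_id(&mut per_node, 2), Some(100));
}
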
View File

@ -190,9 +190,8 @@ impl StorageManager {
match fanout_result.kind {
FanoutResultKind::Incomplete => {
// Send partial update if desired,
// if we've gotten at least one value node before consensus
if ctx.send_partial_update && fanout_result.value_nodes.len() >= 1 {
// Send partial update if desired, if we've gotten at least one consensus node
if ctx.send_partial_update && fanout_result.consensus_nodes.len() >= 1 {
ctx.send_partial_update = false;
// Return partial result
@ -350,6 +349,12 @@ impl StorageManager {
safety_selection: SafetySelection,
result: set_value::OutboundSetValueResult,
) -> Result<Option<ValueData>, VeilidAPIError> {
// Get cryptosystem
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(key.kind) else {
apibail_generic!("unsupported cryptosystem");
};
// Regain the lock after network access
let mut inner = self.inner.lock().await;
@ -363,6 +368,7 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner(
&mut inner,
&vcrypto,
key,
core::iter::once((subkey, &result.fanout_result)),
true,

View File

@ -13,51 +13,48 @@ impl StorageManager {
let mut inner = self.inner.lock().await;
let routing_table = self.routing_table();
let update_callback = self.update_callback();
//let update_callback = self.update_callback();
let cur_ts = Timestamp::now();
for (k, v) in inner.opened_records.iter_mut() {
// If no active watch, then skip this
let Some(active_watch) = v.active_watch() else {
continue;
};
for active_watch in v.active_watches() {
// See if the active watch's node is dead
let mut is_dead = false;
if !active_watch.watch_node.state(cur_ts).is_alive() {
// Watched node is dead
is_dead = true;
}
// See if the active watch's node is dead
let mut is_dead = false;
if !active_watch.watch_node.state(cur_ts).is_alive() {
// Watched node is dead
is_dead = true;
}
// See if the private route we're using is dead
if !is_dead {
if let Some(value_changed_route) = active_watch.opt_value_changed_route {
if routing_table
.route_spec_store()
.get_route_id_for_key(&value_changed_route)
.is_none()
{
// Route we would receive value changes on is dead
is_dead = true;
// See if the private route we're using is dead
if !is_dead {
if let Some(value_changed_route) = active_watch.opt_value_changed_route {
if routing_table
.route_spec_store()
.get_route_id_for_key(&value_changed_route)
.is_none()
{
// Route we would receive value changes on is dead
is_dead = true;
}
}
}
}
// See if the watch is expired
if !is_dead && active_watch.expiration_ts <= cur_ts {
// Watch has expired
is_dead = true;
}
// See if the watch is expired
if !is_dead && active_watch.expiration_ts <= cur_ts {
// Watch has expired
is_dead = true;
}
if is_dead {
v.clear_active_watch();
if is_dead {
v.remove_active_watch(active_watch.id);
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key: *k,
subkeys: ValueSubkeyRangeSet::new(),
count: 0,
value: None,
})));
// // Send valuechange with dead count and no subkeys
// update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
// key: *k,
// subkeys: ValueSubkeyRangeSet::new(),
// count: 0,
// value: None,
// })));
}
}
}
}

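A condensed sketch of the liveness check each per-node watch now goes through: the watch is dropped if its node is dead, the private route for value-changed notifications is gone, or the expiration has passed. Booleans stand in for the node-state and route-table lookups:

fn watch_is_dead(
    node_alive: bool,
    value_changed_route_alive: Option<bool>, // None = no private route in use
    expiration_ts: u64,
    cur_ts: u64,
) -> bool {
    if !node_alive {
        return true; // Watched node is dead
    }
    if value_changed_route_alive == Some(false) {
        return true; // Route we would receive value changes on is dead
    }
    if expiration_ts <= cur_ts {
        return true; // Watch has expired
    }
    false
}

fn main() {
    assert!(watch_is_dead(true, Some(true), 100, 200)); // expired
    assert!(!watch_is_dead(true, None, 300, 200));      // healthy, no private route in use
}
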
View File

@ -218,8 +218,12 @@ impl StorageManager {
}
// Keep the list of nodes that returned a value for later reference
let crypto = self.crypto();
let vcrypto = crypto.get(result.key.kind).unwrap();
Self::process_fanout_results_inner(
&mut inner,
&vcrypto,
result.key,
result.fanout_results.iter().map(|x| (x.0, &x.1)),
true,

View File

@ -8,15 +8,21 @@ struct OutboundWatchValueContext {
pub opt_watch_value_result: Option<OutboundWatchValueResult>,
}
/// The record of a node accepting a watch
#[derive(Debug, Clone)]
pub(super) struct WatchNode {
pub watch_id: u64,
/// The node that accepted the watch
pub node_ref: NodeRef,
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
}
/// The result of the outbound_watch_value operation
#[derive(Debug, Clone)]
pub(super) struct OutboundWatchValueResult {
/// The expiration of a successful watch
pub expiration_ts: Timestamp,
/// What watch id was returned
pub watch_id: u64,
/// Which node accepted the watch
pub watch_node: NodeRef,
/// Which nodes accepted the watch
pub watch_nodes: Vec<WatchNode>,
/// Which private route is responsible for receiving ValueChanged notifications
pub opt_value_changed_route: Option<PublicKey>,
}
@ -195,12 +201,13 @@ impl StorageManager {
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
let (key_count, timeout_us, set_value_count) = self.config().with(|c| {
// Get the DHT parameters for 'WatchValue', some of which are the same for 'GetValue' operations
let (key_count, consensus_count, fanout, timeout_us) = self.config().with(|c| {
(
c.network.dht.max_find_node_count as usize,
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
c.network.dht.set_value_count as usize,
c.network.dht.get_value_count as usize,
c.network.dht.get_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
)
});
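
The watch fanout now pulls the same DHT parameters as a get instead of the single-node, extended-timeout setup it replaced. A stand-in sketch of that parameter selection, assuming key_count still comes from max_find_node_count as in the surrounding context:

// Stand-in for the c.network.dht fields read in the closure above.
struct DhtConfig {
    max_find_node_count: u32,
    get_value_count: u32,
    get_value_fanout: u32,
    get_value_timeout_ms: u32,
}

fn ms_to_us(ms: u32) -> u64 {
    ms as u64 * 1_000
}

// (key_count, consensus_count, fanout, timeout_us), mirroring the tuple built above
fn watch_fanout_params(dht: &DhtConfig) -> (usize, usize, usize, u64) {
    (
        dht.max_find_node_count as usize,
        dht.get_value_count as usize,
        dht.get_value_fanout as usize,
        ms_to_us(dht.get_value_timeout_ms),
    )
}

fn main() {
    let dht = DhtConfig {
        max_find_node_count: 20,
        get_value_count: 3,
        get_value_fanout: 4,
        get_value_timeout_ms: 10_000,
    };
    println!("{:?}", watch_fanout_params(&dht)); // (20, 3, 4, 10000000)
}
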
@ -301,16 +308,16 @@ impl StorageManager {
};
// Call the fanout
// Use a fixed fanout concurrency of 1 because we only want one watch
// Use a longer timeout (timeout_us * set_value_count) because we may need to try multiple nodes
// Use the same fanout parameters as a get
// and each one might take timeout_us time.
let routing_table = self.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
key,
key_count,
1,
TimestampDuration::new(timeout_us.as_u64() * (set_value_count as u64)),
fanout,
consensus_count,
timeout_us,
capability_fanout_node_info_filter(vec![CAP_DHT, CAP_DHT_WATCH]),
call_routine,
check_done,
@ -422,15 +429,10 @@ impl StorageManager {
};
// No active watch means no callback
let Some(mut active_watch) = opened_record.active_watch() else {
let Some(mut active_watch) = opened_record.active_watch_by_id(watch_id) else {
return Ok(NetworkResult::value(()));
};
// If the watch id doesn't match, then don't process this
if active_watch.id != watch_id {
return Ok(NetworkResult::value(()));
}
// If the reporting node is not the same as our watch, don't process the value change
if !active_watch
.watch_node
@ -445,11 +447,11 @@ impl StorageManager {
veilid_log!(self debug "watch count went backward: {}: {}/{}", key, count, active_watch.count);
// Force count to zero
count = 0;
opened_record.clear_active_watch();
opened_record.remove_active_watch(watch_id);
} else if count == 0 {
// If count is zero, we're done, cancel the watch and the app can renew it if it wants
veilid_log!(self debug "watch count finished: {}", key);
opened_record.clear_active_watch();
opened_record.clear_active_watches();
} else {
veilid_log!(self debug
"watch count decremented: {}: {}/{}",