valuechanged logic fix and logging improvements

Christien Rioux 2025-04-14 19:01:04 -04:00
parent e2f750f207
commit 1b96a4e3ea
30 changed files with 537 additions and 151 deletions

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
/// Where to send an RPC message
#[derive(Debug, Clone)]
pub(crate) enum Destination {

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
// Sends a high level app request and waits for a response
// Can be sent via all methods including relays and routes

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
// Sends a high level app message
// Can be sent via all methods including relays and routes

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(super) async fn process_cancel_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(super) async fn process_complete_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(super) async fn process_find_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
/// Send FindNodeQ RPC request, receive FindNodeA answer
/// Can be sent via all methods including relays

View File

@ -1,6 +1,8 @@
use super::*;
use crate::storage_manager::{SignedValueData, SignedValueDescriptor};
impl_veilid_log_facility!("rpc");
#[derive(Clone, Debug)]
pub struct GetValueAnswer {
pub value: Option<SignedValueData>,
@ -78,7 +80,7 @@ impl RPCProcessor {
crypto_kind: vcrypto.kind(),
});
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context))
@ -128,13 +130,13 @@ impl RPCProcessor {
dest
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
veilid_log!(self debug "Peers: {:#?}", peer_ids);
veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
@ -228,7 +230,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
}
// See if we would have accepted this as a set
@ -278,7 +280,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
}
// Make GetValue answer
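These hunks retarget DHT-related debug lines from the default facility to an explicit "dht" target so they can be filtered independently of general rpc logging. A minimal sketch of the distinction using the tracing crate directly, which veilid_log! is assumed to wrap:

use tracing::{event, Level};

let debug_string = String::from("GetValueQ => ..."); // illustrative
// Before: the event inherits the module's default target/facility
event!(Level::DEBUG, "{}", debug_string);
// After: the event is routed to the "dht" target, so a filter such as
// "dht=debug" enables it without enabling all rpc debug output
event!(target: "dht", Level::DEBUG, "{}", debug_string);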

View File

@ -1,6 +1,8 @@
use super::*;
use crate::storage_manager::SignedValueDescriptor;
impl_veilid_log_facility!("rpc");
#[derive(Clone, Debug)]
pub struct InspectValueAnswer {
pub seqs: Vec<ValueSeqNum>,
@ -81,7 +83,7 @@ impl RPCProcessor {
crypto_kind: vcrypto.kind(),
});
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context))
@ -118,13 +120,13 @@ impl RPCProcessor {
debug_seqs(&seqs)
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
veilid_log!(self debug "Peers: {:#?}", peer_ids);
veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
@ -209,7 +211,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
}
// See if we would have accepted this as a set
@ -247,7 +249,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
}
// Make InspectValue answer

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
// Sends a unidirectional in-band return receipt
// Can be sent via all methods including relays and routes

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip_all, err)]
async fn process_route_safety_route_hop(

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
#[derive(Clone, Debug)]
pub struct SetValueAnswer {
pub set: bool,
@ -89,7 +91,7 @@ impl RPCProcessor {
});
if debug_target_enabled!("dht") {
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
}
let waitable_reply = network_result_try!(
@ -140,13 +142,13 @@ impl RPCProcessor {
dest,
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
veilid_log!(self debug "Peers: {:#?}", peer_ids);
veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids);
}
// Validate peers returned are, in fact, closer to the key than the node we sent this to
@ -244,7 +246,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
// If there are less than 'set_value_count' peers that are closer, then store here too
let set_value_count = self
@ -296,7 +298,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
}
// Make SetValue answer
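The debug_target_enabled!("dht") guard above exists so the potentially large debug string is only built when something is listening on that target. A sketch of the same pattern with stock tracing (build_debug_string is a hypothetical stand-in for the formatting work):

use tracing::{enabled, event, Level};

fn build_debug_string() -> String {
    // Hypothetical: expensive formatting of the outgoing SetValueQ
    "OUT ==> SetValueQ(...)".to_string()
}

// Skip the formatting cost entirely when "dht" debug logs are filtered out
if enabled!(target: "dht", Level::DEBUG) {
    event!(target: "dht", Level::DEBUG, "{}", build_debug_string());
}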

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
// Sends a unidirectional signal to a node
// Can be sent via relays but not routes. For routed 'signal' like capabilities, use AppMessage.

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(super) async fn process_start_tunnel_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Default)]
pub struct StatusResult {
pub opt_sender_info: Option<SenderInfo>,

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
#[instrument(level = "trace", target = "rpc", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(super) async fn process_supply_block_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> {

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
// Can only be sent directly, not via relays or routes
#[instrument(level = "trace", target = "rpc", skip(self), ret, err)]

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
impl RPCProcessor {
// Sends a dht value change notification
// Can be sent via all methods including relays and routes but never over a safety route
@ -88,7 +90,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id(),
);
veilid_log!(self debug "{}", debug_string_stmt);
veilid_log!(self debug target: "dht", "{}", debug_string_stmt);
}
// Save the subkey, creating a new record if necessary

View File

@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("rpc");
#[derive(Clone, Debug)]
pub struct WatchValueAnswer {
pub accepted: bool,
@ -85,7 +87,7 @@ impl RPCProcessor {
RPCQuestionDetail::WatchValueQ(Box::new(watch_value_q)),
);
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?);
@ -122,13 +124,13 @@ impl RPCProcessor {
dest
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
veilid_log!(self debug "Peers: {:#?}", peer_ids);
veilid_log!(self debug target: "dht", "Peers: {:#?}", peer_ids);
}
// Validate accepted requests
@ -249,7 +251,7 @@ impl RPCProcessor {
watcher
);
veilid_log!(self debug "{}", debug_string);
veilid_log!(self debug target: "dht", "{}", debug_string);
}
// Get the nodes that we know about that are closer to the key than our own node
@ -308,7 +310,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);
veilid_log!(self debug "{}", debug_string_answer);
veilid_log!(self debug target: "dht", "{}", debug_string_answer);
}
// Make WatchValue answer

View File

@ -3,6 +3,8 @@ use stop_token::future::FutureExt as _;
use super::*;
impl_veilid_log_facility!("rpc");
#[derive(Debug)]
pub(super) enum RPCWorkerRequestKind {
Message { message_encoded: MessageEncoded },

View File

@ -1032,6 +1032,8 @@ impl StorageManager {
// If we have no subkeys left, then set the count to zero to indicate a full cancellation
let count = if new_subkeys.is_empty() {
0
} else if let Some(state) = outbound_watch.state() {
state.remaining_count()
} else {
desired.count
};

View File

@ -18,6 +18,8 @@ pub(in crate::storage_manager) struct OutboundWatchManager {
pub outbound_watches: HashMap<TypedKey, OutboundWatch>,
/// Last known active watch per node+record
pub per_node_states: HashMap<PerNodeKey, PerNodeState>,
/// Value changed updates that need inspection to determine if they should be reported
pub needs_change_inspection: HashMap<TypedKey, ValueSubkeyRangeSet>,
}
impl fmt::Display for OutboundWatchManager {
@ -44,6 +46,21 @@ impl fmt::Display for OutboundWatchManager {
}
}
out += "]\n";
out += "needs_change_inspection: [\n";
{
let mut keys = self
.needs_change_inspection
.keys()
.copied()
.collect::<Vec<_>>();
keys.sort();
for k in keys {
let v = self.needs_change_inspection.get(&k).unwrap();
out += &format!(" {}: {}\n", k, v);
}
}
out += "]\n";
write!(f, "{}", out)
}
@ -60,6 +77,7 @@ impl OutboundWatchManager {
Self {
outbound_watches: HashMap::new(),
per_node_states: HashMap::new(),
needs_change_inspection: HashMap::new(),
}
}
@ -157,4 +175,12 @@ impl OutboundWatchManager {
self.per_node_states
.retain(|k, _| !finished_pnks.contains(k) && !expired_pnks.contains(k));
}
/// Set a record up to be inspected for changed subkeys
pub fn enqueue_change_inspect(&mut self, record_key: TypedKey, subkeys: ValueSubkeyRangeSet) {
self.needs_change_inspection
.entry(record_key)
.and_modify(|x| *x = x.union(&subkeys))
.or_insert(subkeys);
}
}
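Because enqueue_change_inspect merges by union, repeated notifications for the same record widen a single pending inspection rather than stacking duplicates. A sketch of the expected behavior (the key and ranges are illustrative, and single_range is assumed to build a one-range set):

let mut manager = OutboundWatchManager::new();
manager.enqueue_change_inspect(record_key, ValueSubkeyRangeSet::single_range(3, 4));
manager.enqueue_change_inspect(record_key, ValueSubkeyRangeSet::single_range(4, 6));
// needs_change_inspection now holds one entry for record_key covering
// subkeys 3..=6, drained later by the background change-inspection tick
assert_eq!(manager.needs_change_inspection.len(), 1);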

View File

@ -112,22 +112,25 @@ impl OutboundWatch {
}
// If there is no current watch then there is nothing to cancel
let Some(state) = self.state.as_ref() else {
let Some(_state) = self.state.as_ref() else {
return false;
};
// If the desired parameters are None, then cancel
let Some(desired) = self.desired.as_ref() else {
let Some(_desired) = self.desired.as_ref() else {
return true;
};
// If the desired parameters is different than the current parameters
// then cancel so we can eventually reconcile to the new parameters
state.params() != desired
false
}
/// Returns true if this outbound watch can be renewed
pub fn needs_renew(&self, registry: &VeilidComponentRegistry, cur_ts: Timestamp) -> bool {
pub fn needs_renew(
&self,
registry: &VeilidComponentRegistry,
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
if self.is_dead() || self.needs_cancel(registry) {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
return false;
@ -138,11 +141,84 @@ impl OutboundWatch {
return false;
};
// If the watch has per node watches that have expired,
// but we can extend our watch then renew. Do this only within RENEW_OUTBOUND_WATCHES_DURATION_SECS
// of the actual expiration. If we're looking at this after the actual expiration, don't try because
// the watch id will have died
// Should have desired parameters here
let Some(desired) = self.desired.as_ref() else {
veilid_log!(registry warn "needs_cancel should have returned true");
return false;
};
// If we have a consensus, we can avoid fanout by renewing rather than reconciling
// but if we don't have a consensus, we should defer to fanout to try to improve it
if state.nodes().len() < consensus_count {
return false;
}
// If we have a consensus but need to renew because some per-node watches
// either expired or had their routes die, do it
if self.wants_per_node_watch_update(registry, state, cur_ts) {
return true;
}
// If the desired parameters have changed, then we should renew with them
state.params() != desired
}
/// Returns true if there is work to be done on getting the outbound
/// watch to its desired state
pub fn needs_reconcile(
&self,
registry: &VeilidComponentRegistry,
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
if self.is_dead()
|| self.needs_cancel(registry)
|| self.needs_renew(registry, consensus_count, cur_ts)
{
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
// If desired is none, then is_dead() or needs_cancel() should have been true
let Some(desired) = self.desired.as_ref() else {
veilid_log!(registry warn "is_dead() or needs_cancel() should have been true");
return false;
};
// If there is a desired watch but no current state, then reconcile
let Some(state) = self.state() else {
return true;
};
// If we are still working on getting the 'current' state to match
// the 'desired' state, then do the reconcile if we are within the timeframe for it
if state.nodes().len() < consensus_count
&& cur_ts >= state.next_reconcile_ts().unwrap_or_default()
{
return true;
}
// If we have a consensus, or are not attempting consensus at this time,
// but need to reconcile because some per-node watches either expired or had their routes die, do it
if self.wants_per_node_watch_update(registry, state, cur_ts) {
return true;
}
// If the desired parameters have changed, then we should reconcile with them
state.params() != desired
}
/// Returns true if we need to update our per-node watches due to expiration,
/// or if they are all dead because their route died and they need to be updated
fn wants_per_node_watch_update(
&self,
registry: &VeilidComponentRegistry,
state: &OutboundWatchState,
cur_ts: Timestamp,
) -> bool {
// If the watch has per node watches that have expired, but we can extend our watch then renew.
// Do this only within RENEW_OUTBOUND_WATCHES_DURATION_SECS of the actual expiration.
// If we're looking at this after the actual expiration, don't try because the watch id will have died.
let renew_ts = cur_ts + TimestampDuration::new_secs(RENEW_OUTBOUND_WATCHES_DURATION_SECS);
if renew_ts >= state.min_expiration_ts()
&& cur_ts < state.min_expiration_ts()
@ -166,44 +242,4 @@ impl OutboundWatch {
false
}
/// Returns true if there is work to be done on getting the outbound
/// watch to its desired state
pub fn needs_reconcile(
&self,
registry: &VeilidComponentRegistry,
consensus_count: usize,
cur_ts: Timestamp,
) -> bool {
if self.is_dead() || self.needs_cancel(registry) || self.needs_renew(registry, cur_ts) {
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
// If desired is none, then is_dead() or needs_cancel() should have been true
let Some(desired) = self.desired.as_ref() else {
veilid_log!(registry warn "is_dead() or needs_cancel() should have been true");
return false;
};
// If there is a desired watch but no current state, then reconcile
let Some(state) = self.state() else {
return true;
};
// If the params are different, then needs_cancel() should have returned true
if state.params() != desired {
veilid_log!(registry warn "needs_cancel() should have returned true");
return false;
}
// If we are still working on getting the 'current' state to match
// the 'desired' state, then do the reconcile if we are within the timeframe for it
if state.nodes().len() < consensus_count
&& cur_ts >= state.next_reconcile_ts().unwrap_or_default()
{
return true;
}
// No work to do on this watch at this time
false
}
}
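The warnings inside needs_cancel, needs_renew, and needs_reconcile encode a strict evaluation order: each predicate assumes the earlier ones already returned false. A sketch of the dispatch the per-watch maintenance tick is expected to perform, mirroring the else-if chain later in this commit:

if outbound_watch.is_dead() {
    // Drop the watch and its per-node state entirely
} else if outbound_watch.needs_cancel(&registry) {
    // Tear down the per-node watches
} else if outbound_watch.needs_renew(&registry, consensus_count, cur_ts) {
    // Consensus reached: renew in place without fanout
} else if outbound_watch.needs_reconcile(&registry, consensus_count, cur_ts) {
    // Below consensus or parameters changed: fan out toward the desired state
}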

View File

@ -52,7 +52,6 @@ pub(in crate::storage_manager) struct OutboundWatchStateEditor<'a> {
}
impl OutboundWatchStateEditor<'_> {
#[expect(dead_code)]
pub fn set_params(&mut self, params: OutboundWatchParameters) {
self.state.params = params;
}
@ -108,6 +107,24 @@ impl OutboundWatchState {
&self.value_changed_routes
}
/// Get the parameters we use if we're updating this state's per node watches
pub fn get_per_node_params(
&self,
desired: &OutboundWatchParameters,
) -> OutboundWatchParameters {
// Change the params to update count
if self.params() != desired {
// If parameters are changing, just use the desired parameters
desired.clone()
} else {
// If this is a renewal of the same parameters,
// use the current remaining update count for the rpc
let mut renew_params = desired.clone();
renew_params.count = self.remaining_count();
renew_params
}
}
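get_per_node_params centralizes a decision the renew and reconcile paths previously made separately: changed parameters win outright, while a renewal of unchanged parameters carries the remaining update count into the RPC. Illustratively (counts assumed):

// Same parameters as the current state: the renewal keeps the
// remaining count, e.g. desired.count = 100 with 3 updates already
// delivered yields params.count == 97
let params = state.get_per_node_params(&desired);

// Different parameters: desired is cloned verbatim, resetting the
// count for the watch being reconciled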
pub fn edit<R, F: FnOnce(&mut OutboundWatchStateEditor) -> R>(
&mut self,
per_node_state: &HashMap<PerNodeKey, PerNodeState>,

View File

@ -25,6 +25,13 @@ impl StorageManager {
};
}
// Iterate all queued change inspections and do them
for (k, v) in inner.outbound_watch_manager.needs_change_inspection.drain() {
// Get next work on watch and queue it if we have something to do
let op_fut = self.get_change_inspection_operation(k, v);
self.background_operation_processor.add_future(op_fut);
}
Ok(())
}
}

View File

@ -34,6 +34,14 @@ pub(super) struct OutboundWatchValueResult {
pub ignored: Vec<NodeRef>,
}
impl OutboundWatchValueResult {
pub fn merge(&mut self, other: OutboundWatchValueResult) {
self.accepted.extend(other.accepted);
self.rejected.extend(other.rejected);
self.ignored.extend(other.ignored);
}
}
impl StorageManager {
/// Perform a 'watch value cancel' on the network without fanout
#[instrument(level = "trace", target = "dht", skip_all, err)]
@ -365,6 +373,7 @@ impl StorageManager {
}
// Send a value change with dead count and no subkeys to inform the API that this watch is now gone completely
drop(watch_lock);
self.update_callback_value_change(record_key, ValueSubkeyRangeSet::new(), 0, None);
}
@ -497,7 +506,7 @@ impl StorageManager {
return;
}
let (per_node_states, renew_params) = {
let (per_node_states, per_node_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_manager
@ -507,10 +516,17 @@ impl StorageManager {
veilid_log!(self warn "watch being renewed should have still been in the table");
return;
};
let Some(desired) = outbound_watch.desired() else {
veilid_log!(self warn "watch being renewed should have desired parameters");
return;
};
let Some(state) = outbound_watch.state_mut() else {
veilid_log!(self warn "watch being renewed should have current state");
return;
};
let mut per_node_states = vec![];
let mut missing_pnks = BTreeSet::new();
for pnk in state.nodes() {
@ -530,17 +546,15 @@ impl StorageManager {
editor.retain_nodes(|x| !missing_pnks.contains(x));
});
// Change the params to update count
let mut renew_params = state.params().clone();
renew_params.count = state.remaining_count();
let per_node_params = state.get_per_node_params(&desired);
(per_node_states, renew_params)
(per_node_states, per_node_params)
};
// Now reach out to each node and renew their watches
let mut unord = FuturesUnordered::new();
for (_pnk, pns) in per_node_states {
let params = renew_params.clone();
let params = per_node_params.clone();
let watch_lock = watch_lock.clone();
unord.push(async move {
self.outbound_watch_value_change(
@ -552,27 +566,34 @@ impl StorageManager {
.await
});
}
let mut owvresults = vec![];
// Process and merge all results since we're not fanning out
let mut opt_owvresult: Option<OutboundWatchValueResult> = None;
while let Some(res) = unord.next().await {
match res {
Ok(r) => owvresults.push(r),
Ok(r) => {
opt_owvresult = match opt_owvresult {
Some(mut owvresult) => {
owvresult.merge(r);
Some(owvresult)
}
None => Some(r),
};
}
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
}
}
}
// Update state
{
// Update state with merged results if we have them
if let Some(owvresult) = opt_owvresult {
let inner = &mut *self.inner.lock().await;
for owvresult in owvresults {
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
self.process_outbound_watch_value_result_inner(inner, record_key, owvresult);
}
}
/// Perform fanout to add per-node watches to an outbound watch
/// Must have no current state, or have a match to desired parameters
/// Perform fanout to add or update per-node watches to an outbound watch
pub(super) async fn process_outbound_watch_reconcile(
&self,
watch_lock: AsyncTagLockGuard<TypedKey>,
@ -586,7 +607,7 @@ impl StorageManager {
// Get the nodes already active on this watch,
// and the parameters to fanout with for the rest
let (per_node_state, reconcile_params) = {
let (per_node_state, per_node_params) = {
let inner = &mut *self.inner.lock().await;
let Some(outbound_watch) = inner
.outbound_watch_manager
@ -604,13 +625,9 @@ impl StorageManager {
};
// Get active per node states
let mut per_node_state = if let Some(state) = outbound_watch.state() {
// Assert matching parameters
if state.params() != &desired {
veilid_log!(self warn "watch being reconciled should have had matching current and desired parameters");
return;
}
state
let (mut per_node_state, per_node_params) = if let Some(state) = outbound_watch.state()
{
let per_node_state = state
.nodes()
.iter()
.map(|pnk| {
@ -624,9 +641,12 @@ impl StorageManager {
.unwrap(),
)
})
.collect()
.collect();
let per_node_params = state.get_per_node_params(&desired);
(per_node_state, per_node_params)
} else {
HashMap::new()
(HashMap::new(), desired)
};
// Add in any inactive per node states
@ -641,15 +661,13 @@ impl StorageManager {
}
}
let reconcile_params = desired.clone();
(per_node_state, reconcile_params)
(per_node_state, per_node_params)
};
// Now fan out with parameters and get new per node watches
let cur_ts = Timestamp::now();
let res = self
.outbound_watch_value(watch_lock.clone(), reconcile_params, per_node_state)
.outbound_watch_value(watch_lock.clone(), per_node_params, per_node_state)
.await;
{
@ -762,6 +780,7 @@ impl StorageManager {
}
state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
editor.set_params(desired);
editor.retain_nodes(|x| !remove_nodes.contains(x));
editor.add_nodes(added_nodes);
});
@ -816,7 +835,7 @@ impl StorageManager {
}
};
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_renew(&registry, cur_ts) {
} else if outbound_watch.needs_renew(&registry, consensus_count, cur_ts) {
// Outbound watch expired but can be renewed
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
@ -850,6 +869,101 @@ impl StorageManager {
None
}
/// Perform an inspection of the record's subkeys to see if we have the latest data
/// If not, then get the first changed subkey and post a ValueChanged update about it
/// Can be processed in the foreground, or by the background operation queue
pub(super) fn get_change_inspection_operation(
&self,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
) -> PinBoxFutureStatic<()> {
let fut = {
let registry = self.registry();
async move {
let this = registry.storage_manager();
let report = match this
.inspect_record(record_key, subkeys.clone(), DHTReportScope::SyncGet)
.await
{
Ok(v) => v,
Err(e) => {
veilid_log!(this debug "Failed to inspect record for changes: {}", e);
return;
}
};
let mut changed_subkeys = report.changed_subkeys();
// Check each changed subkey in order until we find one to report
let mut n = 0;
while !changed_subkeys.is_empty() {
let first_changed_subkey = changed_subkeys.first().unwrap();
let value = match this.get_value(record_key, first_changed_subkey, true).await {
Ok(v) => v,
Err(e) => {
veilid_log!(this debug "Failed to get changed record: {}", e);
return;
}
};
if let Some(value) = value {
if value.seq() > report.local_seqs()[n] {
// Calculate the update
let (changed_subkeys, remaining_count, value) = {
let _watch_lock =
this.outbound_watch_lock_table.lock_tag(record_key).await;
let inner = &mut *this.inner.lock().await;
// Get the outbound watch
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
else {
// No outbound watch means no callback
return;
};
let Some(state) = outbound_watch.state_mut() else {
// No outbound watch current state means no callback
return;
};
// Decrement the remaining updates count
let remaining_count = state.remaining_count().saturating_sub(1);
state.edit(
&inner.outbound_watch_manager.per_node_states,
|editor| {
editor.set_remaining_count(remaining_count);
},
);
(changed_subkeys, remaining_count, value)
};
// Send the update
this.update_callback_value_change(
record_key,
changed_subkeys,
remaining_count,
Some(value),
);
// Update was sent, we're done
return;
}
}
// If we didn't send an update, remove the first changed subkey and try again
changed_subkeys.pop_first();
n += 1;
}
}
};
pin_dyn_future!(fut)
}
/// Handle a received 'Watch Value' query
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", target = "dht", skip_all)]
@ -897,20 +1011,20 @@ impl StorageManager {
pub async fn inbound_value_changed(
&self,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
mut subkeys: ValueSubkeyRangeSet,
count: u32,
value: Option<Arc<SignedValueData>>,
inbound_node_id: TypedKey,
watch_id: u64,
) -> VeilidAPIResult<NetworkResult<()>> {
// Operate on the watch for this record
let _watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await;
let watch_lock = self.outbound_watch_lock_table.lock_tag(record_key).await;
// Update local record store with new value
let (is_value_seq_newer, value, remaining_count) = {
let (report_value_change, value, remaining_count, reportable_subkeys) = {
let inner = &mut *self.inner.lock().await;
{
let watched_subkeys = {
// Get the outbound watch
let Some(outbound_watch) = inner
.outbound_watch_manager
@ -922,7 +1036,12 @@ impl StorageManager {
};
let Some(state) = outbound_watch.state() else {
// No outbound watch current state means no callback
// No outbound watch current state means no callback (we haven't reconciled yet)
return Ok(NetworkResult::value(()));
};
let Some(desired) = outbound_watch.desired() else {
// No outbound watch desired state means no callback (we are cancelling)
return Ok(NetworkResult::value(()));
};
@ -947,7 +1066,7 @@ impl StorageManager {
// If watch id doesn't match it's for an older watch and should be ignored
if per_node_state.watch_id != watch_id {
// No per node state means no callback
veilid_log!(self warn "incorred watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id);
veilid_log!(self warn "incorrect watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id);
return Ok(NetworkResult::value(()));
}
@ -991,18 +1110,22 @@ impl StorageManager {
);
per_node_state.count = count;
}
}
desired.subkeys
};
// No subkeys means remote node cancelled, but we already captured that with the
// assignment of 'count' to the per_node_state above, so we can just jump out here
let Some(mut first_subkey) = subkeys.first() else {
return Ok(NetworkResult::value(()));
};
// Null out default value
let value = value.filter(|value| *value.value_data() != ValueData::default());
// Set the local value
let mut is_value_seq_newer = false;
let mut report_value_change = false;
if let Some(value) = &value {
let Some(first_subkey) = subkeys.first() else {
apibail_internal!("should not have value without first subkey");
};
let last_get_result =
Self::handle_get_local_value_inner(inner, record_key, first_subkey, true)
.await?;
@ -1025,14 +1148,25 @@ impl StorageManager {
}
// Make sure this value would actually be newer
is_value_seq_newer = true;
report_value_change = true;
if let Some(last_value) = &last_get_result.opt_value {
if value.value_data().seq() <= last_value.value_data().seq() {
// inbound value is older than or equal to the sequence number that we have, just return the one we have
is_value_seq_newer = false;
// inbound value is older than or equal to the sequence number that we have
// so we're not going to report this
report_value_change = false;
// Shrink up the subkey range because we're removing the first value from the things we'd possibly report on
subkeys.pop_first().unwrap();
if subkeys.is_empty() {
// If there's nothing left to report, just return now
return Ok(NetworkResult::value(()));
}
first_subkey = subkeys.first().unwrap();
}
}
if is_value_seq_newer {
// Keep the value because it is newer than the one we have
if report_value_change {
Self::handle_set_local_value_inner(
inner,
record_key,
@ -1044,38 +1178,71 @@ impl StorageManager {
}
}
// If we got an actual update, decrement the total remaining watch count
// If our watched subkey range differs from the reported change's range
// we should only report changes that we care about
let reportable_subkeys = subkeys.intersect(&watched_subkeys);
if let Some(first_reportable_subkey) = reportable_subkeys.first() {
if first_reportable_subkey != first_subkey {
report_value_change = false;
}
} else {
report_value_change = false;
}
// Get the outbound watch
let outbound_watch = inner
let Some(outbound_watch) = inner
.outbound_watch_manager
.outbound_watches
.get_mut(&record_key)
.unwrap();
else {
// No outbound watch means no callback
return Ok(NetworkResult::value(()));
};
let state = outbound_watch.state_mut().unwrap();
let Some(state) = outbound_watch.state_mut() else {
// No outbound watch current state means no callback
return Ok(NetworkResult::value(()));
};
if is_value_seq_newer {
// If we're going to report, update the remaining change count
if report_value_change {
let remaining_count = state.remaining_count().saturating_sub(1);
state.edit(&inner.outbound_watch_manager.per_node_states, |editor| {
editor.set_remaining_count(remaining_count);
});
}
(is_value_seq_newer, value, state.remaining_count())
(
report_value_change,
value,
state.remaining_count(),
reportable_subkeys,
)
};
drop(watch_lock);
// Announce ValueChanged VeilidUpdate
// * if the value in the update had a newer sequence number
// * if more than a single subkey has changed
// * cancellations (count=0) are sent by process_outbound_watch_dead(), not here
let do_update = is_value_seq_newer || subkeys.len() > 1;
if do_update {
let value = if is_value_seq_newer {
Some(value.unwrap().value_data().clone())
} else {
None
};
self.update_callback_value_change(record_key, subkeys, remaining_count, value);
// Cancellations (count=0) are sent by process_outbound_watch_dead(), not here
if report_value_change {
// We have a value with a newer sequence number to report
let value = value.unwrap().value_data().clone();
self.update_callback_value_change(
record_key,
reportable_subkeys,
remaining_count,
Some(value),
);
} else if reportable_subkeys.len() > 0 {
// We have subkeys reported as possibly changed, but no specific value
// to report, so defer reporting and inspect the range to see what
// actually changed
// Queue this up for inspection
let inner = &mut *self.inner.lock().await;
inner
.outbound_watch_manager
.enqueue_change_inspect(record_key, reportable_subkeys);
}
Ok(NetworkResult::value(()))
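inbound_value_changed now reports only within the intersection of the announced subkeys and the watch's desired subkeys, and attaches the carried value only when it belongs to the first reportable subkey. A worked example with illustrative ranges:

// desired (watched) subkeys: 3..=9
// inbound subkeys:           1..=4, with the value supplied for subkey 1
// reportable = inbound ∩ watched = 3..=4
// The first reportable subkey (3) != the first inbound subkey (1), so the
// carried value is not reported directly; instead 3..=4 is queued via
// enqueue_change_inspect and resolved by a background inspection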

View File

@ -1494,6 +1494,11 @@ impl VeilidAPI {
let registry = self.core_context()?.registry();
let storage_manager = registry.storage_manager();
self.with_debug_cache(|dc| {
dc.opened_record_contexts.clear();
});
storage_manager.close_all_records().await?;
let scope = get_debug_argument_at(&args, 1, "debug_record_purge", "scope", get_string)?;
let bytes = get_debug_argument_at(&args, 2, "debug_record_purge", "bytes", get_number).ok();
let out = match scope.as_str() {

View File

@ -51,6 +51,20 @@ impl DHTRecordReport {
pub fn network_seqs(&self) -> &[ValueSeqNum] {
&self.network_seqs
}
pub fn changed_subkeys(&self) -> ValueSubkeyRangeSet {
let mut changed = ValueSubkeyRangeSet::new();
for ((sk, lseq), nseq) in self
.subkeys
.iter()
.zip(self.local_seqs.iter())
.zip(self.network_seqs.iter())
{
if nseq > lseq {
changed.insert(sk);
}
}
changed
}
}
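changed_subkeys zips the three parallel arrays and marks a subkey as changed only when its network sequence number is strictly newer than the local one. A worked example (values illustrative):

// subkeys:      [0, 1, 2]
// local_seqs:   [5, 7, 3]
// network_seqs: [5, 9, 3]
//
// changed_subkeys() == {1}: only subkey 1 has nseq (9) > lseq (7);
// equal sequence numbers at subkeys 0 and 2 are not counted as changed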
impl fmt::Debug for DHTRecordReport {

View File

@ -1,5 +1,4 @@
# Routing context veilid tests
from typing import Any, Awaitable, Callable, Optional
import pytest
import asyncio
@ -8,6 +7,7 @@ import os
import veilid
from veilid import ValueSubkey, Timestamp, SafetySelection
from veilid.types import VeilidJSONEncoder
##################################################################
BOGUS_KEY = veilid.TypedKey.from_value(
@ -275,6 +275,13 @@ async def test_watch_dht_values():
await api1.debug("record purge local")
await api1.debug("record purge remote")
# Drain the change queue in case record purge cancels old watches
while True:
try:
upd = await asyncio.wait_for(value_change_queue.get(), timeout=3)
except asyncio.TimeoutError:
break
# make routing contexts
rc0 = await api0.new_routing_context()
rc1 = await api1.new_routing_context()
@ -287,6 +294,8 @@ async def test_watch_dht_values():
vd = await rc0.set_dht_value(rec0.key, ValueSubkey(3), b"BLAH")
assert vd is None
await sync(rc0, [rec0])
# Server 0: Make a watch on all the subkeys
ts = await rc0.watch_dht_values(rec0.key, [], Timestamp(0), 0xFFFFFFFF)
assert ts != 0
@ -297,18 +306,20 @@ async def test_watch_dht_values():
# Server 1: Now set the subkey and trigger an update
vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH")
assert vd is None
await sync(rc1, [rec1])
# Server 0: Now we should NOT get an update because the update is the same as our local copy
update = None
upd = None
try:
update = await asyncio.wait_for(value_change_queue.get(), timeout=10)
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
except asyncio.TimeoutError:
pass
assert update is None
assert upd is None
# Server 1: Now set subkey and trigger an update
vd = await rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH")
assert vd is None
await sync(rc1, [rec1])
# Server 0: Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
@ -322,6 +333,7 @@ async def test_watch_dht_values():
# Server 1: Now set subkey and trigger an update
vd = await rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT")
assert vd is None
await sync(rc1, [rec1])
# Server 0: Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
@ -339,6 +351,7 @@ async def test_watch_dht_values():
# Server 1: Now set multiple subkeys and trigger an update
vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(4), b"BZORT BZORT")])
assert vd == [None, None]
await sync(rc1, [rec1])
# Server 0: Wait for the update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
@ -350,28 +363,42 @@ async def test_watch_dht_values():
assert upd.detail.value.data == b"BZORT BZORT"
# Server 0: Now we should NOT get any other update
update = None
upd = None
try:
update = await asyncio.wait_for(value_change_queue.get(), timeout=10)
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
except asyncio.TimeoutError:
pass
assert update is None
if upd is not None:
print(f"bad update: {VeilidJSONEncoder.dumps(upd)}")
assert upd is None
# Now cancel the update
still_active = await rc0.cancel_dht_watch(rec0.key, [(ValueSubkey(3), ValueSubkey(9))])
assert not still_active
# Server 0: Wait for the cancellation update
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
# Server 0: Verify only one update came back
assert upd.detail.key == rec0.key
assert upd.detail.count == 0
assert upd.detail.subkeys == []
assert upd.detail.value is None
# Now set multiple subkeys
vd = await asyncio.gather(*[rc1.set_dht_value(rec1.key, ValueSubkey(3), b"BLAH BLAH BLAH BLAH"), rc1.set_dht_value(rec1.key, ValueSubkey(5), b"BZORT BZORT BZORT")])
assert vd == [None, None]
await sync(rc1, [rec1])
# Now we should NOT get an update
update = None
upd = None
try:
update = await asyncio.wait_for(value_change_queue.get(), timeout=10)
upd = await asyncio.wait_for(value_change_queue.get(), timeout=10)
except asyncio.TimeoutError:
pass
assert update is None
if upd is not None:
print(f"bad update: {VeilidJSONEncoder.dumps(upd)}")
assert upd is None
# Clean up
await rc1.close_dht_record(rec1.key)
@ -380,7 +407,6 @@ async def test_watch_dht_values():
await rc0.delete_dht_record(rec0.key)
@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running")
@pytest.mark.skipif(os.getenv("STRESS") != "1", reason="stress test takes a long time")
@pytest.mark.asyncio
@ -817,21 +843,23 @@ async def test_dht_write_read_full_subkeys_local():
async def sync(rc: veilid.RoutingContext, records: list[veilid.DHTRecordDescriptor]):
print('syncing records to the network')
syncrecords = records.copy()
while len(syncrecords) > 0:
if len(syncrecords) == 0:
return
while True:
donerecords = set()
subkeysleft = 0
for desc in records:
rr = await rc.inspect_dht_record(desc.key, [])
left = sum(x[1] - x[0] + 1 for x in rr.offline_subkeys)
if left == 0:
if veilid.ValueSeqNum.NONE not in rr.local_seqs:
donerecords.add(desc)
donerecords.add(desc)
else:
subkeysleft += left
syncrecords = [x for x in syncrecords if x not in donerecords]
print(f' {len(syncrecords)} records {subkeysleft} subkeys left')
if len(syncrecords) == 0:
break
print(f' syncing {len(syncrecords)} records {subkeysleft} subkeys left')
await asyncio.sleep(1)

View File

@ -59,6 +59,8 @@ class VeilidStateAttachment:
j["attached_uptime"],
)
def to_json(self) -> dict:
return self.__dict__
class AnswerStats:
@ -114,6 +116,9 @@ class AnswerStats:
j["consecutive_lost_answers_minimum"],
)
def to_json(self) -> dict:
return self.__dict__
class RPCStats:
messages_sent: int
messages_rcvd: int
@ -172,6 +177,9 @@ class RPCStats:
AnswerStats.from_json(j["answer_ordered"]),
)
def to_json(self) -> dict:
return self.__dict__
class LatencyStats:
fastest: TimestampDuration
@ -213,6 +221,9 @@ class LatencyStats:
TimestampDuration(j["p75"]),
)
def to_json(self) -> dict:
return self.__dict__
class TransferStats:
total: ByteCount
@ -365,6 +376,9 @@ class PeerStats:
StateStats.from_json(j["state"]),
)
def to_json(self) -> dict:
return self.__dict__
class PeerTableData:
node_ids: list[str]
@ -381,6 +395,9 @@ class PeerTableData:
"""JSON object hook"""
return cls(j["node_ids"], j["peer_address"], PeerStats.from_json(j["peer_stats"]))
def to_json(self) -> dict:
return self.__dict__
class VeilidStateNetwork:
started: bool
@ -410,6 +427,9 @@ class VeilidStateNetwork:
[PeerTableData.from_json(peer) for peer in j["peers"]],
)
def to_json(self) -> dict:
return self.__dict__
class VeilidStateConfig:
config: VeilidConfig
@ -422,6 +442,9 @@ class VeilidStateConfig:
"""JSON object hook"""
return cls(VeilidConfig.from_json(j["config"]))
def to_json(self) -> dict:
return self.__dict__
class VeilidState:
attachment: VeilidStateAttachment
@ -447,6 +470,9 @@ class VeilidState:
VeilidStateConfig.from_json(j["config"]),
)
def to_json(self) -> dict:
return self.__dict__
class VeilidLog:
log_level: VeilidLogLevel
@ -463,6 +489,9 @@ class VeilidLog:
"""JSON object hook"""
return cls(VeilidLogLevel(j["log_level"]), j["message"], j["backtrace"])
def to_json(self) -> dict:
return self.__dict__
class VeilidAppMessage:
sender: Optional[TypedKey]
@ -483,6 +512,9 @@ class VeilidAppMessage:
urlsafe_b64decode_no_pad(j["message"]),
)
def to_json(self) -> dict:
return self.__dict__
class VeilidAppCall:
sender: Optional[TypedKey]
@ -506,6 +538,9 @@ class VeilidAppCall:
OperationId(j["call_id"]),
)
def to_json(self) -> dict:
return self.__dict__
class VeilidRouteChange:
dead_routes: list[RouteId]
@ -523,6 +558,9 @@ class VeilidRouteChange:
[RouteId(route) for route in j["dead_remote_routes"]],
)
def to_json(self) -> dict:
return self.__dict__
class VeilidValueChange:
key: TypedKey
@ -546,6 +584,9 @@ class VeilidValueChange:
None if j["value"] is None else ValueData.from_json(j["value"]),
)
def to_json(self) -> dict:
return self.__dict__
class VeilidUpdateKind(StrEnum):
LOG = "Log"
@ -610,3 +651,6 @@ class VeilidUpdate:
case _:
raise ValueError("Unknown VeilidUpdateKind")
return cls(kind, detail)
def to_json(self) -> dict:
return self.__dict__