Local Rehydration

Christien Rioux 2025-04-25 17:18:39 -04:00
parent b964d0db40
commit c194f61644
48 changed files with 10889 additions and 11940 deletions


@ -1,5 +1,7 @@
use super::*;
impl_veilid_log_facility!("net");
impl NetworkManager {
// Direct bootstrap request handler (separate fallback from the cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", target = "net", skip(self), ret, err)]
@ -16,6 +18,8 @@ impl NetworkManager {
.collect();
let json_bytes = serialize_json(bootstrap_peerinfo).as_bytes().to_vec();
veilid_log!(self trace "BOOT reponse: {}", String::from_utf8_lossy(&json_bytes));
// Reply with a chunk of signed routing table
let net = self.net();
match pin_future_closure!(net.send_data_to_existing_flow(flow, json_bytes)).await? {


@ -899,44 +899,49 @@ impl RoutingTable {
return false;
}
// does it have some dial info we need?
let filter = |n: &NodeInfo| {
let mut keep = false;
// Bootstraps must have -only- inbound capable network class
if !matches!(n.network_class(), NetworkClass::InboundCapable) {
// Only nodes with direct PublicInternet node info
let Some(signed_node_info) = e.signed_node_info(RoutingDomain::PublicInternet)
else {
return false;
};
let SignedNodeInfo::Direct(signed_direct_node_info) = signed_node_info else {
return false;
};
let node_info = signed_direct_node_info.node_info();
// Bootstraps must have -only- inbound capable network class
if !matches!(node_info.network_class(), NetworkClass::InboundCapable) {
return false;
}
// Check for direct dialinfo and a good mix of protocol and address types
let mut keep = false;
for did in node_info.dial_info_detail_list() {
// Bootstraps must have -only- direct dial info
if !matches!(did.class, DialInfoClass::Direct) {
return false;
}
for did in n.dial_info_detail_list() {
// Bootstraps must have -only- direct dial info
if !matches!(did.class, DialInfoClass::Direct) {
return false;
}
if matches!(did.dial_info.address_type(), AddressType::IPV4) {
for (n, protocol_type) in protocol_types.iter().enumerate() {
if nodes_proto_v4[n] < max_per_type
&& did.dial_info.protocol_type() == *protocol_type
{
nodes_proto_v4[n] += 1;
keep = true;
}
if matches!(did.dial_info.address_type(), AddressType::IPV4) {
for (n, protocol_type) in protocol_types.iter().enumerate() {
if nodes_proto_v4[n] < max_per_type
&& did.dial_info.protocol_type() == *protocol_type
{
nodes_proto_v4[n] += 1;
keep = true;
}
} else if matches!(did.dial_info.address_type(), AddressType::IPV6) {
for (n, protocol_type) in protocol_types.iter().enumerate() {
if nodes_proto_v6[n] < max_per_type
&& did.dial_info.protocol_type() == *protocol_type
{
nodes_proto_v6[n] += 1;
keep = true;
}
}
} else if matches!(did.dial_info.address_type(), AddressType::IPV6) {
for (n, protocol_type) in protocol_types.iter().enumerate() {
if nodes_proto_v6[n] < max_per_type
&& did.dial_info.protocol_type() == *protocol_type
{
nodes_proto_v6[n] += 1;
keep = true;
}
}
}
keep
};
e.node_info(RoutingDomain::PublicInternet)
.map(filter)
.unwrap_or(false)
}
keep
})
},
) as RoutingTableEntryFilter;
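
For reference, the selection rule the reworked filter applies (a candidate is kept only while the per-protocol quota for its address family still has room) can be sketched on its own. This is a minimal standalone sketch with simplified stand-in types; the real NodeInfo and DialInfoDetail types carry much more, and the function name here is illustrative:

#[derive(Clone, Copy, PartialEq, Eq)]
enum ProtocolType { Udp, Tcp } // subset of the real protocol types

#[derive(Clone, Copy, PartialEq, Eq)]
enum AddressType { Ipv4, Ipv6 }

// Simplified stand-in for the real DialInfoDetail
struct DialInfoDetail {
    protocol_type: ProtocolType,
    address_type: AddressType,
}

/// Keep a bootstrap candidate if any of its dial info fills an open
/// per-protocol slot for its address family, updating the quota counters.
fn keep_for_bootstrap(
    dial_info: &[DialInfoDetail],
    protocol_types: &[ProtocolType],
    max_per_type: usize,
    nodes_proto_v4: &mut [usize],
    nodes_proto_v6: &mut [usize],
) -> bool {
    let mut keep = false;
    for did in dial_info {
        // Pick the counter set for this dial info's address family
        let counters: &mut [usize] = match did.address_type {
            AddressType::Ipv4 => &mut *nodes_proto_v4,
            AddressType::Ipv6 => &mut *nodes_proto_v6,
        };
        for (n, protocol_type) in protocol_types.iter().enumerate() {
            if counters[n] < max_per_type && did.protocol_type == *protocol_type {
                counters[n] += 1;
                keep = true;
            }
        }
    }
    keep
}

fn main() {
    let protocol_types = [ProtocolType::Udp, ProtocolType::Tcp];
    let (mut v4, mut v6) = ([0usize; 2], [0usize; 2]);
    let node = [DialInfoDetail {
        protocol_type: ProtocolType::Udp,
        address_type: AddressType::Ipv4,
    }];
    // First UDP/IPv4 candidate fills the quota of 1; an identical second one is dropped
    assert!(keep_for_bootstrap(&node, &protocol_types, 1, &mut v4, &mut v6));
    assert!(!keep_for_bootstrap(&node, &protocol_types, 1, &mut v4, &mut v6));
}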


@ -5,22 +5,13 @@ const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
pub const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)]
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct ValidateInspectValueContext {
pub last_descriptor: Option<SignedValueDescriptor>,
pub subkeys: ValueSubkeyRangeSet,
pub crypto_kind: CryptoKind,
}
impl fmt::Debug for ValidateInspectValueContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ValidateInspectValueContext")
.field("last_descriptor", &self.last_descriptor)
.field("crypto_kind", &self.crypto_kind)
.finish()
}
}
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCOperationInspectValueQ {
key: TypedKey,
@ -161,12 +152,20 @@ impl RPCOperationInspectValueA {
};
// Ensure the seqs returned do not exceed the subkeys requested
#[allow(clippy::unnecessary_cast)]
if self.seqs.len() as u64 > inspect_value_context.subkeys.len() as u64 {
let subkey_count = if inspect_value_context.subkeys.is_empty()
|| inspect_value_context.subkeys.is_full()
|| inspect_value_context.subkeys.len() > MAX_INSPECT_VALUE_A_SEQS_LEN as u64
{
MAX_INSPECT_VALUE_A_SEQS_LEN as u64
} else {
inspect_value_context.subkeys.len()
};
if self.seqs.len() as u64 > subkey_count {
return Err(RPCError::protocol(format!(
"InspectValue seqs length is greater than subkeys requested: {} > {}",
"InspectValue seqs length is greater than subkeys requested: {} > {}: {:#?}",
self.seqs.len(),
inspect_value_context.subkeys.len()
subkey_count,
inspect_value_context
)));
}
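
The clamp above can be stated independently of the RPC types: an empty, full, or oversized requested range falls back to MAX_INSPECT_VALUE_A_SEQS_LEN, and an answer may not carry more seqs than that. A minimal sketch, with illustrative function names:

const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;

/// How many sequence numbers an InspectValueA answer may carry for a request
/// spanning `requested_len` subkeys; empty, full, or oversized ranges fall
/// back to the cap.
fn max_answer_seqs(requested_len: u64, is_empty: bool, is_full: bool) -> u64 {
    if is_empty || is_full || requested_len > MAX_INSPECT_VALUE_A_SEQS_LEN as u64 {
        MAX_INSPECT_VALUE_A_SEQS_LEN as u64
    } else {
        requested_len
    }
}

fn validate_answer_seqs(
    seqs_len: u64,
    requested_len: u64,
    is_empty: bool,
    is_full: bool,
) -> Result<(), String> {
    let limit = max_answer_seqs(requested_len, is_empty, is_full);
    if seqs_len > limit {
        return Err(format!(
            "InspectValue seqs length is greater than subkeys requested: {} > {}",
            seqs_len, limit
        ));
    }
    Ok(())
}

fn main() {
    assert_eq!(max_answer_seqs(10, false, false), 10);
    assert_eq!(max_answer_seqs(100_000, false, false), 512);
    assert_eq!(max_answer_seqs(0, false, true), 512); // full range
    assert!(validate_answer_seqs(600, 0, false, true).is_err());
}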


@ -5,7 +5,7 @@ impl_veilid_log_facility!("rpc");
#[derive(Clone, Debug)]
pub struct InspectValueAnswer {
pub seqs: Vec<ValueSeqNum>,
pub seqs: Vec<Option<ValueSeqNum>>,
pub peers: Vec<Arc<PeerInfo>>,
pub descriptor: Option<SignedValueDescriptor>,
}
@ -110,6 +110,11 @@ impl RPCProcessor {
};
let (seqs, peers, descriptor) = inspect_value_a.destructure();
let seqs = seqs
.into_iter()
.map(|x| if x == ValueSeqNum::MAX { None } else { Some(x) })
.collect::<Vec<_>>();
if debug_target_enabled!("dht") {
let debug_string_answer = format!(
"OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}",
@ -232,8 +237,15 @@ impl RPCProcessor {
.inbound_inspect_value(key, subkeys, want_descriptor)
.await
.map_err(RPCError::internal)?);
(inspect_result.seqs, inspect_result.opt_descriptor)
(
inspect_result.seqs().to_vec(),
inspect_result.opt_descriptor(),
)
};
let inspect_result_seqs = inspect_result_seqs
.into_iter()
.map(|x| if let Some(s) = x { s } else { ValueSeqNum::MAX })
.collect::<Vec<_>>();
if debug_target_enabled!("dht") {
let debug_string_answer = format!(

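On the wire, InspectValueA keeps encoding "subkey never written" as the u32 MAX sentinel; only the in-process types switch to Option<ValueSeqNum>. A minimal sketch of the two conversions performed at the RPC boundary (the type alias matches the codebase, the helper names are illustrative):

/// Sequence number type, a u32 alias as in veilid-core.
type ValueSeqNum = u32;

/// Wire to API: the MAX sentinel means "no value written for this subkey".
fn seq_from_wire(seq: ValueSeqNum) -> Option<ValueSeqNum> {
    if seq == ValueSeqNum::MAX {
        None
    } else {
        Some(seq)
    }
}

/// API to wire: encode a missing value back as the MAX sentinel.
fn seq_to_wire(seq: Option<ValueSeqNum>) -> ValueSeqNum {
    seq.unwrap_or(ValueSeqNum::MAX)
}

fn main() {
    let wire = vec![0, 7, ValueSeqNum::MAX];
    let api: Vec<Option<ValueSeqNum>> = wire.iter().copied().map(seq_from_wire).collect();
    assert_eq!(api, vec![Some(0), Some(7), None]);
    let back: Vec<ValueSeqNum> = api.into_iter().map(seq_to_wire).collect();
    assert_eq!(back, wire);
}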

@ -79,7 +79,7 @@ impl StorageManager {
pub async fn debug_local_record_subkey_info(
&self,
key: TypedKey,
record_key: TypedKey,
subkey: ValueSubkey,
) -> String {
let inner = self.inner.lock().await;
@ -87,12 +87,12 @@ impl StorageManager {
return "not initialized".to_owned();
};
local_record_store
.debug_record_subkey_info(key, subkey)
.debug_record_subkey_info(record_key, subkey)
.await
}
pub async fn debug_remote_record_subkey_info(
&self,
key: TypedKey,
record_key: TypedKey,
subkey: ValueSubkey,
) -> String {
let inner = self.inner.lock().await;
@ -100,17 +100,17 @@ impl StorageManager {
return "not initialized".to_owned();
};
remote_record_store
.debug_record_subkey_info(key, subkey)
.debug_record_subkey_info(record_key, subkey)
.await
}
pub async fn debug_local_record_info(&self, key: TypedKey) -> String {
pub async fn debug_local_record_info(&self, record_key: TypedKey) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
return "not initialized".to_owned();
};
let local_debug = local_record_store.debug_record_info(key);
let local_debug = local_record_store.debug_record_info(record_key);
let opened_debug = if let Some(o) = inner.opened_records.get(&key) {
let opened_debug = if let Some(o) = inner.opened_records.get(&record_key) {
format!("Opened Record: {:#?}\n", o)
} else {
"".to_owned()
@ -119,11 +119,11 @@ impl StorageManager {
format!("{}\n{}", local_debug, opened_debug)
}
pub async fn debug_remote_record_info(&self, key: TypedKey) -> String {
pub async fn debug_remote_record_info(&self, record_key: TypedKey) -> String {
let inner = self.inner.lock().await;
let Some(remote_record_store) = &inner.remote_record_store else {
return "not initialized".to_owned();
};
remote_record_store.debug_record_info(key)
remote_record_store.debug_record_info(record_key)
}
}


@ -28,7 +28,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_get_value(
&self,
key: TypedKey,
record_key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
last_get_result: GetResult,
@ -47,7 +47,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(key)
self.get_value_nodes(record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -93,7 +93,7 @@ impl StorageManager {
.rpc_call_get_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection),
key,
record_key,
subkey,
last_descriptor.map(|x| (*x).clone()),
)
@ -255,7 +255,7 @@ impl StorageManager {
let routing_table = registry.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
key,
record_key,
key_count,
fanout,
consensus_count,


@ -28,7 +28,7 @@ impl DescriptorInfo {
/// Info tracked per subkey
struct SubkeySeqCount {
/// The newest sequence number found for a subkey
pub seq: ValueSeqNum,
pub seq: Option<ValueSeqNum>,
/// The set of nodes that had the most recent value for this subkey
pub consensus_nodes: Vec<NodeRef>,
/// The set of nodes that had any value for this subkey
@ -44,6 +44,7 @@ struct OutboundInspectValueContext {
}
/// The result of the outbound_get_value operation
#[derive(Debug, Clone)]
pub(super) struct OutboundInspectValueResult {
/// Fanout results for each subkey
pub subkey_fanout_results: Vec<FanoutResult>,
@ -56,13 +57,14 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_inspect_value(
&self,
key: TypedKey,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
local_inspect_result: InspectResult,
use_set_scope: bool,
) -> VeilidAPIResult<OutboundInspectValueResult> {
let routing_domain = RoutingDomain::PublicInternet;
let requested_subkeys = subkeys.clone();
// Get the DHT parameters for 'InspectValue'
// Can use either 'get scope' or 'set scope' depending on the purpose of the inspection
@ -86,7 +88,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(key)
self.get_value_nodes(record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -99,16 +101,16 @@ impl StorageManager {
};
// Make do-inspect-value answer context
let opt_descriptor_info = if let Some(descriptor) = &local_inspect_result.opt_descriptor {
let opt_descriptor_info = if let Some(descriptor) = local_inspect_result.opt_descriptor() {
// Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
Some(DescriptorInfo::new(descriptor.clone(), &subkeys)?)
Some(DescriptorInfo::new(descriptor, &subkeys)?)
} else {
None
};
let context = Arc::new(Mutex::new(OutboundInspectValueContext {
seqcounts: local_inspect_result
.seqs
.seqs()
.iter()
.map(|s| SubkeySeqCount {
seq: *s,
@ -127,7 +129,7 @@ impl StorageManager {
move |next_node: NodeRef| -> PinBoxFutureStatic<FanoutCallResult> {
let context = context.clone();
let registry = registry.clone();
let opt_descriptor = local_inspect_result.opt_descriptor.clone();
let opt_descriptor = local_inspect_result.opt_descriptor();
let subkeys = subkeys.clone();
Box::pin(async move {
let rpc_processor = registry.rpc_processor();
@ -136,7 +138,7 @@ impl StorageManager {
rpc_processor
.rpc_call_inspect_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain)).with_safety(safety_selection),
key,
record_key,
subkeys.clone(),
opt_descriptor.map(|x| (*x).clone()),
)
@ -237,13 +239,13 @@ impl StorageManager {
// Then take that sequence number and note that we have gotten newer sequence numbers so we keep
// looking for consensus
// If the sequence number matches the old sequence number, then we keep the value node for reference later
if answer_seq != ValueSeqNum::MAX {
if ctx_seqcnt.seq == ValueSeqNum::MAX || answer_seq > ctx_seqcnt.seq
if let Some(answer_seq) = answer_seq {
if ctx_seqcnt.seq.is_none() || answer_seq > ctx_seqcnt.seq.unwrap()
{
// One node has shown us the latest sequence numbers so far
ctx_seqcnt.seq = answer_seq;
ctx_seqcnt.seq = Some(answer_seq);
ctx_seqcnt.consensus_nodes = vec![next_node.clone()];
} else if answer_seq == ctx_seqcnt.seq {
} else if answer_seq == ctx_seqcnt.seq.unwrap() {
// Keep the nodes that showed us the latest values
ctx_seqcnt.consensus_nodes.push(next_node.clone());
}
@ -288,7 +290,7 @@ impl StorageManager {
let routing_table = self.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
key,
record_key,
key_count,
fanout,
consensus_count,
@ -322,28 +324,41 @@ impl StorageManager {
veilid_log!(self debug "InspectValue Fanout: {:#}:\n{}", fanout_result, debug_fanout_results(&subkey_fanout_results));
}
Ok(OutboundInspectValueResult {
let result = OutboundInspectValueResult {
subkey_fanout_results,
inspect_result: InspectResult {
subkeys: ctx
.opt_descriptor_info
inspect_result: InspectResult::new(
self,
requested_subkeys,
"outbound_inspect_value",
ctx.opt_descriptor_info
.as_ref()
.map(|d| d.subkeys.clone())
.unwrap_or_default(),
seqs: ctx.seqcounts.iter().map(|cs| cs.seq).collect(),
opt_descriptor: ctx
.opt_descriptor_info
ctx.seqcounts.iter().map(|cs| cs.seq).collect(),
ctx.opt_descriptor_info
.as_ref()
.map(|d| d.descriptor.clone()),
},
})
)?,
};
#[allow(clippy::unnecessary_cast)]
{
if result.inspect_result.subkeys().len() as u64
!= result.subkey_fanout_results.len() as u64
{
veilid_log!(self error "mismatch between subkeys returned and fanout results returned: {}!={}", result.inspect_result.subkeys().len(), result.subkey_fanout_results.len());
apibail_internal!("subkey and fanout list length mismatched");
}
}
Ok(result)
}
/// Handle a received 'Inspect Value' query
#[instrument(level = "trace", target = "dht", skip_all)]
pub async fn inbound_inspect_value(
&self,
key: TypedKey,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<InspectResult>> {
@ -352,24 +367,25 @@ impl StorageManager {
// See if this is a remote or local value
let (_is_local, inspect_result) = {
// See if the subkey we are getting has a last known local value
let mut local_inspect_result =
Self::handle_inspect_local_value_inner(&mut inner, key, subkeys.clone(), true)
.await?;
let mut local_inspect_result = self
.handle_inspect_local_value_inner(&mut inner, record_key, subkeys.clone(), true)
.await?;
// If this is local, it must have a descriptor already
if local_inspect_result.opt_descriptor.is_some() {
if local_inspect_result.opt_descriptor().is_some() {
if !want_descriptor {
local_inspect_result.opt_descriptor = None;
local_inspect_result.drop_descriptor();
}
(true, local_inspect_result)
} else {
// See if the subkey we are getting has a last known remote value
let remote_inspect_result = Self::handle_inspect_remote_value_inner(
&mut inner,
key,
subkeys,
want_descriptor,
)
.await?;
let remote_inspect_result = self
.handle_inspect_remote_value_inner(
&mut inner,
record_key,
subkeys,
want_descriptor,
)
.await?;
(false, remote_inspect_result)
}
};
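
The inbound path prefers the local record store and treats it as authoritative whenever it holds a descriptor, stripping the descriptor when the caller did not ask for it; the remote store is consulted only otherwise. A condensed sketch of that dispatch with stand-in types:

#[derive(Default)]
struct InspectResult {
    has_descriptor: bool, // stand-in for opt_descriptor().is_some()
}

impl InspectResult {
    fn drop_descriptor(&mut self) {
        self.has_descriptor = false;
    }
}

/// Prefer the local store; only fall back to the remote store when no local
/// descriptor exists.
fn inbound_inspect(
    mut local: InspectResult,
    fetch_remote: impl FnOnce(bool) -> InspectResult,
    want_descriptor: bool,
) -> (bool, InspectResult) {
    if local.has_descriptor {
        if !want_descriptor {
            // Caller did not ask for the descriptor, so strip it before returning
            local.drop_descriptor();
        }
        (true, local)
    } else {
        (false, fetch_remote(want_descriptor))
    }
}

fn main() {
    let local = InspectResult { has_descriptor: true };
    let (is_local, res) = inbound_inspect(local, |_| InspectResult::default(), false);
    assert!(is_local && !res.has_descriptor);
}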

File diff suppressed because it is too large.


@ -133,7 +133,7 @@ impl OutboundWatchManager {
// Watch does not exist, add one if that's what is desired
if let Some(desired) = desired_watch {
self.outbound_watches
.insert(record_key, OutboundWatch::new(desired));
.insert(record_key, OutboundWatch::new(record_key, desired));
}
}
}


@ -4,6 +4,9 @@ impl_veilid_log_facility!("stor");
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(in crate::storage_manager) struct OutboundWatch {
/// Record key being watched
record_key: TypedKey,
/// Current state
/// None means inactive/cancelled
state: Option<OutboundWatchState>,
@ -34,12 +37,14 @@ impl fmt::Display for OutboundWatch {
impl OutboundWatch {
/// Create new outbound watch with desired parameters
pub fn new(desired: OutboundWatchParameters) -> Self {
pub fn new(record_key: TypedKey, desired: OutboundWatchParameters) -> Self {
Self {
record_key,
state: None,
desired: Some(desired),
}
}
/// Get current watch state if it exists
pub fn state(&self) -> Option<&OutboundWatchState> {
self.state.as_ref()
@ -107,7 +112,7 @@ impl OutboundWatch {
/// Returns true if this outbound watch needs to be cancelled
pub fn needs_cancel(&self, registry: &VeilidComponentRegistry) -> bool {
if self.is_dead() {
veilid_log!(registry warn "should have checked for is_dead first");
veilid_log!(registry warn "Should have checked for is_dead first");
return false;
}
@ -118,6 +123,7 @@ impl OutboundWatch {
// If the desired parameters are None then cancel
let Some(_desired) = self.desired.as_ref() else {
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_cancel because desired is None", self.record_key);
return true;
};
@ -132,7 +138,7 @@ impl OutboundWatch {
cur_ts: Timestamp,
) -> bool {
if self.is_dead() || self.needs_cancel(registry) {
veilid_log!(registry warn "should have checked for is_dead and needs_cancel first");
veilid_log!(registry warn "Should have checked for is_dead and needs_cancel first");
return false;
}
@ -156,11 +162,17 @@ impl OutboundWatch {
// If we have a consensus but need to renew because some per-node watches
// either expired or had their routes die, do it
if self.wants_per_node_watch_update(registry, state, cur_ts) {
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_renew because per_node_watch wants update", self.record_key);
return true;
}
// If the desired parameters have changed, then we should renew with them
state.params() != desired
if state.params() != desired {
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_renew because desired params have changed: {} != {}", self.record_key, state.params(), desired);
return true;
}
false
}
/// Returns true if there is work to be done on getting the outbound
@ -175,7 +187,7 @@ impl OutboundWatch {
|| self.needs_cancel(registry)
|| self.needs_renew(registry, consensus_count, cur_ts)
{
veilid_log!(registry warn "should have checked for is_dead, needs_cancel, needs_renew first");
veilid_log!(registry warn "Should have checked for is_dead, needs_cancel, needs_renew first");
return false;
}
@ -187,6 +199,7 @@ impl OutboundWatch {
// If there is a desired watch but no current state, then reconcile
let Some(state) = self.state() else {
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_reconcile because state is empty", self.record_key);
return true;
};
@ -195,13 +208,17 @@ impl OutboundWatch {
if state.nodes().len() < consensus_count
&& cur_ts >= state.next_reconcile_ts().unwrap_or_default()
{
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_reconcile because consensus count is too low {} < {}", self.record_key, state.nodes().len(), consensus_count);
return true;
}
// Try to reconcile if our current number of nodes is less than what we got from
// the previous reconciliation attempt
if let Some(last_consensus_node_count) = state.last_consensus_node_count() {
if state.nodes().len() < last_consensus_node_count {
if state.nodes().len() < last_consensus_node_count
&& state.nodes().len() < consensus_count
{
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_reconcile because node count is less than last consensus {} < {}", self.record_key, state.nodes().len(), last_consensus_node_count);
return true;
}
}
@ -209,11 +226,17 @@ impl OutboundWatch {
// If we have a consensus, or are not attempting consensus at this time,
// but need to reconcile because some per-node watches either expired or had their routes die, do it
if self.wants_per_node_watch_update(registry, state, cur_ts) {
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_reconcile because per_node_watch wants update", self.record_key);
return true;
}
// If the desired parameters have changed, then we should reconcile with them
state.params() != desired
if state.params() != desired {
veilid_log!(registry debug target: "dht", "OutboundWatch({}): needs_reconcile because desired params have changed: {} != {}", self.record_key, state.params(), desired);
return true;
}
false
}
/// Returns true if we need to update our per-node watches due to expiration,
@ -233,6 +256,7 @@ impl OutboundWatch {
&& (state.params().expiration_ts.as_u64() == 0
|| renew_ts < state.params().expiration_ts)
{
veilid_log!(registry debug target: "dht", "OutboundWatch({}): wants_per_node_watch_update because cur_ts is in expiration renew window", self.record_key);
return true;
}
@ -244,6 +268,7 @@ impl OutboundWatch {
for vcr in state.value_changed_routes() {
if rss.get_route_id_for_key(vcr).is_none() {
// Route we would receive value changes on is dead
veilid_log!(registry debug target: "dht", "OutboundWatch({}): wants_per_node_watch_update because route is dead: {}", self.record_key, vcr);
return true;
}
}
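
The reconcile decision above reduces to a handful of ordered checks. A compressed sketch of just that predicate, assuming the dead/cancel/renew cases were already ruled out and a desired watch exists; the struct below is a stand-in for the real watch state:

struct WatchState {
    node_count: usize,
    last_consensus_node_count: Option<usize>,
    next_reconcile_ts: Option<u64>,
    params_changed: bool,         // stand-in for state.params() != desired
    per_node_update_wanted: bool, // stand-in for wants_per_node_watch_update()
}

/// Mirrors OutboundWatch::needs_reconcile once is_dead, needs_cancel and
/// needs_renew have been ruled out.
fn needs_reconcile(state: Option<&WatchState>, consensus_count: usize, cur_ts: u64) -> bool {
    // No current state at all: reconcile to establish the watch
    let Some(state) = state else {
        return true;
    };
    // Below consensus and past the reconcile backoff timestamp
    if state.node_count < consensus_count
        && cur_ts >= state.next_reconcile_ts.unwrap_or_default()
    {
        return true;
    }
    // Lost nodes since the last reconciliation, and still below consensus
    if let Some(last) = state.last_consensus_node_count {
        if state.node_count < last && state.node_count < consensus_count {
            return true;
        }
    }
    // Per-node watches expired or their routes died
    if state.per_node_update_wanted {
        return true;
    }
    // Desired parameters changed
    state.params_changed
}

fn main() {
    let state = WatchState {
        node_count: 1,
        last_consensus_node_count: Some(3),
        next_reconcile_ts: Some(0),
        params_changed: false,
        per_node_update_wanted: false,
    };
    // Below consensus of 2 and past the backoff: reconcile
    assert!(needs_reconcile(Some(&state), 2, 100));
}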


@ -4,7 +4,7 @@ const L2_CACHE_DEPTH: usize = 4; // XXX: i just picked this. we could probably d
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct InspectCacheL2Value {
pub seqs: Vec<ValueSeqNum>,
pub seqs: Vec<Option<ValueSeqNum>>,
}
#[derive(Debug, Clone, Eq, PartialEq)]
@ -67,7 +67,7 @@ impl InspectCache {
continue;
};
if idx < entry.1.seqs.len() {
entry.1.seqs[idx] = seq;
entry.1.seqs[idx] = Some(seq);
} else {
panic!(
"representational error in l2 inspect cache: {} >= {}",


@ -128,11 +128,55 @@ pub struct GetResult {
#[derive(Default, Clone, Debug)]
pub struct InspectResult {
/// The actual in-schema subkey range being reported on
pub subkeys: ValueSubkeyRangeSet,
subkeys: ValueSubkeyRangeSet,
/// The sequence map
pub seqs: Vec<ValueSeqNum>,
seqs: Vec<Option<ValueSeqNum>>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub opt_descriptor: Option<Arc<SignedValueDescriptor>>,
opt_descriptor: Option<Arc<SignedValueDescriptor>>,
}
impl InspectResult {
pub fn new(
registry_accessor: &impl VeilidComponentRegistryAccessor,
requested_subkeys: ValueSubkeyRangeSet,
log_context: &str,
subkeys: ValueSubkeyRangeSet,
seqs: Vec<Option<ValueSeqNum>>,
opt_descriptor: Option<Arc<SignedValueDescriptor>>,
) -> VeilidAPIResult<Self> {
#[allow(clippy::unnecessary_cast)]
{
if subkeys.len() as u64 != seqs.len() as u64 {
veilid_log!(registry_accessor error "{}: mismatch between subkeys returned and sequence number list returned: {}!={}", log_context, subkeys.len(), seqs.len());
apibail_internal!("list length mismatch");
}
}
if !subkeys.is_subset(&requested_subkeys) {
veilid_log!(registry_accessor error "{}: more subkeys returned than requested: {} not a subset of {}", log_context, subkeys, requested_subkeys);
apibail_internal!("invalid subkeys returned");
}
Ok(InspectResult {
subkeys,
seqs,
opt_descriptor,
})
}
pub fn subkeys(&self) -> &ValueSubkeyRangeSet {
&self.subkeys
}
pub fn seqs(&self) -> &[Option<ValueSeqNum>] {
&self.seqs
}
pub fn seqs_mut(&mut self) -> &mut [Option<ValueSeqNum>] {
&mut self.seqs
}
pub fn opt_descriptor(&self) -> Option<Arc<SignedValueDescriptor>> {
self.opt_descriptor.clone()
}
pub fn drop_descriptor(&mut self) {
self.opt_descriptor = None;
}
}
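
A short sketch of the invariants the new constructor enforces: the sequence list must line up one-to-one with the reported subkeys, and the reported subkeys must be a subset of what was requested. The range-set and error types below are simplified stand-ins for the real ones:

use std::collections::BTreeSet;

type ValueSeqNum = u32;
type SubkeySet = BTreeSet<u32>; // stand-in for ValueSubkeyRangeSet

struct InspectResult {
    subkeys: SubkeySet,
    seqs: Vec<Option<ValueSeqNum>>,
}

impl InspectResult {
    /// Mirrors the checks in InspectResult::new: reject length mismatches and
    /// subkeys that were never requested.
    fn new(
        requested_subkeys: &SubkeySet,
        subkeys: SubkeySet,
        seqs: Vec<Option<ValueSeqNum>>,
    ) -> Result<Self, String> {
        if subkeys.len() != seqs.len() {
            return Err(format!(
                "list length mismatch: {} != {}",
                subkeys.len(),
                seqs.len()
            ));
        }
        if !subkeys.is_subset(requested_subkeys) {
            return Err("invalid subkeys returned".to_owned());
        }
        Ok(Self { subkeys, seqs })
    }
}

fn main() {
    let requested: SubkeySet = (0..4).collect();
    let reported: SubkeySet = [0u32, 2].into_iter().collect();
    let ok = InspectResult::new(&requested, reported.clone(), vec![Some(3), None]).unwrap();
    assert_eq!(ok.subkeys.len(), ok.seqs.len());
    // One seq too many for two subkeys: rejected
    assert!(InspectResult::new(&requested, reported, vec![Some(3), None, Some(1)]).is_err());
}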
impl<D> RecordStore<D>
@ -822,18 +866,18 @@ where
pub async fn inspect_record(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
subkeys: &ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
// Get record from index
let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| {
let Some((schema_subkeys, opt_descriptor)) = self.with_record(key, |record| {
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers between that and what we asked for
let truncated_subkeys = record
let schema_subkeys = record
.schema()
.truncate_subkeys(&subkeys, Some(MAX_INSPECT_VALUE_A_SEQS_LEN));
.truncate_subkeys(subkeys, Some(MAX_INSPECT_VALUE_A_SEQS_LEN));
(
truncated_subkeys,
schema_subkeys,
if want_descriptor {
Some(record.descriptor().clone())
} else {
@ -846,56 +890,60 @@ where
};
// Check if we can return some subkeys
if subkeys.is_empty() {
apibail_invalid_argument!("subkeys set does not overlap schema", "subkeys", subkeys);
if schema_subkeys.is_empty() {
apibail_invalid_argument!(
"subkeys set does not overlap schema",
"subkeys",
schema_subkeys
);
}
// See if we have this inspection cached
if let Some(icv) = self.inspect_cache.get(&key, &subkeys) {
return Ok(Some(InspectResult {
subkeys,
seqs: icv.seqs,
if let Some(icv) = self.inspect_cache.get(&key, &schema_subkeys) {
return Ok(Some(InspectResult::new(
self,
subkeys.clone(),
"inspect_record",
schema_subkeys.clone(),
icv.seqs,
opt_descriptor,
}));
)?));
}
// Build sequence number list to return
#[allow(clippy::unnecessary_cast)]
let mut seqs = Vec::with_capacity(subkeys.len() as usize);
for subkey in subkeys.iter() {
let mut seqs = Vec::with_capacity(schema_subkeys.len() as usize);
for subkey in schema_subkeys.iter() {
let stk = SubkeyTableKey { key, subkey };
let seq = if let Some(record_data) = self.subkey_cache.peek(&stk) {
record_data.signed_value_data().value_data().seq()
let opt_seq = if let Some(record_data) = self.subkey_cache.peek(&stk) {
Some(record_data.signed_value_data().value_data().seq())
} else {
// If not in cache, try to pull from table store if it is in our stored subkey set
// XXX: This would be better if it didn't have to pull the whole record data to get the seq.
if let Some(record_data) = self
.subkey_table
self.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
{
record_data.signed_value_data().value_data().seq()
} else {
// Subkey not written to
ValueSubkey::MAX
}
.map(|record_data| record_data.signed_value_data().value_data().seq())
};
seqs.push(seq)
seqs.push(opt_seq)
}
// Save seqs cache
self.inspect_cache.put(
key,
subkeys.clone(),
schema_subkeys.clone(),
InspectCacheL2Value { seqs: seqs.clone() },
);
Ok(Some(InspectResult {
subkeys,
Ok(Some(InspectResult::new(
self,
subkeys.clone(),
"inspect_record",
schema_subkeys,
seqs,
opt_descriptor,
}))
)?))
}
#[instrument(level = "trace", target = "stor", skip_all, err)]
@ -1242,7 +1290,7 @@ where
changes.push(ValueChangedInfo {
target: evci.target,
key: evci.key,
record_key: evci.key,
subkeys: evci.subkeys,
count: evci.count,
watch_id: evci.watch_id,


@ -0,0 +1,271 @@
use super::{inspect_value::OutboundInspectValueResult, *};
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct RehydrateReport {
/// The record key rehydrated
record_key: TypedKey,
/// The requested range of subkeys to rehydrate if necessary
subkeys: ValueSubkeyRangeSet,
/// The requested consensus count,
consensus_count: usize,
/// The range of subkeys that wanted rehydration
wanted: ValueSubkeyRangeSet,
/// The range of subkeys that actually could be rehydrated
rehydrated: ValueSubkeyRangeSet,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct RehydrationRequest {
pub subkeys: ValueSubkeyRangeSet,
pub consensus_count: usize,
}
impl StorageManager {
/// Add a background rehydration request
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn add_rehydration_request(
&self,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
) {
let req = RehydrationRequest {
subkeys,
consensus_count,
};
veilid_log!(self debug "Adding rehydration request: {} {:?}", record_key, req);
let mut inner = self.inner.lock().await;
inner
.rehydration_requests
.entry(record_key)
.and_modify(|r| {
r.subkeys = r.subkeys.union(&req.subkeys);
r.consensus_count.max_assign(req.consensus_count);
})
.or_insert(req);
}
/// Sends the local copies of all of a record's subkeys back to the network
/// Triggers a subkey update if the consensus on the subkey is less than
/// the specified 'consensus_count'.
/// The subkey updates are performed in the background if rehydration was
/// determined to be necessary.
/// If a newer copy of a subkey's data is available online, the background
/// write will pick up the newest subkey data as it does the SetValue fanout
/// and will drive the newest values to consensus.
#[instrument(level = "trace", target = "stor", skip(self), ret, err)]
pub(super) async fn rehydrate_record(
&self,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
) -> VeilidAPIResult<RehydrateReport> {
veilid_log!(self debug "Checking for record rehydration: {} {} @ consensus {}", record_key, subkeys, consensus_count);
// Get subkey range for consideration
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
} else {
subkeys
};
// Get safety selection
let mut inner = self.inner.lock().await;
let safety_selection = {
if let Some(opened_record) = inner.opened_records.get(&record_key) {
opened_record.safety_selection()
} else {
// See if it's in the local record store
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let Some(safety_selection) =
local_record_store.with_record(record_key, |rec| rec.detail().safety_selection)
else {
apibail_key_not_found!(record_key);
};
safety_selection
}
};
// See if the requested record is our local record store
let local_inspect_result = self
.handle_inspect_local_value_inner(&mut inner, record_key, subkeys.clone(), true)
.await?;
// Get rpc processor and drop mutex so we don't block while getting the value from the network
if !self.dht_is_online() {
apibail_try_again!("offline, try again later");
};
// Drop the lock for network access
drop(inner);
// Get the inspect record report from the network
let result = self
.outbound_inspect_value(
record_key,
subkeys.clone(),
safety_selection,
InspectResult::default(),
true,
)
.await?;
// If online result had no subkeys, then trigger writing the entire record in the background
if result.inspect_result.subkeys().is_empty()
|| result.inspect_result.opt_descriptor().is_none()
{
return self
.rehydrate_all_subkeys(
record_key,
subkeys,
consensus_count,
safety_selection,
local_inspect_result,
)
.await;
}
return self
.rehydrate_required_subkeys(
record_key,
subkeys,
consensus_count,
safety_selection,
local_inspect_result,
result,
)
.await;
}
#[instrument(level = "trace", target = "stor", skip(self), ret, err)]
pub(super) async fn rehydrate_all_subkeys(
&self,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
safety_selection: SafetySelection,
local_inspect_result: InspectResult,
) -> VeilidAPIResult<RehydrateReport> {
let mut inner = self.inner.lock().await;
veilid_log!(self debug "Rehydrating all subkeys: record={} subkeys={}", record_key, local_inspect_result.subkeys());
let mut rehydrated = ValueSubkeyRangeSet::new();
for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() {
if local_inspect_result.seqs()[n].is_some() {
// Add to offline writes to flush
veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey);
rehydrated.insert(subkey);
Self::add_offline_subkey_write_inner(
&mut inner,
record_key,
subkey,
safety_selection,
);
}
}
if rehydrated.is_empty() {
veilid_log!(self debug "Record wanted full rehydrating, but no subkey data available: record={} subkeys={}", record_key, subkeys);
} else {
veilid_log!(self debug "Record full rehydrating: record={} subkeys={} rehydrated={}", record_key, subkeys, rehydrated);
}
return Ok(RehydrateReport {
record_key,
subkeys,
consensus_count,
wanted: local_inspect_result.subkeys().clone(),
rehydrated,
});
}
#[instrument(level = "trace", target = "stor", skip(self), ret, err)]
pub(super) async fn rehydrate_required_subkeys(
&self,
record_key: TypedKey,
subkeys: ValueSubkeyRangeSet,
consensus_count: usize,
safety_selection: SafetySelection,
local_inspect_result: InspectResult,
outbound_inspect_result: OutboundInspectValueResult,
) -> VeilidAPIResult<RehydrateReport> {
let mut inner = self.inner.lock().await;
// Get cryptosystem
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(record_key.kind) else {
apibail_generic!("unsupported cryptosystem");
};
if local_inspect_result.subkeys().len()
!= outbound_inspect_result.subkey_fanout_results.len() as u64
{
veilid_log!(self debug "Subkey count mismatch when rehydrating required subkeys: record={} {} != {}",
record_key, local_inspect_result.subkeys().len(), outbound_inspect_result.subkey_fanout_results.len());
apibail_internal!("subkey count mismatch");
}
// For each subkey, determine if we should rehydrate it
let mut wanted = ValueSubkeyRangeSet::new();
let mut rehydrated = ValueSubkeyRangeSet::new();
for (n, subkey) in local_inspect_result.subkeys().iter().enumerate() {
let sfr = outbound_inspect_result
.subkey_fanout_results
.get(n)
.unwrap();
// Does the online subkey have enough consensus?
// If not, schedule it to be written in the background
if sfr.consensus_nodes.len() < consensus_count {
wanted.insert(subkey);
if local_inspect_result.seqs()[n].is_some() {
// Add to offline writes to flush
veilid_log!(self debug "Rehydrating: record={} subkey={}", record_key, subkey);
rehydrated.insert(subkey);
Self::add_offline_subkey_write_inner(
&mut inner,
record_key,
subkey,
safety_selection,
);
}
}
}
if wanted.is_empty() {
veilid_log!(self debug "Record did not need rehydrating: record={} local_subkeys={}", record_key, local_inspect_result.subkeys());
} else if rehydrated.is_empty() {
veilid_log!(self debug "Record wanted rehydrating, but no subkey data available: record={} local_subkeys={} wanted={}", record_key, local_inspect_result.subkeys(), wanted);
} else {
veilid_log!(self debug "Record rehydrating: record={} local_subkeys={} wanted={} rehydrated={}", record_key, local_inspect_result.subkeys(), wanted, rehydrated);
}
// Keep the list of nodes that returned a value for later reference
let results_iter = outbound_inspect_result
.inspect_result
.subkeys()
.iter()
.map(ValueSubkeyRangeSet::single)
.zip(outbound_inspect_result.subkey_fanout_results.into_iter());
Self::process_fanout_results_inner(
&mut inner,
&vcrypto,
record_key,
results_iter,
false,
self.config()
.with(|c| c.network.dht.set_value_count as usize),
);
Ok(RehydrateReport {
record_key,
subkeys,
consensus_count,
wanted,
rehydrated,
})
}
}
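
The per-subkey decision made by rehydrate_required_subkeys can be summarized as: a subkey is wanted when the online consensus falls short, and it is actually rehydrated (scheduled as an offline subkey write) only when local data exists for it. A minimal sketch of that rule, with illustrative names:

type ValueSeqNum = u32;

/// Per-subkey rehydration decision: returns (wanted, rehydrate).
fn rehydration_decision(
    online_consensus: usize,
    consensus_count: usize,
    local_seq: Option<ValueSeqNum>,
) -> (bool, bool) {
    let wanted = online_consensus < consensus_count;
    let rehydrate = wanted && local_seq.is_some();
    (wanted, rehydrate)
}

fn main() {
    // Enough consensus online: nothing to do
    assert_eq!(rehydration_decision(3, 2, Some(5)), (false, false));
    // Consensus short and we hold a local copy: schedule an offline subkey write
    assert_eq!(rehydration_decision(1, 2, Some(5)), (true, true));
    // Consensus short but the subkey was never written locally: wanted, nothing to push
    assert_eq!(rehydration_decision(1, 2, None), (true, false));
}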


@ -28,7 +28,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_set_value(
&self,
key: TypedKey,
record_key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
value: Arc<SignedValueData>,
@ -48,7 +48,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
self.get_value_nodes(key)
self.get_value_nodes(record_key)
.await?
.unwrap_or_default()
.into_iter()
@ -99,7 +99,7 @@ impl StorageManager {
.rpc_call_set_value(
Destination::direct(next_node.routing_domain_filtered(routing_domain))
.with_safety(safety_selection),
key,
record_key,
subkey,
(*value).clone(),
(*descriptor).clone(),
@ -228,7 +228,7 @@ impl StorageManager {
let routing_table = registry.routing_table();
let fanout_call = FanoutCall::new(
&routing_table,
key,
record_key,
key_count,
fanout,
consensus_count,


@ -2,6 +2,7 @@ pub mod check_inbound_watches;
pub mod check_outbound_watches;
pub mod flush_record_stores;
pub mod offline_subkey_writes;
pub mod rehydrate_records;
pub mod save_metadata;
pub mod send_value_changes;
@ -55,6 +56,15 @@ impl StorageManager {
check_inbound_watches_task,
check_inbound_watches_task_routine
);
// Set rehydrate records tick task
veilid_log!(self debug "starting rehydrate records task");
impl_setup_task!(
self,
Self,
rehydrate_records_task,
rehydrate_records_task_routine
);
}
#[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]
@ -78,6 +88,11 @@ impl StorageManager {
self.offline_subkey_writes_task.tick().await?;
}
// Do requested rehydrations
if self.has_rehydration_requests().await {
self.rehydrate_records_task.tick().await?;
}
// Send value changed notifications
self.send_value_changes_task.tick().await?;
}
@ -106,5 +121,9 @@ impl StorageManager {
if let Err(e) = self.offline_subkey_writes_task.stop().await {
veilid_log!(self warn "offline_subkey_writes_task not stopped: {}", e);
}
veilid_log!(self debug "stopping record rehydration task");
if let Err(e) = self.rehydrate_records_task.stop().await {
veilid_log!(self warn "rehydrate_records_task not stopped: {}", e);
}
}
}


@ -0,0 +1,50 @@
use super::*;
impl_veilid_log_facility!("stor");
impl StorageManager {
/// Process background rehydration requests
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn rehydrate_records_task_routine(
&self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let reqs = {
let mut inner = self.inner.lock().await;
core::mem::take(&mut inner.rehydration_requests)
};
let mut futs = Vec::new();
for req in reqs {
futs.push(async move {
let res = self
.rehydrate_record(req.0, req.1.subkeys.clone(), req.1.consensus_count)
.await;
(req, res)
});
}
process_batched_future_queue(
futs,
REHYDRATE_BATCH_SIZE,
stop_token,
|(req, res)| async move {
let _report = match res {
Ok(v) => v,
Err(e) => {
veilid_log!(self debug "Rehydration request failed: {}", e);
// Try again later
self.add_rehydration_request(req.0, req.1.subkeys, req.1.consensus_count)
.await;
return;
}
};
},
)
.await;
Ok(())
}
}
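
The routine drains all pending requests, runs them in batches, and re-queues anything that failed so a later tick retries it. The same drain-and-requeue shape in miniature; the types here are stand-ins and the merge logic of add_rehydration_request is reduced to a plain or_insert:

use std::collections::HashMap;

type RecordKey = u64; // stand-in for TypedKey

struct RehydrationRequest {
    consensus_count: usize,
}

/// Stand-in for rehydrate_record(); here, odd keys fail and get re-queued.
fn rehydrate(key: RecordKey, req: &RehydrationRequest) -> Result<(), String> {
    if key % 2 == 0 && req.consensus_count > 0 {
        Ok(())
    } else {
        Err("offline, try again later".to_owned())
    }
}

fn tick(pending: &mut HashMap<RecordKey, RehydrationRequest>) {
    // Drain the queue up front so new requests can accumulate while we work
    let reqs = std::mem::take(pending);
    for (key, req) in reqs {
        if let Err(e) = rehydrate(key, &req) {
            eprintln!("Rehydration request failed: {}", e);
            // Re-queue the failed request so a later tick retries it
            pending.entry(key).or_insert(req);
        }
    }
}

fn main() {
    let mut pending = HashMap::new();
    pending.insert(2, RehydrationRequest { consensus_count: 2 });
    pending.insert(3, RehydrationRequest { consensus_count: 2 });
    tick(&mut pending);
    // Only the failed request (key 3) is back in the queue for the next tick
    assert_eq!(pending.len(), 1);
    assert!(pending.contains_key(&3));
}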


@ -263,7 +263,7 @@ impl StorageManager {
let disposition = if wva.answer.accepted {
if wva.answer.expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
veilid_log!(registry debug "WatchValue accepted for {}: id={} expiration_ts={} ({})", record_key, wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
veilid_log!(registry debug target:"dht", "WatchValue accepted for {}: id={} expiration_ts={} ({})", record_key, wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
// Add to accepted watches
let mut ctx = context.lock();
@ -279,7 +279,7 @@ impl StorageManager {
// If the returned expiration time is zero, this watch was cancelled
// If the expiration time is greater than zero this watch is active
veilid_log!(registry debug "WatchValue rejected for {}: id={} expiration_ts={} ({})", record_key, wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
veilid_log!(registry debug target:"dht", "WatchValue rejected for {}: id={} expiration_ts={} ({})", record_key, wva.answer.watch_id, display_ts(wva.answer.expiration_ts.as_u64()), next_node);
// Add to rejected watches
let mut ctx = context.lock();
@ -344,10 +344,10 @@ impl StorageManager {
let fanout_result = fanout_call.run(init_fanout_queue).await.inspect_err(|e| {
// If we finished with an error, return that
veilid_log!(self debug "WatchValue fanout error: {}", e);
veilid_log!(self debug target:"dht", "WatchValue fanout error: {}", e);
})?;
veilid_log!(self debug "WatchValue Fanout: {:#}", fanout_result);
veilid_log!(self debug target:"dht", "WatchValue Fanout: {:#}", fanout_result);
// Get cryptosystem
let crypto = self.crypto();
@ -476,7 +476,7 @@ impl StorageManager {
cancelled.push(pnk);
}
Err(e) => {
veilid_log!(self debug "outbound watch cancel error: {}", e);
veilid_log!(self debug "Outbound watch cancel error: {}", e);
// xxx should do something different for network unreachable vs host unreachable
// Leave in the 'per node states' for now because we couldn't contact the node
@ -604,7 +604,7 @@ impl StorageManager {
};
}
Err(e) => {
veilid_log!(self debug "outbound watch change error: {}", e);
veilid_log!(self debug "Outbound watch change error: {}", e);
}
}
}
@ -718,7 +718,7 @@ impl StorageManager {
});
}
Err(e) => {
veilid_log!(self debug "outbound watch fanout error: {}", e);
veilid_log!(self debug "Outbound watch fanout error: {}", e);
}
}
@ -742,11 +742,11 @@ impl StorageManager {
.outbound_watches
.get_mut(&record_key)
else {
veilid_log!(self warn "outbound watch should have still been in the table");
veilid_log!(self warn "Outbound watch should have still been in the table");
return;
};
let Some(desired) = outbound_watch.desired() else {
veilid_log!(self warn "watch with result should have desired params");
veilid_log!(self warn "Watch with result should have desired params");
return;
};
@ -852,6 +852,10 @@ impl StorageManager {
.config()
.with(|c| c.network.dht.get_value_count as usize);
// Operate on this watch only if it isn't already being operated on
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
// Terminate the 'desired' params for watches
// that have no remaining count or have expired
outbound_watch.try_expire_desired_state(cur_ts);
@ -859,9 +863,6 @@ impl StorageManager {
// Check states
if outbound_watch.is_dead() {
// Outbound watch is dead
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
@ -874,9 +875,6 @@ impl StorageManager {
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_cancel(&registry) {
// Outbound watch needs to be cancelled
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
@ -889,9 +887,6 @@ impl StorageManager {
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_renew(&registry, consensus_count, cur_ts) {
// Outbound watch expired but can be renewed
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
@ -904,9 +899,6 @@ impl StorageManager {
return Some(pin_dyn_future!(fut));
} else if outbound_watch.needs_reconcile(&registry, consensus_count, cur_ts) {
// Outbound watch parameters have changed or it needs more nodes
let watch_lock =
opt_watch_lock.or_else(|| self.outbound_watch_lock_table.try_lock_tag(key))?;
let fut = {
let registry = self.registry();
async move {
@ -944,12 +936,12 @@ impl StorageManager {
return;
}
};
let mut changed_subkeys = report.changed_subkeys();
let mut newer_online_subkeys = report.newer_online_subkeys();
// Get the first newer online subkey until we find one to report
let mut n = 0;
while !changed_subkeys.is_empty() {
let first_changed_subkey = changed_subkeys.first().unwrap();
while !newer_online_subkeys.is_empty() {
let first_changed_subkey = newer_online_subkeys.first().unwrap();
let value = match this.get_value(record_key, first_changed_subkey, true).await {
Ok(v) => v,
@ -960,7 +952,8 @@ impl StorageManager {
};
if let Some(value) = value {
if value.seq() > report.local_seqs()[n] {
let opt_local_seq = report.local_seqs()[n];
if opt_local_seq.is_none() || value.seq() > opt_local_seq.unwrap() {
// Calculate the update
let (changed_subkeys, remaining_count, value) = {
let _watch_lock =
@ -991,7 +984,7 @@ impl StorageManager {
},
);
(changed_subkeys, remaining_count, value)
(newer_online_subkeys, remaining_count, value)
};
// Send the update
@ -1008,7 +1001,7 @@ impl StorageManager {
}
// If we didn't send an update, remove the first changed subkey and try again
changed_subkeys.pop_first();
newer_online_subkeys.pop_first();
n += 1;
}
}
@ -1111,14 +1104,14 @@ impl StorageManager {
inner.outbound_watch_manager.per_node_states.get_mut(&pnk)
else {
// No per node state means no callback
veilid_log!(self warn "missing per node state in outbound watch: {:?}", pnk);
veilid_log!(self warn "Missing per node state in outbound watch: {:?}", pnk);
return Ok(NetworkResult::value(()));
};
// If watch id doesn't match it's for an older watch and should be ignored
if per_node_state.watch_id != watch_id {
// No per node state means no callback
veilid_log!(self warn "incorrect watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id);
veilid_log!(self warn "Incorrect watch id for per node state in outbound watch: {:?} {} != {}", pnk, per_node_state.watch_id, watch_id);
return Ok(NetworkResult::value(()));
}
@ -1127,7 +1120,7 @@ impl StorageManager {
// If count is greater than our requested count then this is invalid, cancel the watch
// XXX: Should this be a punishment?
veilid_log!(self debug
"watch count went backward: {} @ {} id={}: {} > {}",
"Watch count went backward: {} @ {} id={}: {} > {}",
record_key,
inbound_node_id,
watch_id,
@ -1143,7 +1136,7 @@ impl StorageManager {
// Log this because watch counts should always be decrementing on a per-node basis.
// XXX: Should this be a punishment?
veilid_log!(self debug
"watch count duplicate: {} @ {} id={}: {} == {}",
"Watch count duplicate: {} @ {} id={}: {} == {}",
record_key,
inbound_node_id,
watch_id,
@ -1153,7 +1146,7 @@ impl StorageManager {
} else {
// Reduce the per-node watch count
veilid_log!(self debug
"watch count decremented: {} @ {} id={}: {} < {}",
"Watch count decremented: {} @ {} id={}: {} < {}",
record_key,
inbound_node_id,
watch_id,
@ -1285,7 +1278,7 @@ impl StorageManager {
remaining_count,
Some(value),
);
} else if reportable_subkeys.len() > 0 {
} else if !reportable_subkeys.is_empty() {
// We have subkeys that have been reported as possibly changed
// but not a specific record reported, so we should defer reporting and
// inspect the range to see what changed


@ -1902,7 +1902,7 @@ impl VeilidAPI {
let (key, rc) =
self.clone()
.get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?;
.get_opened_dht_record_context(&args, "debug_record_inspect", "key", 1)?;
let mut rest_defaults = false;
@ -1947,6 +1947,62 @@ impl VeilidAPI {
Ok(format!("Success: report={:?}", report))
}
async fn debug_record_rehydrate(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let registry = self.core_context()?.registry();
let storage_manager = registry.storage_manager();
let key = get_debug_argument_at(
&args,
1,
"debug_record_rehydrate",
"key",
get_dht_key_no_safety,
)?;
let mut rest_defaults = false;
let subkeys = if rest_defaults {
None
} else {
get_debug_argument_at(&args, 2, "debug_record_rehydrate", "subkeys", get_subkeys)
.inspect_err(|_| {
rest_defaults = true;
})
.ok()
};
let consensus_count = if rest_defaults {
None
} else {
get_debug_argument_at(
&args,
3,
"debug_record_rehydrate",
"consensus_count",
get_number,
)
.inspect_err(|_| {
rest_defaults = true;
})
.ok()
};
// Do a record rehydrate
storage_manager
.add_rehydration_request(
key,
subkeys.unwrap_or_default(),
consensus_count.unwrap_or_else(|| {
registry
.config()
.with(|c| c.network.dht.get_value_count as usize)
}),
)
.await;
Ok("Request added".to_owned())
}
async fn debug_record(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> =
shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?;
@ -1977,6 +2033,8 @@ impl VeilidAPI {
self.debug_record_cancel(args).await
} else if command == "inspect" {
self.debug_record_inspect(args).await
} else if command == "rehydrate" {
self.debug_record_rehydrate(args).await
} else {
Ok(">>> Unknown command\n".to_owned())
}
@ -2144,6 +2202,7 @@ DHT Operations:
watch [<key>] [<subkeys> [<expiration> [<count>]]] - watch a record for changes
cancel [<key>] [<subkeys>] - cancel a dht record watch
inspect [<key>] [<scope> [<subkeys>]] - display a dht record's subkey status
rehydrate <key> [<subkeys>] [<consensus count>] - send a dht record's expired local data back to the network
TableDB Operations:
table list - list the names of all the tables in the TableDB


@ -26,7 +26,7 @@ pub struct DHTRecordDescriptor {
from_impl_to_jsvalue!(DHTRecordDescriptor);
impl DHTRecordDescriptor {
pub fn new(
pub(crate) fn new(
key: TypedKey,
owner: PublicKey,
owner_secret: Option<SecretKey>,


@ -16,25 +16,56 @@ pub struct DHTRecordReport {
/// The subkeys that have been written offline and still need to be flushed
offline_subkeys: ValueSubkeyRangeSet,
/// The sequence numbers of each subkey requested from a locally stored DHT Record
local_seqs: Vec<ValueSeqNum>,
local_seqs: Vec<Option<ValueSeqNum>>,
/// The sequence numbers of each subkey requested from the DHT over the network
network_seqs: Vec<ValueSeqNum>,
network_seqs: Vec<Option<ValueSeqNum>>,
}
from_impl_to_jsvalue!(DHTRecordReport);
impl DHTRecordReport {
pub fn new(
pub(crate) fn new(
subkeys: ValueSubkeyRangeSet,
offline_subkeys: ValueSubkeyRangeSet,
local_seqs: Vec<ValueSeqNum>,
network_seqs: Vec<ValueSeqNum>,
) -> Self {
Self {
local_seqs: Vec<Option<ValueSeqNum>>,
network_seqs: Vec<Option<ValueSeqNum>>,
) -> VeilidAPIResult<Self> {
if subkeys.is_full() {
apibail_invalid_argument!("subkeys range should be exact", "subkeys", subkeys);
}
if subkeys.is_empty() {
apibail_invalid_argument!("subkeys range should not be empty", "subkeys", subkeys);
}
if subkeys.len() > MAX_INSPECT_VALUE_A_SEQS_LEN as u64 {
apibail_invalid_argument!("subkeys range is too large", "subkeys", subkeys);
}
if subkeys.len() != local_seqs.len() as u64 {
apibail_invalid_argument!(
"local seqs list does not match subkey length",
"local_seqs",
local_seqs.len()
);
}
if subkeys.len() != network_seqs.len() as u64 {
apibail_invalid_argument!(
"network seqs list does not match subkey length",
"network_seqs",
network_seqs.len()
);
}
if !offline_subkeys.is_subset(&subkeys) {
apibail_invalid_argument!(
"offline subkeys is not a subset of the whole subkey set",
"offline_subkeys",
offline_subkeys
);
}
Ok(Self {
subkeys,
offline_subkeys,
local_seqs,
network_seqs,
}
})
}
pub fn subkeys(&self) -> &ValueSubkeyRangeSet {
@ -44,26 +75,28 @@ impl DHTRecordReport {
&self.offline_subkeys
}
#[must_use]
pub fn local_seqs(&self) -> &[ValueSeqNum] {
pub fn local_seqs(&self) -> &[Option<ValueSeqNum>] {
&self.local_seqs
}
#[must_use]
pub fn network_seqs(&self) -> &[ValueSeqNum] {
pub fn network_seqs(&self) -> &[Option<ValueSeqNum>] {
&self.network_seqs
}
pub fn changed_subkeys(&self) -> ValueSubkeyRangeSet {
let mut changed = ValueSubkeyRangeSet::new();
pub fn newer_online_subkeys(&self) -> ValueSubkeyRangeSet {
let mut newer_online = ValueSubkeyRangeSet::new();
for ((sk, lseq), nseq) in self
.subkeys
.iter()
.zip(self.local_seqs.iter())
.zip(self.network_seqs.iter())
{
if nseq > lseq {
changed.insert(sk);
if let Some(nseq) = nseq {
if lseq.is_none() || *nseq > lseq.unwrap() {
newer_online.insert(sk);
}
}
}
changed
newer_online
}
}
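
newer_online_subkeys() compares the two optional sequence lists subkey by subkey: a subkey qualifies only when the network has a value at all, and either there is no local value or the network one is strictly newer. A standalone sketch of that comparison, with plain slices standing in for ValueSubkeyRangeSet:

type ValueSubkey = u32;
type ValueSeqNum = u32;

/// Subkeys for which the network holds a newer value than the local store.
fn newer_online_subkeys(
    subkeys: &[ValueSubkey],
    local_seqs: &[Option<ValueSeqNum>],
    network_seqs: &[Option<ValueSeqNum>],
) -> Vec<ValueSubkey> {
    let mut newer_online = Vec::new();
    for ((sk, lseq), nseq) in subkeys.iter().zip(local_seqs).zip(network_seqs) {
        if let Some(nseq) = nseq {
            // Newer online if we have nothing locally, or the network seq is strictly greater
            if lseq.is_none() || *nseq > lseq.unwrap() {
                newer_online.push(*sk);
            }
        }
    }
    newer_online
}

fn main() {
    let subkeys = [0, 1, 2, 3];
    let local = [Some(2), None, Some(5), None];
    let network = [Some(3), Some(1), Some(5), None];
    // Subkey 0: network is newer; subkey 1: network has data we lack; 2 and 3: nothing newer
    assert_eq!(newer_online_subkeys(&subkeys, &local, &network), vec![0, 1]);
}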


@ -19,7 +19,7 @@ pub type ValueSubkey = u32;
#[cfg_attr(all(target_arch = "wasm32", target_os = "unknown"), declare)]
pub type ValueSeqNum = u32;
pub(crate) fn debug_seqs(seqs: &[ValueSeqNum]) -> String {
pub(crate) fn debug_seqs(seqs: &[Option<ValueSeqNum>]) -> String {
let mut col = 0;
let mut out = String::new();
let mut left = seqs.len();
@ -27,10 +27,10 @@ pub(crate) fn debug_seqs(seqs: &[ValueSeqNum]) -> String {
if col == 0 {
out += " ";
}
let sc = if *s == ValueSeqNum::MAX {
"-".to_owned()
} else {
let sc = if let Some(s) = s {
s.to_string()
} else {
"-".to_owned()
};
out += &sc;
out += ",";


@ -53,6 +53,24 @@ impl ValueSubkeyRangeSet {
Self::new_with_data(&self.data | &other.data)
}
#[must_use]
#[allow(clippy::unnecessary_cast)]
pub fn len(&self) -> u64 {
self.data.len() as u64
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.data.is_empty()
}
#[must_use]
pub fn is_full(&self) -> bool {
self.data.ranges_len() == 1
&& self.data.first().unwrap() == u32::MIN
&& self.data.last().unwrap() == u32::MAX
}
#[must_use]
pub fn data(&self) -> &RangeSetBlaze<ValueSubkey> {
&self.data