more inspect work

This commit is contained in:
Christien Rioux 2024-03-11 22:47:16 -04:00
parent ea74d646f8
commit 7fdd5f9555
9 changed files with 289 additions and 89 deletions

View File

@ -16,8 +16,6 @@ struct OutboundGetValueContext {
pub(super) struct OutboundGetValueResult {
/// Fanout result
pub fanout_result: FanoutResult,
/// Consensus count for this operation,
pub consensus_count: usize,
/// The subkey that was retrieved
pub get_result: GetResult,
}
@ -205,7 +203,6 @@ impl StorageManager {
Ok(OutboundGetValueResult {
fanout_result,
consensus_count,
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),

View File

@ -44,8 +44,6 @@ struct OutboundInspectValueContext {
pub(super) struct OutboundInspectValueResult {
/// Fanout results for each subkey
pub fanout_results: Vec<FanoutResult>,
/// Required count for consensus for this operation,
pub consensus_count: usize,
/// The inspection that was retrieved
pub inspect_result: InspectResult,
}
@ -58,7 +56,7 @@ impl StorageManager {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
mut local_inspect_result: InspectResult,
local_inspect_result: InspectResult,
use_set_scope: bool,
) -> VeilidAPIResult<OutboundInspectValueResult> {
let routing_table = rpc_processor.routing_table();
@ -69,11 +67,6 @@ impl StorageManager {
let c = self.unlocked_inner.config.get();
if use_set_scope {
// If we're simulating a set, increase the previous sequence number we have by 1
for seq in &mut local_inspect_result.seqs {
*seq += 1;
}
(
c.network.dht.max_find_node_count as usize,
c.network.dht.set_value_count as usize,
@ -288,7 +281,6 @@ impl StorageManager {
Ok(OutboundInspectValueResult {
fanout_results,
consensus_count,
inspect_result: InspectResult {
subkeys: ctx
.opt_descriptor_info

View File

@ -408,7 +408,11 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.set_value_nodes(key, result.fanout_result.value_nodes)?;
inner.process_fanout_results(
key,
core::iter::once((subkey, &result.fanout_result)),
false,
)?;
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
@ -541,7 +545,11 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.set_value_nodes(key, result.fanout_result.value_nodes)?;
inner.process_fanout_results(
key,
core::iter::once((subkey, &result.fanout_result)),
true,
)?;
// Return the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != signed_value_data.value_data() {
@ -739,6 +747,12 @@ impl StorageManager {
subkeys: ValueSubkeyRangeSet,
scope: DHTReportScope,
) -> VeilidAPIResult<DHTRecordReport> {
let subkeys = if subkeys.is_empty() {
ValueSubkeyRangeSet::full()
} else {
subkeys
};
let mut inner = self.lock().await?;
let safety_selection = {
let Some(opened_record) = inner.opened_records.get(&key) else {
@ -748,15 +762,25 @@ impl StorageManager {
};
// See if the requested record is our local record store
let local_inspect_result = inner
let mut local_inspect_result = inner
.handle_inspect_local_value(key, subkeys.clone(), true)
.await?;
assert!(
local_inspect_result.subkeys.len() == local_inspect_result.seqs.len(),
"mismatch between local subkeys returned and sequence number list returned"
);
assert!(
local_inspect_result.subkeys.is_subset(&subkeys),
"mismatch between local subkeys returned and sequence number list returned"
);
// If this is the maximum scope we're interested in, return the report
if matches!(scope, DHTReportScope::Local) {
return Ok(DHTRecordReport::new(
local_inspect_result.subkeys,
local_inspect_result.seqs,
vec![],
));
}
@ -768,6 +792,13 @@ impl StorageManager {
// Drop the lock for network access
drop(inner);
// If we're simulating a set, increase the previous sequence number we have by 1
if matches!(scope, DHTReportScope::UpdateSet) {
for seq in &mut local_inspect_result.seqs {
*seq = seq.overflowing_add(1).0;
}
}
// Get the inspect record report from the network
let result = self
.outbound_inspect_value(
@ -775,24 +806,40 @@ impl StorageManager {
key,
subkeys,
safety_selection,
local_inspect_result,
matches!(scope, DHTReportScope::NetworkSet),
if matches!(scope, DHTReportScope::SyncGet | DHTReportScope::SyncSet) {
InspectResult::default()
} else {
local_inspect_result.clone()
},
matches!(scope, DHTReportScope::UpdateSet | DHTReportScope::SyncSet),
)
.await?;
// See if we got a seqs list back
if result.inspect_result.seqs.is_empty() {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(DHTRecordReport::default());
};
// Sanity check before zip
assert!(
result.inspect_result.subkeys.len() == result.fanout_results.len(),
"mismatch between subkeys returned and fanout results returned"
);
assert!(
local_inspect_result.subkeys.is_empty()
|| result.inspect_result.subkeys.is_empty()
|| result.inspect_result.subkeys.len() == local_inspect_result.subkeys.len(),
"mismatch between local subkeys returned and network results returned"
);
// Keep the list of nodes that returned a value for later reference
// xxx switch 'value nodes' to keeping ranges of subkeys per node
// let mut inner = self.lock().await?;
// inner.set_value_nodes(key, result.fanout_results.value_nodes)?;
let mut inner = self.lock().await?;
let results_iter = result
.inspect_result
.subkeys
.iter()
.zip(result.fanout_results.iter());
inner.process_fanout_results(key, results_iter, false)?;
Ok(DHTRecordReport::new(
result.inspect_result.subkeys,
local_inspect_result.seqs,
result.inspect_result.seqs,
))
}

View File

@ -1,5 +1,13 @@
use super::*;
/// Information about nodes that cache a local record remotely
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub(in crate::storage_manager) struct PerNodeRecordDetail {
pub last_set: Timestamp,
pub last_seen: Timestamp,
pub subkeys: ValueSubkeyRangeSet,
}
/// Information required to handle locally opened records
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(in crate::storage_manager) struct LocalRecordDetail {
@ -8,5 +16,14 @@ pub(in crate::storage_manager) struct LocalRecordDetail {
pub safety_selection: SafetySelection,
/// The nodes that we have seen this record cached on recently
#[serde(default)]
pub value_nodes: Vec<PublicKey>,
pub nodes: HashMap<PublicKey, PerNodeRecordDetail>,
}
impl LocalRecordDetail {
pub fn new(safety_selection: SafetySelection) -> Self {
Self {
safety_selection,
nodes: Default::default(),
}
}
}

View File

@ -78,7 +78,7 @@ where
}
/// The result of the do_get_value_operation
#[derive(Default, Debug)]
#[derive(Default, Clone, Debug)]
pub struct GetResult {
/// The subkey value if we got one
pub opt_value: Option<Arc<SignedValueData>>,
@ -87,7 +87,7 @@ pub struct GetResult {
}
/// The result of the do_inspect_value_operation
#[derive(Default, Debug)]
#[derive(Default, Clone, Debug)]
pub struct InspectResult {
/// The actual in-schema subkey range being reported on
pub subkeys: ValueSubkeyRangeSet,

View File

@ -16,8 +16,6 @@ struct OutboundSetValueContext {
pub(super) struct OutboundSetValueResult {
/// Fanout result
pub fanout_result: FanoutResult,
/// Consensus count for this operation,
pub consensus_count: usize,
/// The value that was set
pub signed_value_data: Arc<SignedValueData>,
}
@ -189,7 +187,6 @@ impl StorageManager {
Ok(OutboundSetValueResult {
fanout_result,
consensus_count,
signed_value_data: ctx.value.clone(),
})
}

View File

@ -32,6 +32,9 @@ pub(super) struct StorageManagerInner {
pub tick_future: Option<SendPinBoxFuture<()>>,
/// Update callback to send ValueChanged notification to
pub update_callback: Option<UpdateCallback>,
/// The maximum consensus count
set_consensus_count: usize,
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
@ -72,6 +75,7 @@ fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
impl StorageManagerInner {
pub fn new(unlocked_inner: Arc<StorageManagerUnlockedInner>) -> Self {
let set_consensus_count = unlocked_inner.config.get().network.dht.set_value_count as usize;
Self {
unlocked_inner,
initialized: false,
@ -84,6 +88,7 @@ impl StorageManagerInner {
opt_routing_table: Default::default(),
tick_future: Default::default(),
update_callback: None,
set_consensus_count,
}
}
@ -242,10 +247,7 @@ impl StorageManagerInner {
// Add new local value record
let cur_ts = get_aligned_timestamp();
let local_record_detail = LocalRecordDetail {
safety_selection,
value_nodes: vec![],
};
let local_record_detail = LocalRecordDetail::new(safety_selection);
let record =
Record::<LocalRecordDetail>::new(cur_ts, signed_value_descriptor, local_record_detail)?;
@ -284,10 +286,7 @@ impl StorageManagerInner {
let local_record = Record::new(
cur_ts,
remote_record.descriptor().clone(),
LocalRecordDetail {
safety_selection,
value_nodes: vec![],
},
LocalRecordDetail::new(safety_selection),
)?;
local_record_store.new_record(key, local_record).await?;
@ -425,10 +424,7 @@ impl StorageManagerInner {
let record = Record::<LocalRecordDetail>::new(
get_aligned_timestamp(),
signed_value_descriptor,
LocalRecordDetail {
safety_selection,
value_nodes: vec![],
},
LocalRecordDetail::new(safety_selection),
)?;
local_record_store.new_record(key, record).await?;
@ -462,8 +458,8 @@ impl StorageManagerInner {
let opt_value_nodes = local_record_store.peek_record(key, |r| {
let d = r.detail();
d.value_nodes
.iter()
d.nodes
.keys()
.copied()
.filter_map(|x| {
routing_table
@ -477,21 +473,46 @@ impl StorageManagerInner {
Ok(opt_value_nodes)
}
pub fn set_value_nodes(
pub fn process_fanout_results<'a, I: IntoIterator<Item = (ValueSubkey, &'a FanoutResult)>>(
&mut self,
key: TypedKey,
value_nodes: Vec<NodeRef>,
subkey_results_iter: I,
is_set: bool,
) -> VeilidAPIResult<()> {
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
let cur_ts = get_aligned_timestamp();
local_record_store.with_record_mut(key, |r| {
let d = r.detail_mut();
d.value_nodes = value_nodes
.into_iter()
for (subkey, fanout_result) in subkey_results_iter {
for node_id in fanout_result
.value_nodes
.iter()
.filter_map(|x| x.node_ids().get(key.kind).map(|k| k.value))
.collect();
{
let pnd = d.nodes.entry(node_id).or_default();
if is_set || pnd.last_set == Timestamp::default() {
pnd.last_set = cur_ts;
}
pnd.last_seen = cur_ts;
pnd.subkeys.insert(subkey);
}
}
// Purge nodes down to the N most recently seen, where N is the consensus count for a set operation
let mut nodes_ts = d
.nodes
.iter()
.map(|kv| (*kv.0, kv.1.last_seen))
.collect::<Vec<_>>();
nodes_ts.sort_by(|a, b| b.1.cmp(&a.1));
for dead_node_key in nodes_ts.iter().skip(self.set_consensus_count) {
d.nodes.remove(&dead_node_key.0);
}
});
Ok(())
}

View File

@ -78,13 +78,24 @@ fn get_data(text: &str) -> Option<Vec<u8>> {
}
fn get_subkeys(text: &str) -> Option<ValueSubkeyRangeSet> {
if let Some(n) = get_number(text) {
Some(ValueSubkeyRangeSet::single(n.try_into().ok()?))
if let Some(n) = get_number::<u32>(text) {
Some(ValueSubkeyRangeSet::single(n))
} else {
ValueSubkeyRangeSet::from_str(text).ok()
}
}
fn get_dht_report_scope(text: &str) -> Option<DHTReportScope> {
match text.to_ascii_lowercase().trim() {
"local" => Some(DHTReportScope::Local),
"syncget" => Some(DHTReportScope::SyncGet),
"syncset" => Some(DHTReportScope::SyncSet),
"updateget" => Some(DHTReportScope::UpdateGet),
"updateset" => Some(DHTReportScope::UpdateSet),
_ => None,
}
}
fn get_route_id(
rss: RouteSpecStore,
allow_allocated: bool,
@ -287,8 +298,8 @@ fn get_destination(
}
}
fn get_number(text: &str) -> Option<usize> {
usize::from_str(text).ok()
fn get_number<T: num_traits::Num + FromStr>(text: &str) -> Option<T> {
T::from_str(text).ok()
}
fn get_typed_key(text: &str) -> Option<TypedKey> {
@ -1458,7 +1469,7 @@ impl VeilidAPI {
rc
};
// Do a record get
// Do a record create
let record = match rc.create_dht_record(schema, Some(csv.kind())).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
@ -1532,7 +1543,7 @@ impl VeilidAPI {
1 + opt_arg_add,
"debug_record_set",
"subkey",
get_number,
get_number::<u32>,
)?;
let data =
get_debug_argument_at(&args, 2 + opt_arg_add, "debug_record_set", "data", get_data)?;
@ -1576,7 +1587,7 @@ impl VeilidAPI {
1 + opt_arg_add,
"debug_record_get",
"subkey",
get_number,
get_number::<u32>,
)?;
let force_refresh = if args.len() >= 4 {
Some(get_debug_argument_at(
@ -1642,15 +1653,21 @@ impl VeilidAPI {
let key =
get_debug_argument_at(&args, 1, "debug_record_info", "key", get_dht_key_no_safety)?;
let subkey =
get_debug_argument_at(&args, 2, "debug_record_info", "subkey", get_number).ok();
let subkey = get_debug_argument_at(
&args,
2,
"debug_record_info",
"subkey",
get_number::<ValueSubkey>,
)
.ok();
let out = if let Some(subkey) = subkey {
let li = storage_manager
.debug_local_record_subkey_info(key, subkey as ValueSubkey)
.debug_local_record_subkey_info(key, subkey)
.await;
let ri = storage_manager
.debug_remote_record_subkey_info(key, subkey as ValueSubkey)
.debug_remote_record_subkey_info(key, subkey)
.await;
format!(
"Local Subkey Info:\n{}\n\nRemote Subkey Info:\n{}\n",
@ -1672,6 +1689,8 @@ impl VeilidAPI {
};
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?;
let mut rest_defaults = false;
let subkeys = get_debug_argument_at(
&args,
1 + opt_arg_add,
@ -1680,8 +1699,15 @@ impl VeilidAPI {
get_subkeys,
)
.ok()
.unwrap_or_default();
let expiration = get_debug_argument_at(
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
});
let expiration = if rest_defaults {
Default::default()
} else {
get_debug_argument_at(
&args,
2 + opt_arg_add,
"debug_record_watch",
@ -1689,8 +1715,15 @@ impl VeilidAPI {
parse_duration,
)
.ok()
.unwrap_or_default();
let count = get_debug_argument_at(
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
})
};
let count = if rest_defaults {
Default::default()
} else {
get_debug_argument_at(
&args,
3 + opt_arg_add,
"debug_record_watch",
@ -1698,7 +1731,11 @@ impl VeilidAPI {
get_number,
)
.ok()
.unwrap_or(usize::MAX) as u32;
.unwrap_or_else(|| {
rest_defaults = true;
u32::MAX
})
};
// Do a record watch
let ts = match rc
@ -1749,6 +1786,57 @@ impl VeilidAPI {
})
}
async fn debug_record_inspect(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let opt_arg_add = if args.len() >= 2 && get_dht_key_no_safety(&args[1]).is_some() {
1
} else {
0
};
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?;
let mut rest_defaults = false;
let subkeys = get_debug_argument_at(
&args,
1 + opt_arg_add,
"debug_record_inspect",
"subkeys",
get_subkeys,
)
.ok()
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
});
let scope = if rest_defaults {
Default::default()
} else {
get_debug_argument_at(
&args,
2 + opt_arg_add,
"debug_record_inspect",
"scope",
get_dht_report_scope,
)
.ok()
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
})
};
// Do a record inspect
let report = match rc.inspect_dht_record(key, subkeys, scope).await {
Err(e) => {
return Ok(format!("Can't inspect DHT record: {}", e));
}
Ok(v) => v,
};
Ok(format!("Success: report={:?}", report))
}
async fn debug_record(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> =
shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?;
@ -1777,6 +1865,8 @@ impl VeilidAPI {
self.debug_record_watch(args).await
} else if command == "cancel" {
self.debug_record_cancel(args).await
} else if command == "inspect" {
self.debug_record_inspect(args).await
} else {
Ok(">>> Unknown command\n".to_owned())
}
@ -1857,6 +1947,7 @@ record list <local|remote>
info [<key>] [subkey]
watch [<key>] [<subkeys>] [<expiration>] [<count>]
cancel [<key>] [<subkeys>]
inspect [<key>] [<subkeys>] [<scope>]
--------------------------------------------------------------------
<key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4
* also <node>, <relay>, <target>, <route>
@ -1874,6 +1965,7 @@ record list <local|remote>
<routingdomain> is: public|local
<cryptokind> is: VLD0
<dhtschema> is: a json dht schema, default is '{"kind":"DFLT","o_cnt":1}'
<scope> is: local, syncget, syncset, updateget, updateset
<subkey> is: a number: 2
<subkeys> is:
* a number: 2

View File

@ -14,18 +14,34 @@ pub struct DHTRecordReport {
/// This may be a subset of the requested range if it exceeds the schema limits
/// or has more than 512 subkeys
subkeys: ValueSubkeyRangeSet,
/// The sequence numbers of each subkey requested from a DHT Record
seqs: Vec<ValueSeqNum>,
/// The sequence numbers of each subkey requested from a locally stored DHT Record
local_seqs: Vec<ValueSeqNum>,
/// The sequence numbers of each subkey requested from the DHT over the network
network_seqs: Vec<ValueSeqNum>,
}
from_impl_to_jsvalue!(DHTRecordReport);
impl DHTRecordReport {
pub fn new(subkeys: ValueSubkeyRangeSet, seqs: Vec<ValueSeqNum>) -> Self {
Self { subkeys, seqs }
pub fn new(
subkeys: ValueSubkeyRangeSet,
local_seqs: Vec<ValueSeqNum>,
network_seqs: Vec<ValueSeqNum>,
) -> Self {
Self {
subkeys,
local_seqs,
network_seqs,
}
}
pub fn seqs(&self) -> &[ValueSeqNum] {
&self.seqs
pub fn subkeys(&self) -> &ValueSubkeyRangeSet {
&self.subkeys
}
pub fn local_seqs(&self) -> &[ValueSeqNum] {
&self.local_seqs
}
pub fn network_seqs(&self) -> &[ValueSeqNum] {
&self.network_seqs
}
}
@ -35,9 +51,30 @@ impl DHTRecordReport {
)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi, namespace))]
pub enum DHTReportScope {
/// Return only the local copy sequence numbers
/// Useful for seeing what subkeys you have locally and which ones have not been retrieved
Local = 0,
NetworkGet = 1,
NetworkSet = 2,
/// Return the local sequence numbers and the network sequence numbers with GetValue fanout parameters
/// Provides an independent view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if the local copy did not exist locally.
/// Useful for determining if the current local copy should be updated from the network.
SyncGet = 1,
/// Return the local sequence numbers and the network sequence numbers with SetValue fanout parameters
/// Provides an independent view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if the local copy did not exist locally.
/// Useful for determining if the unchanged local copy should be pushed to the network.
SyncSet = 2,
/// Return the local sequence numbers and the network sequence numbers with GetValue fanout parameters
/// Provides an view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if a GetValue operation were being performed, including accepting newer values from the network.
/// Useful for determining which subkeys would change with a GetValue operation
UpdateGet = 3,
/// Return the local sequence numbers and the network sequence numbers with SetValue fanout parameters
/// Provides an view of both the local sequence numbers and the network sequence numbers for nodes that
/// would be reached as if a SetValue operation were being performed, including accepting newer values from the network.
/// This simulates a SetValue with the initial sequence number incremented by 1, like a real SetValue would when updating.
/// Useful for determine which subkeys would change on an SetValue operation
UpdateSet = 4,
}
impl Default for DHTReportScope {
fn default() -> Self {