more inspectvalue, en route to rehydration

This commit is contained in:
Christien Rioux 2024-03-10 23:48:59 -04:00
parent 7a1e6f96e6
commit ea74d646f8
11 changed files with 405 additions and 229 deletions

View File

@ -1,8 +1,8 @@
use super::*;
use crate::storage_manager::SignedValueDescriptor;
const MAX_INSPECT_VALUE_Q_SUBKEYS_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = MAX_INSPECT_VALUE_Q_SUBKEYS_LEN;
const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)]
@ -34,12 +34,6 @@ impl RPCOperationInspectValueQ {
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> Result<Self, RPCError> {
// Needed because RangeSetBlaze uses different types here all the time
#[allow(clippy::unnecessary_cast)]
let subkeys_len = subkeys.len() as usize;
if subkeys_len > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN {
return Err(RPCError::protocol("InspectValue subkeys length too long"));
}
Ok(Self {
key,
subkeys,
@ -70,7 +64,7 @@ impl RPCOperationInspectValueQ {
let key = decode_typed_key(&k_reader)?;
let sk_reader = reader.get_subkeys().map_err(RPCError::protocol)?;
// Maximum number of ranges that can hold the maximum number of subkeys is one subkey per range
if sk_reader.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN {
if sk_reader.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN {
return Err(RPCError::protocol("InspectValueQ too many subkey ranges"));
}
let mut subkeys = ValueSubkeyRangeSet::new();
@ -88,11 +82,6 @@ impl RPCOperationInspectValueQ {
}
subkeys.ranges_insert(vskr.0..=vskr.1);
}
// Needed because RangeSetBlaze uses different types here all the time
#[allow(clippy::unnecessary_cast)]
if subkeys.len() as usize > MAX_INSPECT_VALUE_Q_SUBKEYS_LEN {
return Err(RPCError::protocol("InspectValueQ too many subkey ranges"));
}
let want_descriptor = reader.reborrow().get_want_descriptor();
Ok(Self {
@ -218,6 +207,11 @@ impl RPCOperationInspectValueA {
) -> Result<Self, RPCError> {
let seqs = if reader.has_seqs() {
let seqs_reader = reader.get_seqs().map_err(RPCError::protocol)?;
if seqs_reader.len() as usize > MAX_INSPECT_VALUE_A_SEQS_LEN {
return Err(RPCError::protocol(
"decoded InspectValueA seqs length too long",
));
}
let Some(seqs) = seqs_reader.as_slice().map(|s| s.to_vec()) else {
return Err(RPCError::protocol("invalid decoded InspectValueA seqs"));
};

View File

@ -8,6 +8,19 @@ where
result: Option<Result<R, RPCError>>,
}
#[derive(Debug, Copy, Clone)]
pub(crate) enum FanoutResultKind {
Timeout,
Finished,
Exhausted,
}
#[derive(Debug, Clone)]
pub(crate) struct FanoutResult {
pub kind: FanoutResultKind,
pub value_nodes: Vec<NodeRef>,
}
pub(crate) type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>;
pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;

View File

@ -14,10 +14,12 @@ struct OutboundGetValueContext {
/// The result of the outbound_get_value operation
pub(super) struct OutboundGetValueResult {
/// Fanout result
pub fanout_result: FanoutResult,
/// Consensus count for this operation,
pub consensus_count: usize,
/// The subkey that was retrieved
pub get_result: GetResult,
/// And where it was retrieved from
pub value_nodes: Vec<NodeRef>,
}
impl StorageManager {
@ -79,7 +81,13 @@ impl StorageManager {
if let Some(descriptor) = gva.answer.descriptor {
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
let schema = match descriptor.schema() {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
}
};
ctx.schema = Some(schema);
ctx.descriptor = Some(Arc::new(descriptor));
}
}
@ -173,65 +181,36 @@ impl StorageManager {
check_done,
);
match fanout_call.run(vec![]).await {
let kind = match fanout_call.run(vec![]).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "GetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundGetValueResult {
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished with consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Consensus");
} else {
log_stor!(debug "GetValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundGetValueResult {
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished without consensus (ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "GetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "GetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundGetValueResult {
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
TimeoutOr::Timeout => FanoutResultKind::Timeout,
// If we finished with or without consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
// If we ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "GetValue Fanout Error: {}", e);
Err(e.into())
}
return Err(e.into());
}
};
let ctx = context.lock();
let fanout_result = FanoutResult {
kind,
value_nodes: ctx.value_nodes.clone(),
};
log_stor!(debug "GetValue Fanout: {:?}", fanout_result);
Ok(OutboundGetValueResult {
fanout_result,
consensus_count,
get_result: GetResult {
opt_value: ctx.value.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
})
}
/// Handle a received 'Get Value' query

View File

@ -1,23 +1,53 @@
use super::*;
/// The fully parsed descriptor
struct DescriptorInfo {
/// The descriptor itself
descriptor: Arc<SignedValueDescriptor>,
/// The in-schema subkeys that overlap the inspected range
subkeys: ValueSubkeyRangeSet,
}
impl DescriptorInfo {
pub fn new(
descriptor: Arc<SignedValueDescriptor>,
subkeys: &ValueSubkeyRangeSet,
) -> VeilidAPIResult<Self> {
let schema = descriptor.schema().map_err(RPCError::invalid_format)?;
let subkeys = subkeys.intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey()));
Ok(Self {
descriptor,
subkeys,
})
}
}
/// Info tracked per subkey
struct SubkeySeqCount {
/// The newest sequence number found for a subkey
pub seq: ValueSeqNum,
/// The nodes that have returned the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>,
}
/// The context of the outbound_get_value operation
struct OutboundInspectValueContext {
/// The combined sequence map so far
pub seqs: Vec<ValueSeqNum>,
/// The nodes that have returned the value so far (up to the consensus count)
pub value_nodes: Vec<NodeRef>,
/// The combined sequence numbers and result counts so far
pub seqcounts: Vec<SubkeySeqCount>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
pub descriptor: Option<Arc<SignedValueDescriptor>>,
/// The parsed schema from the descriptor if we have one
pub schema: Option<DHTSchema>,
pub opt_descriptor_info: Option<DescriptorInfo>,
}
/// The result of the outbound_get_value operation
pub(super) struct OutboundInspectValueResult {
/// The subkey that was retrieved
/// Fanout results for each subkey
pub fanout_results: Vec<FanoutResult>,
/// Required count for consensus for this operation,
pub consensus_count: usize,
/// The inspection that was retrieved
pub inspect_result: InspectResult,
/// And where it was retrieved from
pub value_nodes: Vec<NodeRef>,
}
impl StorageManager {
@ -28,39 +58,62 @@ impl StorageManager {
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
last_inspect_result: InspectResult,
mut local_inspect_result: InspectResult,
use_set_scope: bool,
) -> VeilidAPIResult<OutboundInspectValueResult> {
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'InspectValue' (the same as for 'GetValue')
// Get the DHT parameters for 'InspectValue'
// Can use either 'get scope' or 'set scope' depending on the purpose of the inspection
let (key_count, consensus_count, fanout, timeout_us) = {
let c = self.unlocked_inner.config.get();
if use_set_scope {
// If we're simulating a set, increase the previous sequence number we have by 1
for seq in &mut local_inspect_result.seqs {
*seq += 1;
}
(
c.network.dht.max_find_node_count as usize,
c.network.dht.set_value_count as usize,
c.network.dht.set_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
)
} else {
(
c.network.dht.max_find_node_count as usize,
c.network.dht.get_value_count as usize,
c.network.dht.get_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
)
}
};
// Make do-inspect-value answer context
let schema = if let Some(d) = &last_inspect_result.opt_descriptor {
Some(d.schema()?)
let opt_descriptor_info = if let Some(descriptor) = &local_inspect_result.opt_descriptor {
Some(DescriptorInfo::new(descriptor.clone(), &subkeys)?)
} else {
None
};
let context = Arc::new(Mutex::new(OutboundInspectValueContext {
seqs: last_inspect_result.seqs,
seqcounts: local_inspect_result
.seqs
.iter()
.map(|s| SubkeySeqCount {
seq: *s,
value_nodes: vec![],
descriptor: last_inspect_result.opt_descriptor.clone(),
schema,
})
.collect(),
opt_descriptor_info,
}));
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone();
let context = context.clone();
let last_descriptor = last_inspect_result.opt_descriptor.clone();
let opt_descriptor = local_inspect_result.opt_descriptor.clone();
let subkeys = subkeys.clone();
async move {
let iva = network_result_try!(
@ -70,7 +123,7 @@ impl StorageManager {
Destination::direct(next_node.clone()).with_safety(safety_selection),
key,
subkeys.clone(),
last_descriptor.map(|x| (*x).clone()),
opt_descriptor.map(|x| (*x).clone()),
)
.await?
);
@ -79,9 +132,15 @@ impl StorageManager {
// already be validated by rpc_call_inspect_value
if let Some(descriptor) = iva.answer.descriptor {
let mut ctx = context.lock();
if ctx.descriptor.is_none() && ctx.schema.is_none() {
ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?);
ctx.descriptor = Some(Arc::new(descriptor));
if ctx.opt_descriptor_info.is_none() {
let descriptor_info =
match DescriptorInfo::new(Arc::new(descriptor.clone()), &subkeys) {
Ok(v) => v,
Err(e) => {
return Ok(NetworkResult::invalid_message(e));
}
};
ctx.opt_descriptor_info = Some(descriptor_info);
}
}
@ -90,58 +149,71 @@ impl StorageManager {
log_stor!(debug "Got seqs back: len={}", iva.answer.seqs.len());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(_descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Ensure we have a schema and descriptor etc
let Some(descriptor_info) = &ctx.opt_descriptor_info else {
// Got a value but no descriptor for it
// Move to the next node
return Ok(NetworkResult::invalid_message(
"Got value with no descriptor",
"Got inspection with no descriptor",
));
};
// Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers betwen that and what we asked for
let in_schema_subkeys = subkeys
.intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey()));
if iva.answer.seqs.len() != in_schema_subkeys.len() {
if iva.answer.seqs.len() != descriptor_info.subkeys.len() {
// Not the right number of sequence numbers
// Move to the next node
return Ok(NetworkResult::invalid_message(format!(
"wrong number of seqs returned {} (wanted {})",
iva.answer.seqs.len(),
in_schema_subkeys.len()
descriptor_info.subkeys.len()
)));
}
// If we have a prior seqs list, merge in the new seqs
if ctx.seqs.is_empty() {
ctx.seqs = iva.answer.seqs.clone();
if ctx.seqcounts.is_empty() {
ctx.seqcounts = iva
.answer
.seqs
.iter()
.map(|s| SubkeySeqCount {
seq: *s,
// One node has shown us the newest sequence numbers so far
ctx.value_nodes = vec![next_node];
value_nodes: vec![next_node.clone()],
})
.collect();
} else {
if ctx.seqs.len() != iva.answer.seqs.len() {
if ctx.seqcounts.len() != iva.answer.seqs.len() {
return Err(RPCError::internal(
"seqs list length should always be equal by now",
));
}
let mut newer_seq = false;
for pair in ctx.seqs.iter_mut().zip(iva.answer.seqs.iter()) {
for pair in ctx.seqcounts.iter_mut().zip(iva.answer.seqs.iter()) {
let ctx_seqcnt = pair.0;
let answer_seq = *pair.1;
// If we already have consensus for this subkey, don't bother updating it any more
// While we may find a better sequence number if we keep looking, this does not mimic the behavior
// of get and set unless we stop here
if ctx_seqcnt.value_nodes.len() >= consensus_count {
continue;
}
// If the new seq isn't undefined and is better than the old seq (either greater or old is undefined)
// Then take that sequence number and note that we have gotten newer sequence numbers so we keep
// looking for consensus
if *pair.1 != ValueSeqNum::MAX
&& (*pair.0 == ValueSeqNum::MAX || pair.1 > pair.0)
// If the sequence number matches the old sequence number, then we keep the value node for reference later
if answer_seq != ValueSeqNum::MAX {
if ctx_seqcnt.seq == ValueSeqNum::MAX || answer_seq > ctx_seqcnt.seq
{
newer_seq = true;
*pair.0 = *pair.1;
}
}
if newer_seq {
// One node has shown us the latest sequence numbers so far
ctx.value_nodes = vec![next_node];
} else {
// Increase the consensus count for the seqs list
ctx.value_nodes.push(next_node);
ctx_seqcnt.seq = answer_seq;
ctx_seqcnt.value_nodes = vec![next_node.clone()];
} else if answer_seq == ctx_seqcnt.seq {
// Keep the nodes that showed us the latest values
ctx_seqcnt.value_nodes.push(next_node.clone());
}
}
}
}
}
@ -155,12 +227,16 @@ impl StorageManager {
// Routine to call to check if we're done at each step
let check_done = |_closest_nodes: &[NodeRef]| {
// If we have reached sufficient consensus, return done
// If we have reached sufficient consensus on all subkeys, return done
let ctx = context.lock();
if !ctx.seqs.is_empty()
&& ctx.descriptor.is_some()
&& ctx.value_nodes.len() >= consensus_count
{
let mut has_consensus = true;
for cs in ctx.seqcounts.iter() {
if cs.value_nodes.len() < consensus_count {
has_consensus = false;
break;
}
}
if !ctx.seqcounts.is_empty() && ctx.opt_descriptor_info.is_some() && has_consensus {
return Some(());
}
None
@ -178,65 +254,54 @@ impl StorageManager {
check_done,
);
match fanout_call.run(vec![]).await {
let kind = match fanout_call.run(vec![]).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "InspectValue Fanout Timeout Consensus");
} else {
log_stor!(debug "InspectValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundInspectValueResult {
inspect_result: InspectResult {
seqs: ctx.seqs.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished with consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "InspectValue Fanout Consensus");
} else {
log_stor!(debug "InspectValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundInspectValueResult {
inspect_result: InspectResult {
seqs: ctx.seqs.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
// If we finished without consensus (ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "InspectValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "InspectValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundInspectValueResult {
inspect_result: InspectResult {
seqs: ctx.seqs.clone(),
opt_descriptor: ctx.descriptor.clone(),
},
value_nodes: ctx.value_nodes.clone(),
})
}
TimeoutOr::Timeout => FanoutResultKind::Timeout,
// If we finished with or without consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
// If we ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "InspectValue Fanout Error: {}", e);
Err(e.into())
return Err(e.into());
}
};
let ctx = context.lock();
let mut fanout_results = vec![];
for cs in &ctx.seqcounts {
let has_consensus = cs.value_nodes.len() > consensus_count;
let fanout_result = FanoutResult {
kind: if has_consensus {
FanoutResultKind::Finished
} else {
kind
},
value_nodes: cs.value_nodes.clone(),
};
fanout_results.push(fanout_result);
}
log_stor!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>());
Ok(OutboundInspectValueResult {
fanout_results,
consensus_count,
inspect_result: InspectResult {
subkeys: ctx
.opt_descriptor_info
.as_ref()
.map(|d| d.subkeys.clone())
.unwrap_or_default(),
seqs: ctx.seqcounts.iter().map(|cs| cs.seq).collect(),
opt_descriptor: ctx
.opt_descriptor_info
.as_ref()
.map(|d| d.descriptor.clone()),
},
})
}
/// Handle a received 'Inspect Value' query

View File

@ -408,7 +408,7 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.set_value_nodes(key, result.value_nodes)?;
inner.set_value_nodes(key, result.fanout_result.value_nodes)?;
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
@ -541,7 +541,7 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
inner.set_value_nodes(key, result.value_nodes)?;
inner.set_value_nodes(key, result.fanout_result.value_nodes)?;
// Return the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != signed_value_data.value_data() {
@ -732,6 +732,71 @@ impl StorageManager {
Ok(true)
}
/// Inspect an opened DHT record for its subkey sequence numbers
pub async fn inspect_record(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
scope: DHTReportScope,
) -> VeilidAPIResult<DHTRecordReport> {
let mut inner = self.lock().await?;
let safety_selection = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
};
opened_record.safety_selection()
};
// See if the requested record is our local record store
let local_inspect_result = inner
.handle_inspect_local_value(key, subkeys.clone(), true)
.await?;
// If this is the maximum scope we're interested in, return the report
if matches!(scope, DHTReportScope::Local) {
return Ok(DHTRecordReport::new(
local_inspect_result.subkeys,
local_inspect_result.seqs,
));
}
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::online_ready_inner(&inner) else {
apibail_try_again!("offline, try again later");
};
// Drop the lock for network access
drop(inner);
// Get the inspect record report from the network
let result = self
.outbound_inspect_value(
rpc_processor,
key,
subkeys,
safety_selection,
local_inspect_result,
matches!(scope, DHTReportScope::NetworkSet),
)
.await?;
// See if we got a seqs list back
if result.inspect_result.seqs.is_empty() {
// If we got nothing back then we also had nothing beforehand, return nothing
return Ok(DHTRecordReport::default());
};
// Keep the list of nodes that returned a value for later reference
// xxx switch 'value nodes' to keeping ranges of subkeys per node
// let mut inner = self.lock().await?;
// inner.set_value_nodes(key, result.fanout_results.value_nodes)?;
Ok(DHTRecordReport::new(
result.inspect_result.subkeys,
result.inspect_result.seqs,
))
}
// Send single value change out to the network
#[instrument(level = "trace", skip(self), err)]
async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> {

View File

@ -89,6 +89,8 @@ pub struct GetResult {
/// The result of the do_inspect_value_operation
#[derive(Default, Debug)]
pub struct InspectResult {
/// The actual in-schema subkey range being reported on
pub subkeys: ValueSubkeyRangeSet,
/// The sequence map
pub seqs: Vec<ValueSeqNum>,
/// The descriptor if we got a fresh one or empty if no descriptor was needed
@ -811,6 +813,7 @@ where
};
if let Some(seqs) = self.seqs_cache.get(&sck) {
return Ok(Some(InspectResult {
subkeys,
seqs: seqs.clone(),
opt_descriptor,
}));
@ -845,6 +848,7 @@ where
}
Ok(Some(InspectResult {
subkeys,
seqs,
opt_descriptor,
}))

View File

@ -14,10 +14,12 @@ struct OutboundSetValueContext {
/// The result of the outbound_set_value operation
pub(super) struct OutboundSetValueResult {
/// Fanout result
pub fanout_result: FanoutResult,
/// Consensus count for this operation,
pub consensus_count: usize,
/// The value that was set
pub signed_value_data: Arc<SignedValueData>,
/// And where it was set to
pub value_nodes: Vec<NodeRef>,
}
impl StorageManager {
@ -164,57 +166,32 @@ impl StorageManager {
check_done,
);
match fanout_call.run(vec![]).await {
let kind = match fanout_call.run(vec![]).await {
// If we don't finish in the timeout (too much time passed checking for consensus)
TimeoutOr::Timeout => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "SetValue Fanout Timeout Consensus");
} else {
log_stor!(debug "SetValue Fanout Timeout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundSetValueResult {
signed_value_data: ctx.value.clone(),
value_nodes: ctx.value_nodes.clone(),
})
}
TimeoutOr::Timeout => FanoutResultKind::Timeout,
// If we finished with or without consensus (enough nodes returning the same value)
TimeoutOr::Value(Ok(Some(()))) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "SetValue Fanout Consensus");
} else {
log_stor!(debug "SetValue Fanout Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundSetValueResult {
signed_value_data: ctx.value.clone(),
value_nodes: ctx.value_nodes.clone(),
})
}
TimeoutOr::Value(Ok(Some(()))) => FanoutResultKind::Finished,
// If we ran out of nodes before getting consensus)
TimeoutOr::Value(Ok(None)) => {
// Return the best answer we've got
let ctx = context.lock();
if ctx.value_nodes.len() >= consensus_count {
log_stor!(debug "SetValue Fanout Exhausted Consensus");
} else {
log_stor!(debug "SetValue Fanout Exhausted Non-Consensus: {}", ctx.value_nodes.len());
}
Ok(OutboundSetValueResult {
signed_value_data: ctx.value.clone(),
value_nodes: ctx.value_nodes.clone(),
})
}
TimeoutOr::Value(Ok(None)) => FanoutResultKind::Exhausted,
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "SetValue Fanout Error: {}", e);
Err(e.into())
}
return Err(e.into());
}
};
let ctx = context.lock();
let fanout_result = FanoutResult {
kind,
value_nodes: ctx.value_nodes.clone(),
};
log_stor!(debug "SetValue Fanout: {:?}", fanout_result);
Ok(OutboundSetValueResult {
fanout_result,
consensus_count,
signed_value_data: ctx.value.clone(),
})
}
/// Handle a received 'Set Value' query

View File

@ -568,6 +568,7 @@ impl StorageManagerInner {
}
Ok(InspectResult {
subkeys: ValueSubkeyRangeSet::new(),
seqs: vec![],
opt_descriptor: None,
})
@ -648,6 +649,7 @@ impl StorageManagerInner {
}
Ok(InspectResult {
subkeys: ValueSubkeyRangeSet::new(),
seqs: vec![],
opt_descriptor: None,
})

View File

@ -410,6 +410,35 @@ impl RoutingContext {
storage_manager.cancel_watch_values(key, subkeys).await
}
/// Inspects a DHT record for subkey state.
///
/// * `key` is the record key to watch. it must first be opened for reading or writing.
/// * `subkeys` is the the range of subkeys to inspect. The range must not exceed 512 discrete non-overlapping or adjacent subranges.
/// If no range is specified, this is equivalent to inspecting the entire range of subkeys. In total, the list of subkeys returned will be truncated at 512 elements.
/// * `scope` is what kind of range the inspection has:
/// If scope is set to DHTReportScope::Local, results will be only for a locally stored record
/// If scope is set to DHTReportScope::NetworkGet, results will be as if each subkey were get over the network as a GetValue
/// If scope is set to DHTReportScope::NetworkSet, results will be as if each subkey were set over the network as a SetValue
///
/// This is useful for checking if you should push new subkeys to the network, or retrieve the current state of a record from the network
/// to see what needs updating locally.
///
/// Returns a DHTRecordReport with the subkey ranges that were returned that overlapped the schema, and sequence numbers for each of the subkeys in the range.
#[instrument(target = "veilid_api", level = "debug", ret, err)]
pub async fn inspect_dht_record(
&self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
scope: DHTReportScope,
) -> VeilidAPIResult<DHTRecordReport> {
event!(target: "veilid_api", Level::DEBUG,
"RoutingContext::inspect_dht_record(self: {:?}, key: {:?}, subkeys: {:?}, scope: {:?})", self, key, subkeys, scope);
Crypto::validate_crypto_kind(key.kind)?;
let storage_manager = self.api.storage_manager()?;
storage_manager.inspect_record(key, subkeys, scope).await
}
///////////////////////////////////
/// Block Store

View File

@ -0,0 +1,46 @@
use super::*;
/// DHT Record Report
#[derive(
Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, JsonSchema,
)]
#[cfg_attr(
target_arch = "wasm32",
derive(Tsify),
tsify(from_wasm_abi, into_wasm_abi)
)]
pub struct DHTRecordReport {
/// The actual subkey range within the schema being reported on
/// This may be a subset of the requested range if it exceeds the schema limits
/// or has more than 512 subkeys
subkeys: ValueSubkeyRangeSet,
/// The sequence numbers of each subkey requested from a DHT Record
seqs: Vec<ValueSeqNum>,
}
from_impl_to_jsvalue!(DHTRecordReport);
impl DHTRecordReport {
pub fn new(subkeys: ValueSubkeyRangeSet, seqs: Vec<ValueSeqNum>) -> Self {
Self { subkeys, seqs }
}
pub fn seqs(&self) -> &[ValueSeqNum] {
&self.seqs
}
}
/// DHT Record Report Scope
#[derive(
Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,
)]
#[cfg_attr(target_arch = "wasm32", derive(Tsify), tsify(from_wasm_abi, namespace))]
pub enum DHTReportScope {
Local = 0,
NetworkGet = 1,
NetworkSet = 2,
}
impl Default for DHTReportScope {
fn default() -> Self {
Self::Local
}
}

View File

@ -1,4 +1,5 @@
mod dht_record_descriptor;
mod dht_record_report;
mod schema;
mod value_data;
mod value_subkey_range_set;
@ -6,6 +7,7 @@ mod value_subkey_range_set;
use super::*;
pub use dht_record_descriptor::*;
pub use dht_record_report::*;
pub use schema::*;
pub use value_data::*;
pub use value_subkey_range_set::*;