proper ValueSubkeyRangeSet truncation

Christien Rioux 2024-03-12 22:36:04 -04:00
parent d67ef0eb2c
commit 64832748a9
12 changed files with 185 additions and 55 deletions

View File

@@ -27,6 +27,7 @@ mod tunnel;
mod typed_key;
mod typed_signature;
pub(crate) use operations::MAX_INSPECT_VALUE_A_SEQS_LEN;
pub(in crate::rpc_processor) use operations::*;
pub(crate) use address::*;

View File

@@ -62,3 +62,5 @@ pub(in crate::rpc_processor) use operation_complete_tunnel::*;
pub(in crate::rpc_processor) use operation_start_tunnel::*;
use super::*;
pub(crate) use operation_inspect_value::MAX_INSPECT_VALUE_A_SEQS_LEN;

View File

@@ -2,7 +2,7 @@ use super::*;
use crate::storage_manager::SignedValueDescriptor;
const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
pub(crate) const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)]

View File

@@ -92,7 +92,7 @@ impl StorageManager {
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value {
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
@@ -189,7 +189,7 @@ impl StorageManager {
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "GetValue Fanout Error: {}", e);
log_dht!(debug "GetValue Fanout Error: {}", e);
return Err(e.into());
}
};
@@ -199,7 +199,7 @@
kind,
value_nodes: ctx.value_nodes.clone(),
};
log_stor!(debug "GetValue Fanout: {:?}", fanout_result);
log_dht!(debug "GetValue Fanout: {:?}", fanout_result);
Ok(OutboundGetValueResult {
fanout_result,

View File

@@ -139,7 +139,7 @@ impl StorageManager {
// Keep the value if we got one and it is newer and it passes schema validation
if !iva.answer.seqs.is_empty() {
log_stor!(debug "Got seqs back: len={}", iva.answer.seqs.len());
log_dht!(debug "Got seqs back: len={}", iva.answer.seqs.len());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor etc
@@ -257,7 +257,7 @@
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "InspectValue Fanout Error: {}", e);
log_dht!(debug "InspectValue Fanout Error: {}", e);
return Err(e.into());
}
};
@@ -277,7 +277,7 @@
fanout_results.push(fanout_result);
}
log_stor!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>());
log_dht!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>());
Ok(OutboundInspectValueResult {
fanout_results,

View File

@@ -0,0 +1,79 @@
use super::*;
const L2_CACHE_DEPTH: usize = 4; // XXX: i just picked this. we could probably do better than this someday
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct InspectCacheL2Value {
pub seqs: Vec<ValueSeqNum>,
}
#[derive(Debug, Clone, Eq, PartialEq)]
struct InspectCacheL2 {
pub cache: LruCache<ValueSubkeyRangeSet, InspectCacheL2Value>,
}
impl InspectCacheL2 {
pub fn new(l2_cache_limit: usize) -> Self {
Self {
cache: LruCache::new(l2_cache_limit),
}
}
}
pub struct InspectCache {
cache: LruCache<TypedKey, InspectCacheL2>,
}
impl InspectCache {
pub fn new(l1_cache_limit: usize) -> Self {
Self {
cache: LruCache::new(l1_cache_limit),
}
}
pub fn get(
&mut self,
key: &TypedKey,
subkeys: &ValueSubkeyRangeSet,
) -> Option<InspectCacheL2Value> {
if let Some(l2c) = self.cache.get_mut(key) {
if let Some(l2v) = l2c.cache.get(subkeys) {
return Some(l2v.clone());
}
}
None
}
pub fn put(&mut self, key: TypedKey, subkeys: ValueSubkeyRangeSet, value: InspectCacheL2Value) {
self.cache
.entry(key)
.or_insert_with(|| InspectCacheL2::new(L2_CACHE_DEPTH))
.cache
.insert(subkeys, value);
}
pub fn invalidate(&mut self, key: &TypedKey) {
self.cache.remove(key);
}
pub fn replace_subkey_seq(&mut self, key: &TypedKey, subkey: ValueSubkey, seq: ValueSeqNum) {
let Some(l2) = self.cache.get_mut(key) else {
return;
};
for entry in &mut l2.cache {
let Some(idx) = entry.0.idx_of_subkey(subkey) else {
continue;
};
if idx < entry.1.seqs.len() {
entry.1.seqs[idx] = seq;
} else if idx > entry.1.seqs.len() {
panic!(
"representational error in l2 inspect cache: {} > {}",
idx,
entry.1.seqs.len()
)
}
}
}
}
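
The new InspectCache is two LRU levels deep: the outer level is keyed by record key, the inner level by the exact subkey range that was inspected, and replace_subkey_seq patches any cached range that covers a freshly written subkey. Below is a minimal usage sketch, not part of the commit; it assumes the crate's TypedKey, ValueSubkeyRangeSet and ValueSeqNum types, a caller-supplied record key, and made-up sequence numbers.

// Illustrative only: `record_key` and the sequence numbers are hypothetical;
// TypedKey, ValueSubkeyRangeSet and ValueSeqNum come from the surrounding crate.
fn inspect_cache_sketch(record_key: TypedKey) {
    let mut cache = InspectCache::new(16);
    let subkeys = ValueSubkeyRangeSet::single_range(0, 3);

    // Cache the sequence numbers returned by inspecting subkeys 0..=3
    cache.put(
        record_key.clone(),
        subkeys.clone(),
        InspectCacheL2Value {
            seqs: vec![5, 5, 7, 2],
        },
    );

    // A later inspection of the same range is served from memory
    assert_eq!(
        cache.get(&record_key, &subkeys).map(|v| v.seqs),
        Some(vec![5, 5, 7, 2])
    );

    // Writing subkey 2 at seq 8 patches every cached range that contains it
    cache.replace_subkey_seq(&record_key, 2, 8);
    assert_eq!(
        cache.get(&record_key, &subkeys).map(|v| v.seqs),
        Some(vec![5, 5, 8, 2])
    );

    // Deleting the record drops all cached ranges for that key
    cache.invalidate(&record_key);
    assert!(cache.get(&record_key, &subkeys).is_none());
}

This is what lets the inspect path answer repeat inspections of the same range without touching the subkey table, while the set path keeps cached seqs current instead of invalidating whole records.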

View File

@@ -61,9 +61,3 @@ impl TryFrom<&[u8]> for SubkeyTableKey {
Ok(SubkeyTableKey { key, subkey })
}
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SeqsCacheKey {
pub key: TypedKey,
pub subkeys: ValueSubkeyRangeSet,
}

View File

@@ -4,6 +4,7 @@
/// This store does not perform any validation on the schema, and all ValueRecordData passed in must have been previously validated.
/// Uses an in-memory store for the records, backed by the TableStore. Subkey data is LRU cached and rotated out by a limits policy,
/// and backed to the TableStore for persistence.
mod inspect_cache;
mod keys;
mod limited_size;
mod local_record_detail;
@@ -14,6 +15,7 @@ mod record_store_limits;
mod remote_record_detail;
mod watch;
pub(super) use inspect_cache::*;
pub(super) use keys::*;
pub(super) use limited_size::*;
pub(super) use local_record_detail::*;
@@ -60,7 +62,7 @@ where
/// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
subkey_cache: LruCache<SubkeyTableKey, RecordData>,
/// The in-memory cache of commonly accessed sequence number data so we don't have to keep hitting the db
seqs_cache: LruCache<SeqsCacheKey, Vec<ValueSeqNum>>,
inspect_cache: InspectCache,
/// Total storage space or subkey data inclusive of structures in memory
subkey_cache_total_size: LimitedSize<usize>,
/// Total storage space of records in the tabledb inclusive of subkey data and structures
@@ -118,7 +120,7 @@ where
subkey_table: None,
record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
subkey_cache: LruCache::new(subkey_cache_size),
seqs_cache: LruCache::new(subkey_cache_size),
inspect_cache: InspectCache::new(subkey_cache_size),
subkey_cache_total_size: LimitedSize::new(
"subkey_cache_total_size",
0,
@@ -425,6 +427,9 @@ where
// Remove watch changes
self.changed_watched_values.remove(&rtk);
// Invalidate inspect cache for this key
self.inspect_cache.invalidate(&rtk.key);
// Remove from table store immediately
self.add_dead_record(rtk, record);
self.purge_dead_records(false).await;
@@ -540,7 +545,7 @@ where
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey { key, subkey };
if let Some(record_data) = self.subkey_cache.get_mut(&stk) {
if let Some(record_data) = self.subkey_cache.get(&stk) {
let out = record_data.signed_value_data().clone();
return Ok(Some(GetResult {
@@ -754,6 +759,13 @@ where
.await
.map_err(VeilidAPIError::internal)?;
// Write to inspect cache
self.inspect_cache.replace_subkey_seq(
&stk.key,
subkey,
subkey_record_data.signed_value_data().value_data().seq(),
);
// Write to subkey cache
self.add_to_subkey_cache(stk, subkey_record_data);
@@ -780,6 +792,11 @@ where
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// Get record from index
let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| {
// Get number of subkeys from schema and ensure we are getting the
@@ -788,8 +805,21 @@
0,
record.schema().max_subkey(),
));
// Cap the number of total subkeys being inspected to the amount we can send across the wire
let truncated_subkeys = if let Some(nth_subkey) =
in_schema_subkeys.nth_subkey(MAX_INSPECT_VALUE_A_SEQS_LEN)
{
in_schema_subkeys.difference(&ValueSubkeyRangeSet::single_range(
nth_subkey,
ValueSubkey::MAX,
))
} else {
in_schema_subkeys
};
(
in_schema_subkeys,
truncated_subkeys,
if want_descriptor {
Some(record.descriptor().clone())
} else {
@@ -807,23 +837,14 @@ where
}
// See if we have this inspection cached
let sck = SeqsCacheKey {
key,
subkeys: subkeys.clone(),
};
if let Some(seqs) = self.seqs_cache.get(&sck) {
if let Some(icv) = self.inspect_cache.get(&key, &subkeys) {
return Ok(Some(InspectResult {
subkeys,
seqs: seqs.clone(),
seqs: icv.seqs,
opt_descriptor,
}));
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// Build sequence number list to return
let mut seqs = Vec::with_capacity(subkeys.len());
for subkey in subkeys.iter() {
@@ -847,6 +868,13 @@ where
seqs.push(seq)
}
// Save seqs cache
self.inspect_cache.put(
key,
subkeys.clone(),
InspectCacheL2Value { seqs: seqs.clone() },
);
Ok(Some(InspectResult {
subkeys,
seqs,
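
The truncation added in this file caps an inspection to the first MAX_INSPECT_VALUE_A_SEQS_LEN subkeys of the schema range: nth_subkey finds the first subkey whose flattened index hits the limit, and everything from that subkey up to ValueSubkey::MAX is subtracted before the sequence numbers are collected. Below is a hedged, test-style sketch of that step, not part of the commit; it reuses only calls that appear in this commit and a made-up 0..=9999 schema range.

// Illustrative only: mirrors the truncation step above with concrete numbers.
// ValueSubkeyRangeSet, ValueSubkey and MAX_INSPECT_VALUE_A_SEQS_LEN (512) are
// the crate's own items; the 0..=9999 schema range is invented for the example.
fn truncation_sketch() {
    let in_schema_subkeys = ValueSubkeyRangeSet::single_range(0, 9999);

    let truncated_subkeys = if let Some(nth_subkey) =
        in_schema_subkeys.nth_subkey(MAX_INSPECT_VALUE_A_SEQS_LEN)
    {
        // nth_subkey == 512 here, so drop 512..=ValueSubkey::MAX and keep 0..=511
        in_schema_subkeys.difference(&ValueSubkeyRangeSet::single_range(
            nth_subkey,
            ValueSubkey::MAX,
        ))
    } else {
        // Small enough to answer in full
        in_schema_subkeys
    };

    // The truncated set holds exactly the first 512 subkeys
    assert_eq!(truncated_subkeys.nth_subkey(0), Some(0));
    assert_eq!(truncated_subkeys.nth_subkey(511), Some(511));
    assert_eq!(truncated_subkeys.nth_subkey(512), None);
}

Because the truncated set is also what keys the inspect cache, a repeat inspection of an over-large range hits the cache entry for the identical truncated range.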

View File

@@ -88,7 +88,7 @@ impl StorageManager {
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = sva.answer.value {
log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
// Validate with schema
if !ctx.schema.check_subkey_value_data(
@@ -174,7 +174,7 @@ impl StorageManager {
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "SetValue Fanout Error: {}", e);
log_dht!(debug "SetValue Fanout Error: {}", e);
return Err(e.into());
}
};
@@ -183,7 +183,7 @@
kind,
value_nodes: ctx.value_nodes.clone(),
};
log_stor!(debug "SetValue Fanout: {:?}", fanout_result);
log_dht!(debug "SetValue Fanout: {:?}", fanout_result);
Ok(OutboundSetValueResult {
fanout_result,

View File

@@ -93,10 +93,10 @@ impl StorageManager {
if wva.answer.accepted {
if wva.answer.expiration_ts.as_u64() > 0 {
// If the expiration time is greater than zero this watch is active
log_stor!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
} else {
// If the returned expiration time is zero, this watch was cancelled, or inactive
log_stor!(debug "Watch inactive: id={}", wva.answer.watch_id);
log_dht!(debug "Watch inactive: id={}", wva.answer.watch_id);
}
let mut ctx = context.lock();
ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
@@ -143,9 +143,9 @@ impl StorageManager {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
log_stor!(debug "WatchValue Fanout Timeout Success");
log_dht!(debug "WatchValue Fanout Timeout Success");
} else {
log_stor!(debug "WatchValue Fanout Timeout Failure");
log_dht!(debug "WatchValue Fanout Timeout Failure");
}
Ok(ctx.opt_watch_value_result.clone())
}
@@ -154,9 +154,9 @@ impl StorageManager {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
log_stor!(debug "WatchValue Fanout Success");
log_dht!(debug "WatchValue Fanout Success");
} else {
log_stor!(debug "WatchValue Fanout Failure");
log_dht!(debug "WatchValue Fanout Failure");
}
Ok(ctx.opt_watch_value_result.clone())
}
@@ -165,16 +165,16 @@ impl StorageManager {
// Return the best answer we've got
let ctx = context.lock();
if ctx.opt_watch_value_result.is_some() {
log_stor!(debug "WatchValue Fanout Exhausted Success");
log_dht!(debug "WatchValue Fanout Exhausted Success");
} else {
log_stor!(debug "WatchValue Fanout Exhausted Failure");
log_dht!(debug "WatchValue Fanout Exhausted Failure");
}
Ok(ctx.opt_watch_value_result.clone())
}
// Failed
TimeoutOr::Value(Err(e)) => {
// If we finished with an error, return that
log_stor!(debug "WatchValue Fanout Error: {}", e);
log_dht!(debug "WatchValue Fanout Error: {}", e);
Err(e.into())
}
}

View File

@@ -1796,25 +1796,13 @@ impl VeilidAPI {
let (key, rc) = get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?;
let mut rest_defaults = false;
let subkeys = get_debug_argument_at(
&args,
1 + opt_arg_add,
"debug_record_inspect",
"subkeys",
get_subkeys,
)
.ok()
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
});
let scope = if rest_defaults {
Default::default()
} else {
get_debug_argument_at(
&args,
2 + opt_arg_add,
1 + opt_arg_add,
"debug_record_inspect",
"scope",
get_dht_report_scope,
@@ -1826,6 +1814,19 @@
})
};
let subkeys = get_debug_argument_at(
&args,
2 + opt_arg_add,
"debug_record_inspect",
"subkeys",
get_subkeys,
)
.ok()
.unwrap_or_else(|| {
rest_defaults = true;
Default::default()
});
// Do a record inspect
let report = match rc.inspect_dht_record(key, subkeys, scope).await {
Err(e) => {
@@ -1945,9 +1946,9 @@ record list <local|remote>
get [<key>] <subkey> [force]
delete <key>
info [<key>] [subkey]
watch [<key>] [<subkeys>] [<expiration>] [<count>]
watch [<key>] [<subkeys> [<expiration> [<count>]]]
cancel [<key>] [<subkeys>]
inspect [<key>] [<subkeys>] [<scope>]
inspect [<key>] [<scope> [<subkeys>]]
--------------------------------------------------------------------
<key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4
* also <node>, <relay>, <target>, <route>

View File

@@ -53,6 +53,31 @@ impl ValueSubkeyRangeSet {
pub fn into_data(self) -> RangeSetBlaze<ValueSubkey> {
self.data
}
pub fn nth_subkey(&self, idx: usize) -> Option<ValueSubkey> {
let mut idxleft = idx;
for range in self.data.ranges() {
let range_len = (*range.end() - *range.start() + 1) as usize;
if idxleft < range_len {
return Some(*range.start() + idxleft as u32);
}
idxleft -= range_len;
}
None
}
pub fn idx_of_subkey(&self, subkey: ValueSubkey) -> Option<usize> {
let mut idx = 0usize;
for range in self.data.ranges() {
if range.contains(&subkey) {
idx += (subkey - *range.start()) as usize;
return Some(idx);
} else {
idx += (*range.end() - *range.start() + 1) as usize;
}
}
None
}
}
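
nth_subkey and idx_of_subkey are inverse mappings over the set's flattened index space, skipping the gaps between ranges: the inspect truncation uses nth_subkey, and the inspect cache's replace_subkey_seq uses idx_of_subkey. Below is a hedged, test-style sketch over a two-range set, not part of the commit; it is built only from calls that already appear in this commit.

// Illustrative only: the set {0..=3, 10..=12} is built with single_range and
// difference, both used elsewhere in this commit.
fn subkey_index_sketch() {
    // Seven subkeys total, with a gap at 4..=9
    let set = ValueSubkeyRangeSet::single_range(0, 12)
        .difference(&ValueSubkeyRangeSet::single_range(4, 9));

    // nth_subkey walks the set in order, skipping the gap
    assert_eq!(set.nth_subkey(0), Some(0));
    assert_eq!(set.nth_subkey(3), Some(3));
    assert_eq!(set.nth_subkey(4), Some(10)); // first subkey after the gap
    assert_eq!(set.nth_subkey(7), None); // only seven subkeys in the set

    // idx_of_subkey is the inverse mapping
    assert_eq!(set.idx_of_subkey(10), Some(4));
    assert_eq!(set.idx_of_subkey(12), Some(6));
    assert_eq!(set.idx_of_subkey(5), None); // 5 falls in the gap
}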
impl FromStr for ValueSubkeyRangeSet {