From 64832748a993f290598cadb51aafa5eed1e2d0bc Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Tue, 12 Mar 2024 22:36:04 -0400
Subject: [PATCH] proper valuesubkeyrangeset truncation

---
 veilid-core/src/rpc_processor/coders/mod.rs   |  1 +
 .../rpc_processor/coders/operations/mod.rs    |  2 +
 .../operations/operation_inspect_value.rs     |  2 +-
 veilid-core/src/storage_manager/get_value.rs  |  6 +-
 .../src/storage_manager/inspect_value.rs      |  6 +-
 .../record_store/inspect_cache.rs             | 79 +++++++++++++++++++
 .../src/storage_manager/record_store/keys.rs  |  6 --
 .../src/storage_manager/record_store/mod.rs   | 58 ++++++++++----
 veilid-core/src/storage_manager/set_value.rs  |  6 +-
 .../src/storage_manager/watch_value.rs        | 18 ++---
 veilid-core/src/veilid_api/debug.rs           | 31 ++++----
 .../types/dht/value_subkey_range_set.rs       | 25 ++++++
 12 files changed, 185 insertions(+), 55 deletions(-)
 create mode 100644 veilid-core/src/storage_manager/record_store/inspect_cache.rs

diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs
index 62650336..3778d953 100644
--- a/veilid-core/src/rpc_processor/coders/mod.rs
+++ b/veilid-core/src/rpc_processor/coders/mod.rs
@@ -27,6 +27,7 @@ mod tunnel;
 mod typed_key;
 mod typed_signature;
 
+pub(crate) use operations::MAX_INSPECT_VALUE_A_SEQS_LEN;
 pub(in crate::rpc_processor) use operations::*;
 
 pub(crate) use address::*;
diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs
index 00d20f48..dfeab022 100644
--- a/veilid-core/src/rpc_processor/coders/operations/mod.rs
+++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs
@@ -62,3 +62,5 @@ pub(in crate::rpc_processor) use operation_complete_tunnel::*;
 pub(in crate::rpc_processor) use operation_start_tunnel::*;
 
 use super::*;
+
+pub(crate) use operation_inspect_value::MAX_INSPECT_VALUE_A_SEQS_LEN;
diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs
index 26ccec60..7d1ce70a 100644
--- a/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs
+++ b/veilid-core/src/rpc_processor/coders/operations/operation_inspect_value.rs
@@ -2,7 +2,7 @@ use super::*;
 use crate::storage_manager::SignedValueDescriptor;
 
 const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
-const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
+pub(crate) const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
 const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
 
 #[derive(Clone)]
diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs
index 3a1eb27e..6519454c 100644
--- a/veilid-core/src/storage_manager/get_value.rs
+++ b/veilid-core/src/storage_manager/get_value.rs
@@ -92,7 +92,7 @@ impl StorageManager {
 
                 // Keep the value if we got one and it is newer and it passes schema validation
                 if let Some(value) = gva.answer.value {
-                    log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
+                    log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
                     let mut ctx = context.lock();
 
                     // Ensure we have a schema and descriptor
@@ -189,7 +189,7 @@ impl StorageManager {
             // Failed
             TimeoutOr::Value(Err(e)) => {
                 // If we finished with an error, return that
-                log_stor!(debug "GetValue Fanout Error: {}", e);
+                log_dht!(debug "GetValue Fanout Error: {}", e);
                 return Err(e.into());
             }
         };
@@ -199,7 +199,7 @@ impl StorageManager {
             kind,
             value_nodes: ctx.value_nodes.clone(),
         };
-        log_stor!(debug "GetValue Fanout: {:?}", fanout_result);
+        log_dht!(debug "GetValue Fanout: {:?}", fanout_result);
 
         Ok(OutboundGetValueResult {
             fanout_result,
diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs
index 7377ca82..7a504796 100644
--- a/veilid-core/src/storage_manager/inspect_value.rs
+++ b/veilid-core/src/storage_manager/inspect_value.rs
@@ -139,7 +139,7 @@ impl StorageManager {
 
                 // Keep the value if we got one and it is newer and it passes schema validation
                 if !iva.answer.seqs.is_empty() {
-                    log_stor!(debug "Got seqs back: len={}", iva.answer.seqs.len());
+                    log_dht!(debug "Got seqs back: len={}", iva.answer.seqs.len());
                     let mut ctx = context.lock();
 
                     // Ensure we have a schema and descriptor etc
@@ -257,7 +257,7 @@ impl StorageManager {
             // Failed
             TimeoutOr::Value(Err(e)) => {
                 // If we finished with an error, return that
-                log_stor!(debug "InspectValue Fanout Error: {}", e);
+                log_dht!(debug "InspectValue Fanout Error: {}", e);
                 return Err(e.into());
             }
         };
@@ -277,7 +277,7 @@ impl StorageManager {
             fanout_results.push(fanout_result);
         }
 
-        log_stor!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>());
+        log_dht!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>());
 
         Ok(OutboundInspectValueResult {
             fanout_results,
diff --git a/veilid-core/src/storage_manager/record_store/inspect_cache.rs b/veilid-core/src/storage_manager/record_store/inspect_cache.rs
new file mode 100644
index 00000000..d3b09189
--- /dev/null
+++ b/veilid-core/src/storage_manager/record_store/inspect_cache.rs
@@ -0,0 +1,79 @@
+use super::*;
+
+const L2_CACHE_DEPTH: usize = 4; // XXX: i just picked this. we could probably do better than this someday
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct InspectCacheL2Value {
+    pub seqs: Vec<ValueSeqNum>,
+}
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+struct InspectCacheL2 {
+    pub cache: LruCache<ValueSubkeyRangeSet, InspectCacheL2Value>,
+}
+
+impl InspectCacheL2 {
+    pub fn new(l2_cache_limit: usize) -> Self {
+        Self {
+            cache: LruCache::new(l2_cache_limit),
+        }
+    }
+}
+
+pub struct InspectCache {
+    cache: LruCache<TypedKey, InspectCacheL2>,
+}
+
+impl InspectCache {
+    pub fn new(l1_cache_limit: usize) -> Self {
+        Self {
+            cache: LruCache::new(l1_cache_limit),
+        }
+    }
+
+    pub fn get(
+        &mut self,
+        key: &TypedKey,
+        subkeys: &ValueSubkeyRangeSet,
+    ) -> Option<InspectCacheL2Value> {
+        if let Some(l2c) = self.cache.get_mut(key) {
+            if let Some(l2v) = l2c.cache.get(subkeys) {
+                return Some(l2v.clone());
+            }
+        }
+        None
+    }
+
+    pub fn put(&mut self, key: TypedKey, subkeys: ValueSubkeyRangeSet, value: InspectCacheL2Value) {
+        self.cache
+            .entry(key)
+            .or_insert_with(|| InspectCacheL2::new(L2_CACHE_DEPTH))
+            .cache
+            .insert(subkeys, value);
+    }
+
+    pub fn invalidate(&mut self, key: &TypedKey) {
+        self.cache.remove(key);
+    }
+
+    pub fn replace_subkey_seq(&mut self, key: &TypedKey, subkey: ValueSubkey, seq: ValueSeqNum) {
+        let Some(l2) = self.cache.get_mut(key) else {
+            return;
+        };
+
+        for entry in &mut l2.cache {
+            let Some(idx) = entry.0.idx_of_subkey(subkey) else {
+                continue;
+            };
+            if idx < entry.1.seqs.len() {
+                entry.1.seqs[idx] = seq;
+            } else if idx > entry.1.seqs.len() {
+                panic!(
+                    "representational error in l2 inspect cache: {} > {}",
+                    idx,
+                    entry.1.seqs.len()
+                )
+            }
+        }
+    }
+}
diff --git a/veilid-core/src/storage_manager/record_store/keys.rs b/veilid-core/src/storage_manager/record_store/keys.rs
index 3a78c7ad..547e4aa9 100644
--- a/veilid-core/src/storage_manager/record_store/keys.rs
+++ b/veilid-core/src/storage_manager/record_store/keys.rs
@@ -61,9 +61,3 @@ impl TryFrom<&[u8]> for SubkeyTableKey {
         Ok(SubkeyTableKey { key, subkey })
     }
 }
-
-#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
-pub struct SeqsCacheKey {
-    pub key: TypedKey,
-    pub subkeys: ValueSubkeyRangeSet,
-}
diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs
index dd68891d..fd3eba42 100644
--- a/veilid-core/src/storage_manager/record_store/mod.rs
+++ b/veilid-core/src/storage_manager/record_store/mod.rs
@@ -4,6 +4,7 @@
 /// This store does not perform any validation on the schema, and all ValueRecordData passed in must have been previously validated.
 /// Uses an in-memory store for the records, backed by the TableStore. Subkey data is LRU cached and rotated out by a limits policy,
 /// and backed to the TableStore for persistence.
+mod inspect_cache;
 mod keys;
 mod limited_size;
 mod local_record_detail;
@@ -14,6 +15,7 @@ mod record_store_limits;
 mod remote_record_detail;
 mod watch;
 
+pub(super) use inspect_cache::*;
 pub(super) use keys::*;
 pub(super) use limited_size::*;
 pub(super) use local_record_detail::*;
@@ -60,7 +62,7 @@ where
     /// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
     subkey_cache: LruCache<SubkeyTableKey, RecordData>,
     /// The in-memory cache of commonly accessed sequence number data so we don't have to keep hitting the db
-    seqs_cache: LruCache<SeqsCacheKey, Vec<ValueSeqNum>>,
+    inspect_cache: InspectCache,
     /// Total storage space or subkey data inclusive of structures in memory
     subkey_cache_total_size: LimitedSize<usize>,
     /// Total storage space of records in the tabledb inclusive of subkey data and structures
@@ -118,7 +120,7 @@ where
             subkey_table: None,
             record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
             subkey_cache: LruCache::new(subkey_cache_size),
-            seqs_cache: LruCache::new(subkey_cache_size),
+            inspect_cache: InspectCache::new(subkey_cache_size),
             subkey_cache_total_size: LimitedSize::new(
                 "subkey_cache_total_size",
                 0,
@@ -425,6 +427,9 @@ where
         // Remove watch changes
         self.changed_watched_values.remove(&rtk);
 
+        // Invalidate inspect cache for this key
+        self.inspect_cache.invalidate(&rtk.key);
+
         // Remove from table store immediately
         self.add_dead_record(rtk, record);
         self.purge_dead_records(false).await;
@@ -540,7 +545,7 @@ where
 
         // If subkey exists in subkey cache, use that
         let stk = SubkeyTableKey { key, subkey };
-        if let Some(record_data) = self.subkey_cache.get_mut(&stk) {
+        if let Some(record_data) = self.subkey_cache.get(&stk) {
             let out = record_data.signed_value_data().clone();
 
             return Ok(Some(GetResult {
@@ -754,6 +759,13 @@ where
             .await
             .map_err(VeilidAPIError::internal)?;
 
+        // Write to inspect cache
+        self.inspect_cache.replace_subkey_seq(
+            &stk.key,
+            subkey,
+            subkey_record_data.signed_value_data().value_data().seq(),
+        );
+
         // Write to subkey cache
         self.add_to_subkey_cache(stk, subkey_record_data);
 
@@ -780,6 +792,11 @@ where
         subkeys: ValueSubkeyRangeSet,
         want_descriptor: bool,
     ) -> VeilidAPIResult<Option<InspectResult>> {
+        // Get subkey table
+        let Some(subkey_table) = self.subkey_table.clone() else {
+            apibail_internal!("record store not initialized");
+        };
+
         // Get record from index
         let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| {
             // Get number of subkeys from schema and ensure we are getting the
@@ -788,8 +805,21 @@ where
                 0,
                 record.schema().max_subkey(),
             ));
+
+            // Cap the number of total subkeys being inspected to the amount we can send across the wire
+            let truncated_subkeys = if let Some(nth_subkey) =
+                in_schema_subkeys.nth_subkey(MAX_INSPECT_VALUE_A_SEQS_LEN)
+            {
+                in_schema_subkeys.difference(&ValueSubkeyRangeSet::single_range(
+                    nth_subkey,
+                    ValueSubkey::MAX,
+                ))
+            } else {
+                in_schema_subkeys
+            };
+
             (
-                in_schema_subkeys,
+                truncated_subkeys,
                 if want_descriptor {
                     Some(record.descriptor().clone())
                 } else {
@@ -807,23 +837,14 @@ where
         }
 
         // See if we have this inspection cached
-        let sck = SeqsCacheKey {
-            key,
-            subkeys: subkeys.clone(),
-        };
-        if let Some(seqs) = self.seqs_cache.get(&sck) {
+        if let Some(icv) = self.inspect_cache.get(&key, &subkeys) {
             return Ok(Some(InspectResult {
                 subkeys,
-                seqs: seqs.clone(),
+                seqs: icv.seqs,
                 opt_descriptor,
             }));
         }
 
-        // Get subkey table
-        let Some(subkey_table) = self.subkey_table.clone() else {
-            apibail_internal!("record store not initialized");
-        };
-
         // Build sequence number list to return
         let mut seqs = Vec::with_capacity(subkeys.len());
         for subkey in subkeys.iter() {
@@ -847,6 +868,13 @@ where
             seqs.push(seq)
         }
 
+        // Save seqs cache
+        self.inspect_cache.put(
+            key,
+            subkeys.clone(),
+            InspectCacheL2Value { seqs: seqs.clone() },
+        );
+
         Ok(Some(InspectResult {
             subkeys,
             seqs,
diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs
index 93357ab1..ed0d6523 100644
--- a/veilid-core/src/storage_manager/set_value.rs
+++ b/veilid-core/src/storage_manager/set_value.rs
@@ -88,7 +88,7 @@ impl StorageManager {
 
                 // Keep the value if we got one and it is newer and it passes schema validation
                 if let Some(value) = sva.answer.value {
-                    log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
+                    log_dht!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq());
 
                     // Validate with schema
                     if !ctx.schema.check_subkey_value_data(
@@ -174,7 +174,7 @@ impl StorageManager {
             // Failed
             TimeoutOr::Value(Err(e)) => {
                 // If we finished with an error, return that
-                log_stor!(debug "SetValue Fanout Error: {}", e);
+                log_dht!(debug "SetValue Fanout Error: {}", e);
                 return Err(e.into());
             }
         };
@@ -183,7 +183,7 @@ impl StorageManager {
             kind,
             value_nodes: ctx.value_nodes.clone(),
         };
-        log_stor!(debug "SetValue Fanout: {:?}", fanout_result);
+        log_dht!(debug "SetValue Fanout: {:?}", fanout_result);
 
         Ok(OutboundSetValueResult {
             fanout_result,
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index 3d60fc4b..1aa1b6f3 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -93,10 +93,10 @@ impl StorageManager {
             if wva.answer.accepted {
                 if wva.answer.expiration_ts.as_u64() > 0 {
                     // If the expiration time is greater than zero this watch is active
-                    log_stor!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
+                    log_dht!(debug "Watch active: id={} expiration_ts={}", wva.answer.watch_id, debug_ts(wva.answer.expiration_ts.as_u64()));
                 } else {
                     // If the returned expiration time is zero, this watch was cancelled, or inactive
-                    log_stor!(debug "Watch inactive: id={}", wva.answer.watch_id);
+                    log_dht!(debug "Watch inactive: id={}", wva.answer.watch_id);
                 }
                 let mut ctx = context.lock();
                 ctx.opt_watch_value_result = Some(OutboundWatchValueResult {
@@ -143,9 +143,9 @@ impl StorageManager {
                 // Return the best answer we've got
                 let ctx = context.lock();
                 if ctx.opt_watch_value_result.is_some() {
-                    log_stor!(debug "WatchValue Fanout Timeout Success");
+                    log_dht!(debug "WatchValue Fanout Timeout Success");
                 } else {
-                    log_stor!(debug "WatchValue Fanout Timeout Failure");
+                    log_dht!(debug "WatchValue Fanout Timeout Failure");
                 }
                 Ok(ctx.opt_watch_value_result.clone())
             }
@@ -154,9 +154,9 @@ impl StorageManager {
                 // Return the best answer we've got
                 let ctx = context.lock();
                 if ctx.opt_watch_value_result.is_some() {
-                    log_stor!(debug "WatchValue Fanout Success");
+                    log_dht!(debug "WatchValue Fanout Success");
                 } else {
-                    log_stor!(debug "WatchValue Fanout Failure");
+                    log_dht!(debug "WatchValue Fanout Failure");
                 }
                 Ok(ctx.opt_watch_value_result.clone())
             }
@@ -165,16 +165,16 @@ impl StorageManager {
                 // Return the best answer we've got
                 let ctx = context.lock();
                 if ctx.opt_watch_value_result.is_some() {
-                    log_stor!(debug "WatchValue Fanout Exhausted Success");
+                    log_dht!(debug "WatchValue Fanout Exhausted Success");
                 } else {
"WatchValue Fanout Exhausted Failure"); + log_dht!(debug "WatchValue Fanout Exhausted Failure"); } Ok(ctx.opt_watch_value_result.clone()) } // Failed TimeoutOr::Value(Err(e)) => { // If we finished with an error, return that - log_stor!(debug "WatchValue Fanout Error: {}", e); + log_dht!(debug "WatchValue Fanout Error: {}", e); Err(e.into()) } } diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 23930e49..7d7f7c37 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -1796,25 +1796,13 @@ impl VeilidAPI { let (key, rc) = get_opened_dht_record_context(&args, "debug_record_watch", "key", 1)?; let mut rest_defaults = false; - let subkeys = get_debug_argument_at( - &args, - 1 + opt_arg_add, - "debug_record_inspect", - "subkeys", - get_subkeys, - ) - .ok() - .unwrap_or_else(|| { - rest_defaults = true; - Default::default() - }); let scope = if rest_defaults { Default::default() } else { get_debug_argument_at( &args, - 2 + opt_arg_add, + 1 + opt_arg_add, "debug_record_inspect", "scope", get_dht_report_scope, @@ -1826,6 +1814,19 @@ impl VeilidAPI { }) }; + let subkeys = get_debug_argument_at( + &args, + 2 + opt_arg_add, + "debug_record_inspect", + "subkeys", + get_subkeys, + ) + .ok() + .unwrap_or_else(|| { + rest_defaults = true; + Default::default() + }); + // Do a record inspect let report = match rc.inspect_dht_record(key, subkeys, scope).await { Err(e) => { @@ -1945,9 +1946,9 @@ record list get [] [force] delete info [] [subkey] - watch [] [] [] [] + watch [] [ [ []]] cancel [] [] - inspect [] [] [] + inspect [] [ []] -------------------------------------------------------------------- is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4 * also , , , diff --git a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs index 8282f3b5..a7f39716 100644 --- a/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs +++ b/veilid-core/src/veilid_api/types/dht/value_subkey_range_set.rs @@ -53,6 +53,31 @@ impl ValueSubkeyRangeSet { pub fn into_data(self) -> RangeSetBlaze { self.data } + + pub fn nth_subkey(&self, idx: usize) -> Option { + let mut idxleft = idx; + for range in self.data.ranges() { + let range_len = (*range.end() - *range.start() + 1) as usize; + if idxleft < range_len { + return Some(*range.start() + idxleft as u32); + } + idxleft -= range_len; + } + None + } + + pub fn idx_of_subkey(&self, subkey: ValueSubkey) -> Option { + let mut idx = 0usize; + for range in self.data.ranges() { + if range.contains(&subkey) { + idx += (subkey - *range.start()) as usize; + return Some(idx); + } else { + idx += (*range.end() - *range.start() + 1) as usize; + } + } + None + } } impl FromStr for ValueSubkeyRangeSet {