From d196c934cdd53edde1cf9409adad17a3cd1b01ce Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 26 Jan 2025 21:29:12 -0500 Subject: [PATCH] [skip ci] refactor checkpoint --- veilid-core/src/component.rs | 52 +++++++ veilid-core/src/core_context.rs | 30 +++- veilid-core/src/crypto/mod.rs | 63 +++++--- veilid-core/src/logging/facilities.rs | 8 ++ veilid-core/src/routing_table/mod.rs | 10 +- veilid-core/src/storage_manager/mod.rs | 42 +++--- .../src/storage_manager/record_store/mod.rs | 97 +++++-------- veilid-core/src/storage_manager/set_value.rs | 4 +- .../tasks/check_active_watches.rs | 25 ++-- .../tasks/flush_record_stores.rs | 2 +- veilid-core/src/storage_manager/tasks/mod.rs | 60 ++++---- .../tasks/offline_subkey_writes.rs | 43 +++--- veilid-core/src/table_store/mod.rs | 134 ++++++++++-------- 13 files changed, 334 insertions(+), 236 deletions(-) diff --git a/veilid-core/src/component.rs b/veilid-core/src/component.rs index b9300320..847c788e 100644 --- a/veilid-core/src/component.rs +++ b/veilid-core/src/component.rs @@ -16,6 +16,8 @@ pub trait VeilidComponent: AsAnyArcSendSync + core::fmt::Debug { fn registry(&self) -> VeilidComponentRegistry; fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>; + fn post_init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>; + fn pre_terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>; fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>; // Registry shortcuts @@ -116,6 +118,40 @@ impl VeilidComponentRegistry { Ok(()) } + pub async fn post_init(&self) -> EyreResult<()> { + let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else { + bail!("init should only happen one at a time"); + }; + if !*_init_guard { + bail!("not initialized"); + } + + let init_order = self.get_init_order(); + let mut post_initialized = vec![]; + for component in init_order { + if let Err(e) = component.post_init().await { + self.pre_terminate_inner(post_initialized).await; + return Err(e); + } + post_initialized.push(component) + } + Ok(()) + } + + pub async fn pre_terminate(&self) { + let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else { + panic!("terminate should only happen one at a time"); + }; + if !*_init_guard { + panic!("not initialized"); + } + + let init_order = self.get_init_order(); + self.pre_terminate_inner(init_order).await; + + *_init_guard = false; + } + pub async fn terminate(&self) { let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else { panic!("terminate should only happen one at a time"); @@ -130,6 +166,14 @@ impl VeilidComponentRegistry { *_init_guard = false; } + async fn pre_terminate_inner( + &self, + pre_initialized: Vec>, + ) { + for component in pre_initialized.iter().rev() { + component.pre_terminate().await; + } + } async fn terminate_inner(&self, initialized: Vec>) { for component in initialized.iter().rev() { component.terminate().await; @@ -185,6 +229,14 @@ macro_rules! 
impl_veilid_component { Box::pin(async { self.init_async().await }) } + fn post_init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>> { + Box::pin(async { self.post_init_async().await }) + } + + fn pre_terminate(&self) -> SendPinBoxFutureLifetime<'_, ()> { + Box::pin(async { self.pre_terminate_async().await }) + } + fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()> { Box::pin(async { self.terminate_async().await }) } diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index 90f9e0e9..ae7b382f 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -65,19 +65,27 @@ impl VeilidCoreContext { // Register all components registry.register(ProtectedStore::new); - // Initialize table store first, so crypto code can load caches - // Tablestore can use crypto during init, just not any cached operations or things - // that require flushing back to the tablestore - registry.register(TableStore::new); registry.register(Crypto::new); + registry.register(TableStore::new); #[cfg(feature = "unstable-blockstore")] registry.register(BlockStore::new); registry.register(StorageManager::new); registry.register(AttachmentManager::new); // Run initialization + // This should make the majority of subsystems functional registry.init().await.map_err(VeilidAPIError::internal)?; + // Run post-initialization + // This should resolve any inter-subsystem dependencies + // required for background processes that utilize multiple subsystems + // Background processes also often require registry lookup of the + // current subsystem, which is not available until after init succeeds + if let Err(e) = registry.post_init().await { + registry.terminate().await; + return VeilidAPIError::internal(e); + } + info!("Veilid API startup complete"); Ok(Self { registry }) @@ -97,6 +105,13 @@ impl VeilidCoreContext { ) }; + // Run pre-termination + // This should shut down background processes that may require the existence of + // other subsystems that may not exist during final termination + self.registry.pre_terminate().await; + + // Run termination + // This should finish any shutdown operations for the subsystems self.registry.terminate().await; if let Err(e) = ApiTracingLayer::remove_callback(program_name, namespace).await { @@ -118,11 +133,14 @@ pub trait RegisteredComponents: VeilidComponent { fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> { self.registry().lookup::().unwrap() } + fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> { + self.registry().lookup::().unwrap() + } fn table_store(&self) -> VeilidComponentGuard<'_, TableStore> { self.registry().lookup::().unwrap() } - fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> { - self.registry().lookup::().unwrap() + fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager> { + self.registry().lookup::().unwrap() } } impl RegisteredComponents for T {} diff --git a/veilid-core/src/crypto/mod.rs b/veilid-core/src/crypto/mod.rs index 5ea982fb..6779a617 100644 --- a/veilid-core/src/crypto/mod.rs +++ b/veilid-core/src/crypto/mod.rs @@ -98,7 +98,6 @@ impl fmt::Debug for CryptoInner { } /// Crypto factory implementation -#[derive(Debug)] pub struct Crypto { registry: VeilidComponentRegistry, inner: Arc>, @@ -110,6 +109,17 @@ pub struct Crypto { impl_veilid_component!(Crypto); +impl fmt::Debug for Crypto { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Crypto") + //.field("registry", &self.registry) + .field("inner", &self.inner) + // 
.field("crypto_vld0", &self.crypto_vld0) + // .field("crypto_none", &self.crypto_none) + .finish() + } +} + impl Crypto { fn new_inner() -> CryptoInner { CryptoInner { @@ -131,8 +141,15 @@ impl Crypto { #[instrument(level = "trace", target = "crypto", skip_all, err)] async fn init_async(&self) -> EyreResult<()> { + // Nothing to initialize at this time + Ok(()) + } + + // Setup called by table store after it get initialized + #[instrument(level = "trace", target = "crypto", skip_all, err)] + pub(crate) async fn table_store_setup(&self, table_store: &TableStore) -> EyreResult<()> { // Init node id from config - if let Err(e) = self.init_node_ids().await { + if let Err(e) = self.setup_node_ids(table_store).await { return Err(e).wrap_err("init node id failed"); } @@ -147,8 +164,6 @@ impl Crypto { }); // load caches if they are valid for this node id - let table_store = self.table_store(); - let mut db = table_store .open("crypto_caches", 1) .await @@ -169,7 +184,11 @@ impl Crypto { db.store(0, b"cache_validity_key", &cache_validity_key) .await?; } + Ok(()) + } + #[instrument(level = "trace", target = "crypto", skip_all, err)] + async fn post_init_async(&self) -> EyreResult<()> { // Schedule flushing let registry = self.registry(); let flush_future = interval("crypto flush", 60000, move || { @@ -196,7 +215,7 @@ impl Crypto { Ok(()) } - async fn terminate_async(&self) { + async fn pre_terminate_async(&self) { let flush_future = self.inner.lock().flush_future.take(); if let Some(f) = flush_future { f.await; @@ -212,18 +231,21 @@ impl Crypto { }; } + async fn terminate_async(&self) { + // Nothing to terminate at this time + } + /// Factory method to get a specific crypto version pub fn get(&self, kind: CryptoKind) -> Option> { - let inner = self.inner.lock(); match kind { #[cfg(feature = "enable-crypto-vld0")] CRYPTO_KIND_VLD0 => Some(CryptoSystemGuard { - crypto_system: inner.crypto_vld0.clone().unwrap(), + crypto_system: self.crypto_vld0.clone(), _phantom: PhantomData {}, }), #[cfg(feature = "enable-crypto-none")] CRYPTO_KIND_NONE => Some(CryptoSystemGuard { - crypto_system: inner.crypto_none.clone().unwrap(), + crypto_system: self.crypto_none.clone(), _phantom: PhantomData {}, }), _ => None, @@ -329,9 +351,10 @@ impl Crypto { } #[cfg(not(test))] - async fn init_node_id( + async fn setup_node_id( &self, vcrypto: CryptoSystemGuard<'_>, + table_store: &TableStore, ) -> VeilidAPIResult<(TypedKey, TypedSecret)> { let config = self.config(); let ck = vcrypto.kind(); @@ -343,37 +366,35 @@ impl Crypto { }); // See if node id was previously stored in the table store - let table_store = self.table_store(); - let config_table = table_store.open("__veilid_config", 1).await?; let table_key_node_id = format!("node_id_{}", ck); let table_key_node_id_secret = format!("node_id_secret_{}", ck); if node_id.is_none() { - log_tstore!(debug "pulling {} from storage", table_key_node_id); + log_crypto!(debug "pulling {} from storage", table_key_node_id); if let Ok(Some(stored_node_id)) = config_table .load_json::(0, table_key_node_id.as_bytes()) .await { - log_tstore!(debug "{} found in storage", table_key_node_id); + log_crypto!(debug "{} found in storage", table_key_node_id); node_id = Some(stored_node_id); } else { - log_tstore!(debug "{} not found in storage", table_key_node_id); + log_crypto!(debug "{} not found in storage", table_key_node_id); } } // See if node id secret was previously stored in the protected store if node_id_secret.is_none() { - log_tstore!(debug "pulling {} from storage", 
table_key_node_id_secret); + log_crypto!(debug "pulling {} from storage", table_key_node_id_secret); if let Ok(Some(stored_node_id_secret)) = config_table .load_json::(0, table_key_node_id_secret.as_bytes()) .await { - log_tstore!(debug "{} found in storage", table_key_node_id_secret); + log_crypto!(debug "{} found in storage", table_key_node_id_secret); node_id_secret = Some(stored_node_id_secret); } else { - log_tstore!(debug "{} not found in storage", table_key_node_id_secret); + log_crypto!(debug "{} not found in storage", table_key_node_id_secret); } } @@ -390,7 +411,7 @@ impl Crypto { (node_id, node_id_secret) } else { // If we still don't have a valid node id, generate one - log_tstore!(debug "generating new node_id_{}", ck); + log_crypto!(debug "generating new node_id_{}", ck); let kp = vcrypto.generate_keypair(); (TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret)) }; @@ -408,9 +429,9 @@ impl Crypto { } /// Get the node id from config if one is specified. - /// Must be done -after- protected store startup. + /// Must be done -after- protected store is initialized, during table store init #[cfg_attr(test, allow(unused_variables))] - pub(crate) async fn init_node_ids(&self) -> VeilidAPIResult<()> { + async fn setup_node_ids(&self, table_store: &TableStore) -> VeilidAPIResult<()> { let mut out_node_id = TypedKeyGroup::new(); let mut out_node_id_secret = TypedSecretGroup::new(); @@ -425,7 +446,7 @@ impl Crypto { (TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret)) }; #[cfg(not(test))] - let (node_id, node_id_secret) = self.init_node_id(vcrypto).await?; + let (node_id, node_id_secret) = self.setup_node_id(vcrypto, table_store).await?; // Save for config out_node_id.add(node_id); diff --git a/veilid-core/src/logging/facilities.rs b/veilid-core/src/logging/facilities.rs index 93d00e5f..399dd15e 100644 --- a/veilid-core/src/logging/facilities.rs +++ b/veilid-core/src/logging/facilities.rs @@ -385,6 +385,14 @@ macro_rules! 
log_crypto { (warn $fmt:literal, $($arg:expr),+) => { warn!(target:"crypto", $fmt, $($arg),+); }; + (debug $text:expr) => { debug!( + target: "crypto", + "{}", + $text, + )}; + (debug $fmt:literal, $($arg:expr),+) => { + debug!(target:"crypto", $fmt, $($arg),+); + }; ($text:expr) => {trace!( target: "crypto", "{}", diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 9fef7822..a7c6560a 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -542,9 +542,17 @@ impl RoutingTable { self.inner.read().routing_domain_for_address(address) } - pub fn route_spec_store(&self) -> RouteSpecStore { + pub fn route_spec_store(&self) -> RwLockReadGuard<'_, RouteSpecStore> { self.inner.read().route_spec_store.as_ref().unwrap().clone() } + pub fn route_spec_store_mut(&self) -> RwLockReadGuard<'_, RouteSpecStore> { + self.inner + .write() + .route_spec_store + .as_ref() + .unwrap() + .clone() + } pub fn relay_node(&self, domain: RoutingDomain) -> Option { self.inner.read().relay_node(domain) diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index b38dd2a0..7a94dd6d 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -219,11 +219,10 @@ impl StorageManager { let local_limits = Self::local_limits_from_config(config.clone()); let remote_limits = Self::remote_limits_from_config(config.clone()); - let mut local_record_store = RecordStore::new(self.registry(), "local", local_limits); - local_record_store.setup().await?; - - let mut remote_record_store = RecordStore::new(self.registry(), "remote", remote_limits); - remote_record_store.setup().await?; + let local_record_store = + RecordStore::try_create(&table_store, "local", local_limits).await?; + let remote_record_store = + RecordStore::try_create(&table_store, "remote", remote_limits).await?; let mut inner = self.inner.lock().await; inner.metadata_db = Some(metadata_db); @@ -234,6 +233,13 @@ impl StorageManager { // Start deferred results processors inner.deferred_result_processor.init().await; + Ok(()) + } + + #[instrument(level = "trace", target = "tstore", skip_all)] + async fn post_init_async(&self) -> EyreResult<()> { + let mut inner = self.inner.lock().await; + // Schedule tick let tick_future = interval("storage manager tick", 1000, move || { let registry = self.registry(); @@ -249,10 +255,8 @@ impl StorageManager { Ok(()) } - #[instrument(level = "debug", skip_all)] - async fn terminate_async(&self) { - log_stor!(debug "starting storage manager shutdown"); - + #[instrument(level = "trace", target = "tstore", skip_all)] + async fn pre_terminate_async(&self) { // Stop the background ticker process { let mut inner = self.inner.lock().await; @@ -263,8 +267,13 @@ impl StorageManager { } } - // Cancel all tasks + // Cancel all tasks associated with the tick future self.cancel_tasks().await; + } + + #[instrument(level = "debug", skip_all)] + async fn terminate_async(&self) { + log_stor!(debug "starting storage manager shutdown"); // Terminate and release the storage manager { @@ -324,7 +333,7 @@ impl StorageManager { Ok(()) } - fn get_ready_rpc_processor(&self) -> Option> { + pub(super) fn get_ready_rpc_processor(&self) -> Option> { let Some(rpc_processor) = self.registry().lookup::() else { return None; }; @@ -340,11 +349,11 @@ impl StorageManager { Some(rpc_processor) } - async fn has_offline_subkey_writes(&self) -> bool { + pub(super) async fn has_offline_subkey_writes(&self) -> bool { 
!self.inner.lock().await.offline_subkey_writes.is_empty() } - fn online_writes_ready(&self) -> bool { + pub(super) fn online_writes_ready(&self) -> bool { self.get_ready_rpc_processor().is_some() } @@ -382,8 +391,9 @@ impl StorageManager { // Validate schema schema.validate()?; + let schema_data = schema.compile(); - Ok(Self::get_key(&vcrypto, owner_key, schema)) + Ok(Self::get_key(&vcrypto, owner_key, &schema_data)) } /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor @@ -1358,8 +1368,8 @@ impl StorageManager { None => { // If we don't have a local record yet, check to see if we have a remote record // if so, migrate it to a local record - let Some(v) = self - .move_remote_record_to_local(key, safety_selection) + let Some(v) = inner + .move_remote_record_to_local_inner(key, safety_selection) .await? else { // No remote record either diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index e05892f8..b99659b7 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -50,14 +50,13 @@ pub(super) struct RecordStore where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, { - registry: VeilidComponentRegistry, name: String, limits: RecordStoreLimits, /// The tabledb used for record data - record_table: Option, + record_table: TableDB, /// The tabledb used for subkey data - subkey_table: Option, + subkey_table: TableDB, /// The in-memory index that keeps track of what records are in the tabledb record_index: LruCache>, /// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db @@ -86,7 +85,6 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RecordStore") - //.field("table_store", &self.table_store) .field("name", &self.name) .field("limits", &self.limits) .field("record_table", &self.record_table) @@ -129,7 +127,11 @@ impl RecordStore where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, { - pub fn new(registry: VeilidComponentRegistry, name: &str, limits: RecordStoreLimits) -> Self { + pub async fn try_create( + table_store: &TableStore, + name: &str, + limits: RecordStoreLimits, + ) -> EyreResult { let subkey_cache_size = limits.subkey_cache_size; let limit_subkey_cache_total_size = limits .max_subkey_cache_memory_mb @@ -138,12 +140,14 @@ where .max_storage_space_mb .map(|mb| mb as u64 * 1_048_576u64); - Self { - registry, + let record_table = table_store.open(&format!("{}_records", name), 1).await?; + let subkey_table = table_store.open(&format!("{}_subkeys", name), 1).await?; + + let mut out = Self { name: name.to_owned(), limits, - record_table: None, - subkey_table: None, + record_table, + subkey_table, record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)), subkey_cache: LruCache::new(subkey_cache_size), inspect_cache: InspectCache::new(subkey_cache_size), @@ -162,25 +166,20 @@ where watched_records: HashMap::new(), purge_dead_records_mutex: Arc::new(AsyncMutex::new(())), changed_watched_values: HashSet::new(), - } + }; + + out.setup().await?; + + Ok(out) } - pub async fn setup(&mut self) -> EyreResult<()> { - let record_table = self - .table_store - .open(&format!("{}_records", self.name), 1) - .await?; - let subkey_table = self - .table_store - .open(&format!("{}_subkeys", self.name), 1) - .await?; - + async fn setup(&mut self) -> EyreResult<()> { // Pull record index from table into a vector 
to ensure we sort them - let record_table_keys = record_table.get_keys(0).await?; + let record_table_keys = self.record_table.get_keys(0).await?; let mut record_index_saved: Vec<(RecordTableKey, Record)> = Vec::with_capacity(record_table_keys.len()); for rtk in record_table_keys { - if let Some(vr) = record_table.load_json::>(0, &rtk).await? { + if let Some(vr) = self.record_table.load_json::>(0, &rtk).await? { let rik = RecordTableKey::try_from(rtk.as_ref())?; record_index_saved.push((rik, vr)); } @@ -229,8 +228,6 @@ where self.dead_records.push(dr); } - self.record_table = Some(record_table); - self.subkey_table = Some(subkey_table); Ok(()) } @@ -309,11 +306,8 @@ where return; } - let record_table = self.record_table.clone().unwrap(); - let subkey_table = self.subkey_table.clone().unwrap(); - - let rt_xact = record_table.transact(); - let st_xact = subkey_table.transact(); + let rt_xact = self.record_table.transact(); + let st_xact = self.subkey_table.transact(); let dead_records = mem::take(&mut self.dead_records); for dr in dead_records { // Record should already be gone from index @@ -375,9 +369,7 @@ where return; } - let record_table = self.record_table.clone().unwrap(); - - let rt_xact = record_table.transact(); + let rt_xact = self.record_table.transact(); let changed_records = mem::take(&mut self.changed_records); for rtk in changed_records { // Get the changed record and save it to the table @@ -406,11 +398,6 @@ where apibail_internal!("record already exists"); } - // Get record table - let Some(record_table) = self.record_table.clone() else { - apibail_internal!("record store not initialized"); - }; - // If over size limit, dont create record self.total_storage_space .add((mem::size_of::() + record.total_size()) as u64) @@ -421,7 +408,7 @@ where } // Save to record table - record_table + self.record_table .store_json(0, &rtk.bytes(), &record) .await .map_err(VeilidAPIError::internal)?; @@ -577,11 +564,6 @@ where })); } - // Get subkey table - let Some(subkey_table) = self.subkey_table.clone() else { - apibail_internal!("record store not initialized"); - }; - // If subkey exists in subkey cache, use that let stk = SubkeyTableKey { key, subkey }; if let Some(record_data) = self.subkey_cache.get(&stk) { @@ -593,7 +575,8 @@ where })); } // If not in cache, try to pull from table store if it is in our stored subkey set - let Some(record_data) = subkey_table + let Some(record_data) = self + .subkey_table .load_json::(0, &stk.bytes()) .await .map_err(VeilidAPIError::internal)? @@ -649,11 +632,6 @@ where })); } - // Get subkey table - let Some(subkey_table) = self.subkey_table.clone() else { - apibail_internal!("record store not initialized"); - }; - // If subkey exists in subkey cache, use that let stk = SubkeyTableKey { key, subkey }; if let Some(record_data) = self.subkey_cache.peek(&stk) { @@ -665,7 +643,8 @@ where })); } // If not in cache, try to pull from table store if it is in our stored subkey set - let Some(record_data) = subkey_table + let Some(record_data) = self + .subkey_table .load_json::(0, &stk.bytes()) .await .map_err(VeilidAPIError::internal)? 
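// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch above): the construct-then-setup
// pattern the record_store hunks move to. `ExampleStore` and its fields are
// hypothetical stand-ins for RecordStore; this assumes the crate's TableStore /
// TableDB types and EyreResult alias. Opening the tables inside a fallible
// constructor is what lets the struct drop its Option<TableDB> fields and the
// "record store not initialized" error paths.
struct ExampleStore {
    name: String,
    record_table: TableDB, // always valid once construction succeeds
    subkey_table: TableDB,
}

impl ExampleStore {
    pub async fn try_create(table_store: &TableStore, name: &str) -> EyreResult<Self> {
        // Open the backing tables first; fail construction if either open fails.
        let record_table = table_store.open(&format!("{}_records", name), 1).await?;
        let subkey_table = table_store.open(&format!("{}_subkeys", name), 1).await?;

        let mut out = Self {
            name: name.to_owned(),
            record_table,
            subkey_table,
        };
        // Rebuild any in-memory index with the tables guaranteed to exist.
        out.setup().await?;
        Ok(out)
    }

    async fn setup(&mut self) -> EyreResult<()> {
        // e.g. walk stored record keys to rebuild an index
        let _record_keys = self.record_table.get_keys(0).await?;
        Ok(())
    }
}
// ---------------------------------------------------------------------------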
@@ -749,11 +728,6 @@ where apibail_invalid_argument!("subkey out of range", "subkey", subkey); } - // Get subkey table - let Some(subkey_table) = self.subkey_table.clone() else { - apibail_internal!("record store not initialized"); - }; - // Get the previous subkey and ensure we aren't going over the record size limit let mut prior_subkey_size = 0usize; @@ -765,7 +739,8 @@ where prior_subkey_size = record_data.data_size(); } else { // If not in cache, try to pull from table store - if let Some(record_data) = subkey_table + if let Some(record_data) = self + .subkey_table .load_json::(0, &stk_bytes) .await .map_err(VeilidAPIError::internal)? @@ -796,7 +771,7 @@ where } // Write subkey - subkey_table + self.subkey_table .store_json(0, &stk_bytes, &subkey_record_data) .await .map_err(VeilidAPIError::internal)?; @@ -835,11 +810,6 @@ where subkeys: ValueSubkeyRangeSet, want_descriptor: bool, ) -> VeilidAPIResult> { - // Get subkey table - let Some(subkey_table) = self.subkey_table.clone() else { - apibail_internal!("record store not initialized"); - }; - // Get record from index let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| { // Get number of subkeys from schema and ensure we are getting the @@ -884,7 +854,8 @@ where } else { // If not in cache, try to pull from table store if it is in our stored subkey set // XXX: This would be better if it didn't have to pull the whole record data to get the seq. - if let Some(record_data) = subkey_table + if let Some(record_data) = self + .subkey_table .load_json::(0, &stk.bytes()) .await .map_err(VeilidAPIError::internal)? diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index d8ce7b48..e45b8c80 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -28,7 +28,7 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_set_value( &self, - rpc_processor: RPCProcessor, + rpc_processor: &RPCProcessor, key: TypedKey, subkey: ValueSubkey, safety_selection: SafetySelection, @@ -333,7 +333,7 @@ impl StorageManager { // If more partial results show up, don't send an update until we're done return true; } - // If we processed the final result, possibly send an update + // If we processed the final result, possibly send an update // if the sequence number changed since our first partial update // Send with a max count as this is not attached to any watch let changed = { diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index 5500cd6b..4d6727cd 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -4,19 +4,18 @@ impl StorageManager { // Check if client-side watches on opened records either have dead nodes or if the watch has expired #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn check_active_watches_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, ) -> EyreResult<()> { { let mut inner = self.inner.lock().await; - let Some(routing_table) = inner.opt_routing_table.clone() else { - return Ok(()); - }; + + let routing_table = self.routing_table(); let rss = routing_table.route_spec_store(); - let opt_update_callback = inner.update_callback.clone(); + let update_callback = self.update_callback(); let cur_ts = 
Timestamp::now(); for (k, v) in inner.opened_records.iter_mut() { @@ -50,15 +49,13 @@ impl StorageManager { if is_dead { v.clear_active_watch(); - if let Some(update_callback) = opt_update_callback.clone() { - // Send valuechange with dead count and no subkeys - update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { - key: *k, - subkeys: ValueSubkeyRangeSet::new(), - count: 0, - value: None, - }))); - } + // Send valuechange with dead count and no subkeys + update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { + key: *k, + subkeys: ValueSubkeyRangeSet::new(), + count: 0, + value: None, + }))); } } } diff --git a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs index 8dbb3d41..295c1fb9 100644 --- a/veilid-core/src/storage_manager/tasks/flush_record_stores.rs +++ b/veilid-core/src/storage_manager/tasks/flush_record_stores.rs @@ -4,7 +4,7 @@ impl StorageManager { // Flush records stores to disk and remove dead records #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn flush_record_stores_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index a0d088eb..928184c6 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -11,61 +11,61 @@ impl StorageManager { // Set flush records tick task log_stor!(debug "starting flush record stores task"); { - let this = self.clone(); + let registry = self.registry(); self.flush_record_stores_task.set_routine(move |s, l, t| { - Box::pin(this.clone().flush_record_stores_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) + Box::pin(async move { + let this = registry.lookup::().unwrap(); + this.flush_record_stores_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) }); } // Set offline subkey writes tick task log_stor!(debug "starting offline subkey writes task"); { - let this = self.clone(); + let registry = self.registry(); self.offline_subkey_writes_task.set_routine(move |s, l, t| { - Box::pin(this.clone().offline_subkey_writes_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) + Box::pin(async move { + let this = registry.lookup::().unwrap(); + this.offline_subkey_writes_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) }); } // Set send value changes tick task log_stor!(debug "starting send value changes task"); { - let this = self.clone(); + let registry = self.registry(); self.send_value_changes_task.set_routine(move |s, l, t| { - Box::pin(this.clone().send_value_changes_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) + Box::pin(async move { + let this = registry.lookup::().unwrap(); + this.send_value_changes_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) }); } // Set check active watches tick task log_stor!(debug "starting check active watches task"); { - let this = self.clone(); + let registry = self.registry(); self.check_active_watches_task.set_routine(move |s, l, t| { - Box::pin(this.clone().check_active_watches_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) + Box::pin(async move { + let this = registry.lookup::().unwrap(); + this.check_active_watches_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) }); } // Set check watched records tick task log_stor!(debug "starting 
checked watched records task"); { - let this = self.clone(); + let registry = self.registry(); self.check_watched_records_task.set_routine(move |s, l, t| { - Box::pin(this.clone().check_watched_records_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) + Box::pin(async move { + let this = registry.lookup::().unwrap(); + this.check_watched_records_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) }); } } diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index a7c66d09..abb2722f 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -35,19 +35,19 @@ impl StorageManager { // Write a single offline subkey #[instrument(level = "trace", target = "stor", skip_all, err)] async fn write_single_offline_subkey( - self, + &self, stop_token: StopToken, key: TypedKey, subkey: ValueSubkey, safety_selection: SafetySelection, ) -> EyreResult { - let Some(rpc_processor) = self.online_writes_ready().await? else { + let Some(rpc_processor) = self.get_ready_rpc_processor() else { // Cancel this operation because we're offline return Ok(OfflineSubkeyWriteResult::Cancelled); }; let get_result = { - let mut inner = self.lock().await?; - inner.handle_get_local_value(key, subkey, true).await + let mut inner = self.inner.lock().await; + Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await }; let Ok(get_result) = get_result else { log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); @@ -66,7 +66,7 @@ impl StorageManager { log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len()); let osvres = self .outbound_set_value( - rpc_processor, + &rpc_processor, key, subkey, safety_selection, @@ -88,15 +88,16 @@ impl StorageManager { // Set the new value if it differs from what was asked to set if result.signed_value_data.value_data() != value.value_data() { // Record the newer value and send and update since it is different than what we just set - let mut inner = self.lock().await?; - inner - .handle_set_local_value( - key, - subkey, - result.signed_value_data.clone(), - WatchUpdateMode::UpdateAll, - ) - .await?; + let mut inner = self.inner.lock().await; + + Self::handle_set_local_value_inner( + &mut *inner, + key, + subkey, + result.signed_value_data.clone(), + WatchUpdateMode::UpdateAll, + ) + .await?; } return Ok(OfflineSubkeyWriteResult::Finished(result)); @@ -120,7 +121,7 @@ impl StorageManager { // Write a set of subkeys of the same key #[instrument(level = "trace", target = "stor", skip_all, err)] async fn process_work_item( - self, + &self, stop_token: StopToken, work_item: WorkItem, ) -> EyreResult { @@ -133,7 +134,6 @@ impl StorageManager { } let result = match self - .clone() .write_single_offline_subkey( stop_token.clone(), work_item.key, @@ -217,7 +217,8 @@ impl StorageManager { } // Keep the list of nodes that returned a value for later reference - inner.process_fanout_results( + Self::process_fanout_results_inner( + inner, result.key, result.fanout_results.iter().map(|x| (x.0, &x.1)), true, @@ -226,7 +227,7 @@ impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn process_offline_subkey_writes( - self, + &self, stop_token: StopToken, work_items: Arc>>, ) -> EyreResult<()> { @@ -236,11 +237,10 @@ impl StorageManager { break; }; let result = self - .clone() 
.process_work_item(stop_token.clone(), work_item) .await?; { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; Self::process_single_result_inner(&mut inner, result); } } @@ -251,7 +251,7 @@ impl StorageManager { // Best-effort write subkeys to the network that were written offline #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn offline_subkey_writes_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, @@ -272,7 +272,6 @@ impl StorageManager { // Process everything let res = self - .clone() .process_offline_subkey_writes(stop_token, work_items) .await; diff --git a/veilid-core/src/table_store/mod.rs b/veilid-core/src/table_store/mod.rs index 8490150d..21b5aacd 100644 --- a/veilid-core/src/table_store/mod.rs +++ b/veilid-core/src/table_store/mod.rs @@ -126,7 +126,7 @@ impl TableStore { } } - // Flush internal control state (must not use crypto) + // Flush internal control state async fn flush(&self) { let (all_table_names_value, all_tables_db) = { let inner = self.inner.lock(); @@ -421,80 +421,94 @@ impl TableStore { #[instrument(level = "trace", target = "tstore", skip_all)] async fn init_async(&self) -> EyreResult<()> { - let _async_guard = self.async_lock.lock().await; + { + let _async_guard = self.async_lock.lock().await; - // Get device encryption key from protected store - let mut device_encryption_key = self.load_device_encryption_key().await?; - let mut device_encryption_key_changed = false; - if let Some(device_encryption_key) = device_encryption_key { - // If encryption in current use is not the best encryption, then run table migration - let best_kind = best_crypto_kind(); - if device_encryption_key.kind != best_kind { - // XXX: Run migration. See issue #209 + // Get device encryption key from protected store + let mut device_encryption_key = self.load_device_encryption_key().await?; + let mut device_encryption_key_changed = false; + if let Some(device_encryption_key) = device_encryption_key { + // If encryption in current use is not the best encryption, then run table migration + let best_kind = best_crypto_kind(); + if device_encryption_key.kind != best_kind { + // XXX: Run migration. 
See issue #209 + } + } else { + // If we don't have an encryption key yet, then make one with the best cryptography and save it + let best_kind = best_crypto_kind(); + let mut shared_secret = SharedSecret::default(); + random_bytes(&mut shared_secret.bytes); + + device_encryption_key = Some(TypedSharedSecret::new(best_kind, shared_secret)); + device_encryption_key_changed = true; } - } else { - // If we don't have an encryption key yet, then make one with the best cryptography and save it - let best_kind = best_crypto_kind(); - let mut shared_secret = SharedSecret::default(); - random_bytes(&mut shared_secret.bytes); - device_encryption_key = Some(TypedSharedSecret::new(best_kind, shared_secret)); - device_encryption_key_changed = true; - } + // Check for password change + let changing_password = self.config().with(|c| { + c.protected_store + .new_device_encryption_key_password + .is_some() + }); - // Check for password change - let changing_password = self.config().with(|c| { - c.protected_store - .new_device_encryption_key_password - .is_some() - }); + // Save encryption key if it has changed or if the protecting password wants to change + if device_encryption_key_changed || changing_password { + self.save_device_encryption_key(device_encryption_key) + .await?; + } - // Save encryption key if it has changed or if the protecting password wants to change - if device_encryption_key_changed || changing_password { - self.save_device_encryption_key(device_encryption_key) - .await?; - } - - // Deserialize all table names - let all_tables_db = self - .table_store_driver - .open("__veilid_all_tables", 1) - .await - .wrap_err("failed to create all tables table")?; - match all_tables_db.get(0, ALL_TABLE_NAMES).await { - Ok(Some(v)) => match deserialize_json_bytes::>(&v) { - Ok(all_table_names) => { - let mut inner = self.inner.lock(); - inner.all_table_names = all_table_names; + // Deserialize all table names + let all_tables_db = self + .table_store_driver + .open("__veilid_all_tables", 1) + .await + .wrap_err("failed to create all tables table")?; + match all_tables_db.get(0, ALL_TABLE_NAMES).await { + Ok(Some(v)) => match deserialize_json_bytes::>(&v) { + Ok(all_table_names) => { + let mut inner = self.inner.lock(); + inner.all_table_names = all_table_names; + } + Err(e) => { + error!("could not deserialize __veilid_all_tables: {}", e); + } + }, + Ok(None) => { + // No table names yet, that's okay + log_tstore!("__veilid_all_tables is empty"); } Err(e) => { - error!("could not deserialize __veilid_all_tables: {}", e); + error!("could not get __veilid_all_tables: {}", e); } - }, - Ok(None) => { - // No table names yet, that's okay - log_tstore!("__veilid_all_tables is empty"); - } - Err(e) => { - error!("could not get __veilid_all_tables: {}", e); - } - }; + }; - { - let mut inner = self.inner.lock(); - inner.encryption_key = device_encryption_key; - inner.all_tables_db = Some(all_tables_db); + { + let mut inner = self.inner.lock(); + inner.encryption_key = device_encryption_key; + inner.all_tables_db = Some(all_tables_db); + } + + let do_delete = self.config().with(|c| c.table_store.delete); + + if do_delete { + self.delete_all().await; + } } - let do_delete = self.config().with(|c| c.table_store.delete); - - if do_delete { - self.delete_all().await; - } + // Set up crypto + let crypto = self.crypto(); + crypto.table_store_setup(self).await?; Ok(()) } + #[instrument(level = "trace", target = "tstore", skip_all)] + async fn post_init_async(&self) -> EyreResult<()> { + Ok(()) + } + + 
#[instrument(level = "trace", target = "tstore", skip_all)] + async fn pre_terminate_async(&self) {} + #[instrument(level = "trace", target = "tstore", skip_all)] async fn terminate_async(&self) { let _async_guard = self.async_lock.lock().await;