diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 665c6c6d..b38dd2a0 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -3,7 +3,6 @@ mod get_value; mod inspect_value; mod record_store; mod set_value; -mod storage_manager_inner; mod tasks; mod types; mod watch_value; @@ -12,9 +11,9 @@ use super::*; use record_store::*; use routing_table::*; use rpc_processor::*; -use storage_manager_inner::*; pub use record_store::{WatchParameters, WatchResult}; + pub use types::*; /// The maximum size of a single subkey @@ -31,6 +30,10 @@ const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1; const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1; /// Frequency to check for expired server-side watched records const CHECK_WATCHED_RECORDS_INTERVAL_SECS: u32 = 1; +/// Table store table for storage manager metadata +const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata"; +/// Storage manager metadata key name for offline subkey write persistence +const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes"; #[derive(Debug, Clone)] /// A single 'value changed' message to send @@ -43,7 +46,44 @@ struct ValueChangedInfo { value: Option>, } -struct StorageManagerUnlockedInner { +/// Locked structure for storage manager +#[derive(Default)] +struct StorageManagerInner { + /// Records that have been 'opened' and are not yet closed + pub opened_records: HashMap, + /// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive + pub local_record_store: Option>, + /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish + pub remote_record_store: Option>, + /// Record subkeys that have not been pushed to the network because they were written to offline + pub offline_subkey_writes: HashMap, + /// Storage manager metadata that is persistent, including copy of offline subkey writes + pub metadata_db: Option, + /// Background processing task (not part of attachment manager tick tree so it happens when detached too) + pub tick_future: Option>, + /// Deferred result processor + pub deferred_result_processor: DeferredStreamProcessor, +} + +impl fmt::Debug for StorageManagerInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("StorageManagerInner") + // .field("unlocked_inner", &self.unlocked_inner) + .field("opened_records", &self.opened_records) + .field("local_record_store", &self.local_record_store) + .field("remote_record_store", &self.remote_record_store) + .field("offline_subkey_writes", &self.offline_subkey_writes) + //.field("metadata_db", &self.metadata_db) + //.field("tick_future", &self.tick_future) + .field("deferred_result_processor", &self.deferred_result_processor) + .finish() + } +} + +pub(crate) struct StorageManager { + registry: VeilidComponentRegistry, + inner: Arc>, + // Background processes flush_record_stores_task: TickTask, offline_subkey_writes_task: TickTask, @@ -55,9 +95,11 @@ struct StorageManagerUnlockedInner { anonymous_watch_keys: TypedKeyPairGroup, } -impl fmt::Debug for StorageManagerUnlockedInner { +impl fmt::Debug for StorageManager { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StorageManagerUnlockedInner") + f.debug_struct("StorageManager") + .field("registry", &self.registry) + .field("inner", &self.inner) // .field("flush_record_stores_task", &self.flush_record_stores_task) // .field( // 
"offline_subkey_writes_task", @@ -74,17 +116,16 @@ impl fmt::Debug for StorageManagerUnlockedInner { } } -#[derive(Clone, Debug)] -pub(crate) struct StorageManager { - registry: VeilidComponentRegistry, - unlocked_inner: Arc, - inner: Arc>, -} - impl_veilid_component!(StorageManager); impl StorageManager { - fn new_unlocked_inner(crypto: Crypto) -> StorageManagerUnlockedInner { + fn new_inner() -> StorageManagerInner { + StorageManagerInner::default() + } + + pub fn new(registry: VeilidComponentRegistry) -> StorageManager { + let crypto = registry.lookup::().unwrap(); + // Generate keys to use for anonymous watches let mut anonymous_watch_keys = TypedKeyPairGroup::new(); for ck in VALID_CRYPTO_KINDS { @@ -93,7 +134,11 @@ impl StorageManager { anonymous_watch_keys.add(TypedKeyPair::new(ck, kp)); } - StorageManagerUnlockedInner { + let inner = Self::new_inner(); + let this = StorageManager { + registry, + inner: Arc::new(AsyncMutex::new(inner)), + flush_record_stores_task: TickTask::new( "flush_record_stores_task", FLUSH_RECORD_STORES_INTERVAL_SECS, @@ -116,19 +161,6 @@ impl StorageManager { ), anonymous_watch_keys, - } - } - fn new_inner(unlocked_inner: Arc) -> StorageManagerInner { - StorageManagerInner::new(unlocked_inner) - } - - pub fn new(registry: VeilidComponentRegistry) -> StorageManager { - let crypto = registry.lookup::().unwrap(); - let unlocked_inner = Arc::new(Self::new_unlocked_inner(crypto)); - let this = StorageManager { - registry, - unlocked_inner: unlocked_inner.clone(), - inner: Arc::new(AsyncMutex::new(Self::new_inner(unlocked_inner))), }; this.setup_tasks(); @@ -136,12 +168,83 @@ impl StorageManager { this } + fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { + let c = config.get(); + RecordStoreLimits { + subkey_cache_size: c.network.dht.local_subkey_cache_size as usize, + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_total_size: MAX_RECORD_DATA_SIZE, + max_records: None, + max_subkey_cache_memory_mb: Some( + c.network.dht.local_max_subkey_cache_memory_mb as usize, + ), + max_storage_space_mb: None, + public_watch_limit: c.network.dht.public_watch_limit as usize, + member_watch_limit: c.network.dht.member_watch_limit as usize, + max_watch_expiration: TimestampDuration::new(ms_to_us( + c.network.dht.max_watch_expiration_ms, + )), + min_watch_expiration: TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)), + } + } + + fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { + let c = config.get(); + RecordStoreLimits { + subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize, + max_subkey_size: MAX_SUBKEY_SIZE, + max_record_total_size: MAX_RECORD_DATA_SIZE, + max_records: Some(c.network.dht.remote_max_records as usize), + max_subkey_cache_memory_mb: Some( + c.network.dht.remote_max_subkey_cache_memory_mb as usize, + ), + max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize), + public_watch_limit: c.network.dht.public_watch_limit as usize, + member_watch_limit: c.network.dht.member_watch_limit as usize, + max_watch_expiration: TimestampDuration::new(ms_to_us( + c.network.dht.max_watch_expiration_ms, + )), + min_watch_expiration: TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)), + } + } + #[instrument(level = "debug", skip_all, err)] async fn init_async(&self) -> EyreResult<()> { log_stor!(debug "startup storage manager"); + let table_store = self.table_store(); + let config = self.config(); + + let metadata_db = table_store.open(STORAGE_MANAGER_METADATA, 1).await?; + + let 
local_limits = Self::local_limits_from_config(config.clone()); + let remote_limits = Self::remote_limits_from_config(config.clone()); + + let mut local_record_store = RecordStore::new(self.registry(), "local", local_limits); + local_record_store.setup().await?; + + let mut remote_record_store = RecordStore::new(self.registry(), "remote", remote_limits); + remote_record_store.setup().await?; let mut inner = self.inner.lock().await; - inner.init(self.clone()).await?; + inner.metadata_db = Some(metadata_db); + inner.local_record_store = Some(local_record_store); + inner.remote_record_store = Some(remote_record_store); + Self::load_metadata(&mut *inner).await?; + + // Start deferred results processors + inner.deferred_result_processor.init().await; + + // Schedule tick + let tick_future = interval("storage manager tick", 1000, move || { + let registry = self.registry(); + async move { + let this = registry.lookup::().unwrap(); + if let Err(e) = this.tick().await { + log_stor!(warn "storage manager tick failed: {}", e); + } + } + }); + inner.tick_future = Some(tick_future); Ok(()) } @@ -153,7 +256,11 @@ impl StorageManager { // Stop the background ticker process { let mut inner = self.inner.lock().await; - inner.stop_ticker().await; + // Stop ticker + let tick_future = inner.tick_future.take(); + if let Some(f) = tick_future { + f.await; + } } // Cancel all tasks @@ -162,35 +269,83 @@ impl StorageManager { // Terminate and release the storage manager { let mut inner = self.inner.lock().await; - inner.terminate().await; - *inner = Self::new_inner(self.unlocked_inner.clone()); + + // Stop deferred result processor + inner.deferred_result_processor.terminate().await; + + // Final flush on record stores + if let Some(mut local_record_store) = inner.local_record_store.take() { + if let Err(e) = local_record_store.flush().await { + log_stor!(error "termination local record store tick failed: {}", e); + } + } + if let Some(mut remote_record_store) = inner.remote_record_store.take() { + if let Err(e) = remote_record_store.flush().await { + log_stor!(error "termination remote record store tick failed: {}", e); + } + } + + // Save metadata + if let Err(e) = Self::save_metadata(&mut *inner).await { + log_stor!(error "termination metadata save failed: {}", e); + } + + // Reset inner state + *inner = Self::new_inner(); } log_stor!(debug "finished storage manager shutdown"); } - async fn lock(&self) -> VeilidAPIResult> { - let inner = asyncmutex_lock_arc!(&self.inner); - if !inner.initialized { - apibail_not_initialized!(); + async fn save_metadata(inner: &StorageManagerInner) -> EyreResult<()> { + if let Some(metadata_db) = &inner.metadata_db { + let tx = metadata_db.transact(); + tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes)?; + tx.commit().await.wrap_err("failed to commit")? 
} - Ok(inner) + Ok(()) } - fn online_ready_inner(inner: &StorageManagerInner) -> Option { - let routing_table = inner.opt_routing_table.clone()?; + async fn load_metadata(inner: &mut StorageManagerInner) -> EyreResult<()> { + if let Some(metadata_db) = &inner.metadata_db { + inner.offline_subkey_writes = match metadata_db + .load_json(0, OFFLINE_SUBKEY_WRITES) + .await + { + Ok(v) => v.unwrap_or_default(), + Err(_) => { + if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await { + log_stor!(debug "offline_subkey_writes format changed, clearing: {}", e); + } + Default::default() + } + } + } + Ok(()) + } + + fn get_ready_rpc_processor(&self) -> Option> { + let Some(rpc_processor) = self.registry().lookup::() else { + return None; + }; + + // Check if we have published peer info + // Note, this is a best-effort check, subject to race conditions on the network's state + let Some(routing_table) = self.registry().lookup::() else { + return None; + }; routing_table.get_published_peer_info(RoutingDomain::PublicInternet)?; - inner.opt_rpc_processor.clone() + + // Return the RPC processor if we think we're ready to send messages + Some(rpc_processor) } - async fn online_writes_ready(&self) -> EyreResult> { - let inner = self.lock().await?; - Ok(Self::online_ready_inner(&inner)) + async fn has_offline_subkey_writes(&self) -> bool { + !self.inner.lock().await.offline_subkey_writes.is_empty() } - async fn has_offline_subkey_writes(&self) -> EyreResult { - let inner = self.lock().await?; - Ok(!inner.offline_subkey_writes.is_empty()) + fn online_writes_ready(&self) -> bool { + self.get_ready_rpc_processor().is_some() } /// Get the set of nodes in our active watches @@ -213,16 +368,22 @@ impl StorageManager { /// Builds the record key for a given schema and owner #[instrument(level = "trace", target = "stor", skip_all)] - pub async fn get_record_key( + pub fn get_record_key( &self, kind: CryptoKind, schema: DHTSchema, owner_key: &PublicKey, ) -> VeilidAPIResult { - let inner = self.lock().await?; + // Get cryptosystem + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + + // Validate schema schema.validate()?; - inner.get_record_key(kind, owner_key, schema).await + Ok(Self::get_key(&vcrypto, owner_key, schema)) } /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor @@ -233,12 +394,15 @@ impl StorageManager { owner: Option, safety_selection: SafetySelection, ) -> VeilidAPIResult { - let mut inner = self.lock().await?; + // Validate schema schema.validate()?; + // Lock access to the record stores + let mut inner = self.inner.lock().await; + // Create a new owned local record from scratch - let (key, owner) = inner - .create_new_owned_local_record(kind, schema, owner, safety_selection) + let (key, owner) = self + .create_new_owned_local_record_inner(&mut *inner, kind, schema, owner, safety_selection) .await?; // Now that the record is made we should always succeed to open the existing record @@ -270,7 +434,7 @@ impl StorageManager { // No record yet, try to get it from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::online_ready_inner(&inner) else { + let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { apibail_try_again!("offline, try again later"); }; @@ -340,7 +504,10 @@ impl StorageManager { pub async fn close_record(&self, key: TypedKey) -> 
VeilidAPIResult<()> { let (opt_opened_record, opt_rpc_processor) = { let mut inner = self.lock().await?; - (inner.close_record(key)?, Self::online_ready_inner(&inner)) + ( + inner.close_record(key)?, + Self::get_ready_rpc_processor(&inner), + ) }; // Send a one-time cancel request for the watch if we have one and we're online @@ -432,7 +599,7 @@ impl StorageManager { // Refresh if we can // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::online_ready_inner(&inner) else { + let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { // Return the existing value if we have one if we aren't online if let Some(last_get_result_value) = last_get_result.opt_value { return Ok(Some(last_get_result_value.value_data().clone())); @@ -500,7 +667,8 @@ impl StorageManager { let mut inner = self.lock().await?; // Get cryptosystem - let Some(vcrypto) = self.crypto().get(key.kind) else { + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(key.kind) else { apibail_generic!("unsupported cryptosystem"); }; @@ -557,7 +725,7 @@ impl StorageManager { value_data, descriptor.owner(), subkey, - vcrypto, + &vcrypto, writer.secret, )?); @@ -573,7 +741,7 @@ impl StorageManager { .await?; // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::online_ready_inner(&inner) else { + let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); // Add to offline writes to flush inner.add_offline_subkey_write(key, subkey, safety_selection); @@ -685,7 +853,7 @@ impl StorageManager { let subkeys = schema.truncate_subkeys(&subkeys, None); // Get rpc processor and drop mutex so we don't block while requesting the watch from the network - let Some(rpc_processor) = Self::online_ready_inner(&inner) else { + let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { apibail_try_again!("offline, try again later"); }; @@ -880,7 +1048,7 @@ impl StorageManager { } // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::online_ready_inner(&inner) else { + let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { apibail_try_again!("offline, try again later"); }; @@ -950,7 +1118,7 @@ impl StorageManager { async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> { let rpc_processor = { let inner = self.inner.lock().await; - if let Some(rpc_processor) = Self::online_ready_inner(&inner) { + if let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) { rpc_processor.clone() } else { apibail_try_again!("network is not available"); @@ -975,27 +1143,20 @@ impl StorageManager { // Send a value change up through the callback #[instrument(level = "trace", target = "stor", skip(self, value), err)] - async fn update_callback_value_change( + fn update_callback_value_change( &self, key: TypedKey, subkeys: ValueSubkeyRangeSet, count: u32, value: Option, - ) -> Result<(), VeilidAPIError> { - let opt_update_callback = { - let inner = self.lock().await?; - inner.update_callback.clone() - }; - - if let Some(update_callback) = opt_update_callback { - update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { - key, - subkeys, - count, - value, - }))); - } - Ok(()) + ) { + let update_callback = self.update_callback(); + 
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { + key, + subkeys, + count, + value, + }))); } #[instrument(level = "trace", target = "stor", skip_all)] @@ -1044,4 +1205,551 @@ impl StorageManager { FanoutResultKind::Finished => false, } } + + //////////////////////////////////////////////////////////////////////// + #[instrument(level = "trace", target = "stor", skip_all, err)] + async fn create_new_owned_local_record_inner( + &self, + inner: &mut StorageManagerInner, + kind: CryptoKind, + schema: DHTSchema, + owner: Option, + safety_selection: SafetySelection, + ) -> VeilidAPIResult<(TypedKey, KeyPair)> { + // Get cryptosystem + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Verify the dht schema does not contain the node id + { + let config = self.config(); + let cfg = config.get(); + if let Some(node_id) = cfg.network.routing_table.node_id.get(kind) { + if schema.is_member(&node_id.value) { + apibail_invalid_argument!( + "node id can not be schema member", + "schema", + node_id.value + ); + } + } + } + + // Compile the dht schema + let schema_data = schema.compile(); + + // New values require a new owner key if not given + let owner = owner.unwrap_or_else(|| vcrypto.generate_keypair()); + + // Calculate dht key + let dht_key = Self::get_key(&vcrypto, &owner.key, &schema_data); + + // Make a signed value descriptor for this dht value + let signed_value_descriptor = Arc::new(SignedValueDescriptor::make_signature( + owner.key, + schema_data, + vcrypto.clone(), + owner.secret, + )?); + + // Add new local value record + let cur_ts = Timestamp::now(); + let local_record_detail = LocalRecordDetail::new(safety_selection); + let record = + Record::::new(cur_ts, signed_value_descriptor, local_record_detail)?; + + local_record_store.new_record(dht_key, record).await?; + + Ok((dht_key, owner)) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + async fn move_remote_record_to_local_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + safety_selection: SafetySelection, + ) -> VeilidAPIResult> { + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Get remote record store + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + let rcb = |r: &Record| { + // Return record details + r.clone() + }; + let Some(remote_record) = remote_record_store.with_record(key, rcb) else { + // No local or remote record found, return None + return Ok(None); + }; + + // Make local record + let cur_ts = Timestamp::now(); + let local_record = Record::new( + cur_ts, + remote_record.descriptor().clone(), + LocalRecordDetail::new(safety_selection), + )?; + local_record_store.new_record(key, local_record).await?; + + // Move copy subkey data from remote to local store + for subkey in remote_record.stored_subkeys().iter() { + let Some(get_result) = remote_record_store.get_subkey(key, subkey, false).await? 
else { + // Subkey was missing + warn!("Subkey was missing: {} #{}", key, subkey); + continue; + }; + let Some(subkey_data) = get_result.opt_value else { + // Subkey was missing + warn!("Subkey data was missing: {} #{}", key, subkey); + continue; + }; + local_record_store + .set_subkey(key, subkey, subkey_data, WatchUpdateMode::NoUpdate) + .await?; + } + + // Move watches + local_record_store.move_watches(key, remote_record_store.move_watches(key, None)); + + // Delete remote record from store + remote_record_store.delete_record(key).await?; + + // Return record information as transferred to local record + Ok(Some((*remote_record.owner(), remote_record.schema()))) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub async fn open_existing_record_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + writer: Option, + safety_selection: SafetySelection, + ) -> VeilidAPIResult> { + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // See if we have a local record already or not + let cb = |r: &mut Record| { + // Process local record + + // Keep the safety selection we opened the record with + r.detail_mut().safety_selection = safety_selection; + + // Return record details + (*r.owner(), r.schema()) + }; + let (owner, schema) = match local_record_store.with_record_mut(key, cb) { + Some(v) => v, + None => { + // If we don't have a local record yet, check to see if we have a remote record + // if so, migrate it to a local record + let Some(v) = self + .move_remote_record_to_local(key, safety_selection) + .await? + else { + // No remote record either + return Ok(None); + }; + v + } + }; + // Had local record + + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) + } else { + None + } + } else { + None + }; + + // Write open record + inner + .opened_records + .entry(key) + .and_modify(|e| { + e.set_writer(writer); + e.set_safety_selection(safety_selection); + }) + .or_insert_with(|| OpenedRecord::new(writer, safety_selection)); + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); + Ok(Some(descriptor)) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub async fn open_new_record_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + writer: Option, + subkey: ValueSubkey, + get_result: GetResult, + safety_selection: SafetySelection, + ) -> VeilidAPIResult { + // Ensure the record is closed + if inner.opened_records.contains_key(&key) { + panic!("new record should never be opened at this point"); + } + + // Must have descriptor + let Some(signed_value_descriptor) = get_result.opt_descriptor else { + // No descriptor for new record, can't store this + apibail_generic!("no descriptor"); + }; + // Get owner + let owner = *signed_value_descriptor.owner(); + + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) + } else { + None + } + } else { + None + }; + let schema = signed_value_descriptor.schema()?; + + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Make and 
store a new record for this descriptor + let record = Record::::new( + Timestamp::now(), + signed_value_descriptor, + LocalRecordDetail::new(safety_selection), + )?; + local_record_store.new_record(key, record).await?; + + // If we got a subkey with the getvalue, it has already been validated against the schema, so store it + if let Some(signed_value_data) = get_result.opt_value { + // Write subkey to local store + local_record_store + .set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate) + .await?; + } + + // Write open record + inner + .opened_records + .insert(key, OpenedRecord::new(writer, safety_selection)); + + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); + Ok(descriptor) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub fn get_value_nodes_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + ) -> VeilidAPIResult>> { + // Get local record store + let Some(local_record_store) = inner.local_record_store.as_ref() else { + apibail_not_initialized!(); + }; + + // Get routing table to see if we still know about these nodes + let Some(routing_table) = self.opt_rpc_processor.as_ref().map(|r| r.routing_table()) else { + apibail_try_again!("offline, try again later"); + }; + + let opt_value_nodes = local_record_store.peek_record(key, |r| { + let d = r.detail(); + d.nodes + .keys() + .copied() + .filter_map(|x| { + routing_table + .lookup_node_ref(TypedKey::new(key.kind, x)) + .ok() + .flatten() + }) + .collect() + }); + + Ok(opt_value_nodes) + } + + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn process_fanout_results_inner< + 'a, + I: IntoIterator, + >( + inner: &mut StorageManagerInner, + key: TypedKey, + subkey_results_iter: I, + is_set: bool, + ) { + // Get local record store + let local_record_store = inner.local_record_store.as_mut().unwrap(); + + let cur_ts = Timestamp::now(); + local_record_store.with_record_mut(key, |r| { + let d = r.detail_mut(); + + for (subkey, fanout_result) in subkey_results_iter { + for node_id in fanout_result + .value_nodes + .iter() + .filter_map(|x| x.node_ids().get(key.kind).map(|k| k.value)) + { + let pnd = d.nodes.entry(node_id).or_default(); + if is_set || pnd.last_set == Timestamp::default() { + pnd.last_set = cur_ts; + } + pnd.last_seen = cur_ts; + pnd.subkeys.insert(subkey); + } + } + + // Purge nodes down to the N most recently seen, where N is the consensus count for a set operation + let mut nodes_ts = d + .nodes + .iter() + .map(|kv| (*kv.0, kv.1.last_seen)) + .collect::>(); + nodes_ts.sort_by(|a, b| b.1.cmp(&a.1)); + + for dead_node_key in nodes_ts.iter().skip(self.set_consensus_count) { + d.nodes.remove(&dead_node_key.0); + } + }); + } + + pub fn close_record_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + ) -> VeilidAPIResult> { + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if local_record_store.peek_record(key, |_| {}).is_none() { + return Err(VeilidAPIError::key_not_found(key)); + } + + Ok(inner.opened_records.remove(&key)) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_get_local_value_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; 
+ if let Some(get_result) = local_record_store + .get_subkey(key, subkey, want_descriptor) + .await? + { + return Ok(get_result); + } + + Ok(GetResult { + opt_value: None, + opt_descriptor: None, + }) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_set_local_value_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkey: ValueSubkey, + signed_value_data: Arc, + watch_update_mode: WatchUpdateMode, + ) -> VeilidAPIResult<()> { + // See if it's in the local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Write subkey to local store + local_record_store + .set_subkey(key, subkey, signed_value_data, watch_update_mode) + .await?; + + Ok(()) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_inspect_local_value_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the local record store + let Some(local_record_store) = inner.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(inspect_result) = local_record_store + .inspect_record(key, subkeys, want_descriptor) + .await? + { + return Ok(inspect_result); + } + + Ok(InspectResult { + subkeys: ValueSubkeyRangeSet::new(), + seqs: vec![], + opt_descriptor: None, + }) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_get_remote_value_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the remote record store + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(get_result) = remote_record_store + .get_subkey(key, subkey, want_descriptor) + .await? + { + return Ok(get_result); + } + + Ok(GetResult { + opt_value: None, + opt_descriptor: None, + }) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_set_remote_value_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkey: ValueSubkey, + signed_value_data: Arc, + signed_value_descriptor: Arc, + watch_update_mode: WatchUpdateMode, + ) -> VeilidAPIResult<()> { + // See if it's in the remote record store + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // See if we have a remote record already or not + if remote_record_store.with_record(key, |_| {}).is_none() { + // record didn't exist, make it + let cur_ts = Timestamp::now(); + let remote_record_detail = RemoteRecordDetail {}; + let record = Record::::new( + cur_ts, + signed_value_descriptor, + remote_record_detail, + )?; + remote_record_store.new_record(key, record).await? 
+ }; + + // Write subkey to remote store + remote_record_store + .set_subkey(key, subkey, signed_value_data, watch_update_mode) + .await?; + + Ok(()) + } + + #[instrument(level = "trace", target = "stor", skip_all, err)] + pub(super) async fn handle_inspect_remote_value_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkeys: ValueSubkeyRangeSet, + want_descriptor: bool, + ) -> VeilidAPIResult { + // See if it's in the local record store + let Some(remote_record_store) = inner.remote_record_store.as_mut() else { + apibail_not_initialized!(); + }; + if let Some(inspect_result) = remote_record_store + .inspect_record(key, subkeys, want_descriptor) + .await? + { + return Ok(inspect_result); + } + + Ok(InspectResult { + subkeys: ValueSubkeyRangeSet::new(), + seqs: vec![], + opt_descriptor: None, + }) + } + + fn get_key( + vcrypto: &CryptoSystemGuard<'_>, + owner_key: &PublicKey, + schema_data: &[u8], + ) -> TypedKey { + let mut hash_data = Vec::::with_capacity(PUBLIC_KEY_LENGTH + 4 + schema_data.len()); + hash_data.extend_from_slice(&vcrypto.kind().0); + hash_data.extend_from_slice(&owner_key.bytes); + hash_data.extend_from_slice(schema_data); + let hash = vcrypto.generate_hash(&hash_data); + TypedKey::new(vcrypto.kind(), hash) + } + + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn add_offline_subkey_write_inner( + inner: &mut StorageManagerInner, + key: TypedKey, + subkey: ValueSubkey, + safety_selection: SafetySelection, + ) { + inner + .offline_subkey_writes + .entry(key) + .and_modify(|x| { + x.subkeys.insert(subkey); + }) + .or_insert(tasks::offline_subkey_writes::OfflineSubkeyWrite { + safety_selection, + subkeys: ValueSubkeyRangeSet::single(subkey), + subkeys_in_flight: ValueSubkeyRangeSet::new(), + }); + } + + #[instrument(level = "trace", target = "stor", skip_all)] + pub(super) fn process_deferred_results_inner( + inner: &mut StorageManagerInner, + receiver: flume::Receiver, + handler: impl FnMut(T) -> SendPinBoxFuture + Send + 'static, + ) -> bool { + inner + .deferred_result_processor + .add(receiver.into_stream(), handler) + } } diff --git a/veilid-core/src/storage_manager/record_store/mod.rs b/veilid-core/src/storage_manager/record_store/mod.rs index 50a30e43..e05892f8 100644 --- a/veilid-core/src/storage_manager/record_store/mod.rs +++ b/veilid-core/src/storage_manager/record_store/mod.rs @@ -50,7 +50,7 @@ pub(super) struct RecordStore where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, { - table_store: TableStore, + registry: VeilidComponentRegistry, name: String, limits: RecordStoreLimits, @@ -129,7 +129,7 @@ impl RecordStore where D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, { - pub fn new(table_store: TableStore, name: &str, limits: RecordStoreLimits) -> Self { + pub fn new(registry: VeilidComponentRegistry, name: &str, limits: RecordStoreLimits) -> Self { let subkey_cache_size = limits.subkey_cache_size; let limit_subkey_cache_total_size = limits .max_subkey_cache_memory_mb @@ -139,7 +139,7 @@ where .map(|mb| mb as u64 * 1_048_576u64); Self { - table_store, + registry, name: name.to_owned(), limits, record_table: None, @@ -165,7 +165,7 @@ where } } - pub async fn init(&mut self) -> EyreResult<()> { + pub async fn setup(&mut self) -> EyreResult<()> { let record_table = self .table_store .open(&format!("{}_records", self.name), 1) diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs deleted file mode 100644 index 014d521b..00000000 
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ /dev/null @@ -1,776 +0,0 @@ -use super::*; - -const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata"; -const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes"; - -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub(super) struct OfflineSubkeyWrite { - pub safety_selection: SafetySelection, - pub subkeys: ValueSubkeyRangeSet, - #[serde(default)] - pub subkeys_in_flight: ValueSubkeyRangeSet, -} - -/// Locked structure for storage manager -pub(super) struct StorageManagerInner { - unlocked_inner: Arc, - /// If we are started up - pub initialized: bool, - /// Records that have been 'opened' and are not yet closed - pub opened_records: HashMap, - /// Records that have ever been 'created' or 'opened' by this node, things we care about that we must republish to keep alive - pub local_record_store: Option>, - /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish - pub remote_record_store: Option>, - /// Record subkeys that have not been pushed to the network because they were written to offline - pub offline_subkey_writes: HashMap, - /// Storage manager metadata that is persistent, including copy of offline subkey writes - pub metadata_db: Option, - /// Background processing task (not part of attachment manager tick tree so it happens when detached too) - pub tick_future: Option>, - /// Update callback to send ValueChanged notification to - pub update_callback: Option, - /// Deferred result processor - pub deferred_result_processor: DeferredStreamProcessor, - - /// The maximum consensus count - set_consensus_count: usize, -} - -impl fmt::Debug for StorageManagerInner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("StorageManagerInner") - // .field("unlocked_inner", &self.unlocked_inner) - .field("initialized", &self.initialized) - .field("opened_records", &self.opened_records) - .field("local_record_store", &self.local_record_store) - .field("remote_record_store", &self.remote_record_store) - .field("offline_subkey_writes", &self.offline_subkey_writes) - //.field("metadata_db", &self.metadata_db) - //.field("tick_future", &self.tick_future) - //.field("update_callback", &self.update_callback) - .field("deferred_result_processor", &self.deferred_result_processor) - .field("set_consensus_count", &self.set_consensus_count) - .finish() - } -} - -fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { - let c = config.get(); - RecordStoreLimits { - subkey_cache_size: c.network.dht.local_subkey_cache_size as usize, - max_subkey_size: MAX_SUBKEY_SIZE, - max_record_total_size: MAX_RECORD_DATA_SIZE, - max_records: None, - max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize), - max_storage_space_mb: None, - public_watch_limit: c.network.dht.public_watch_limit as usize, - member_watch_limit: c.network.dht.member_watch_limit as usize, - max_watch_expiration: TimestampDuration::new(ms_to_us( - c.network.dht.max_watch_expiration_ms, - )), - min_watch_expiration: TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)), - } -} - -fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { - let c = config.get(); - RecordStoreLimits { - subkey_cache_size: c.network.dht.remote_subkey_cache_size as usize, - max_subkey_size: MAX_SUBKEY_SIZE, - max_record_total_size: MAX_RECORD_DATA_SIZE, - max_records: Some(c.network.dht.remote_max_records as usize), - 
max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize), - max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize), - public_watch_limit: c.network.dht.public_watch_limit as usize, - member_watch_limit: c.network.dht.member_watch_limit as usize, - max_watch_expiration: TimestampDuration::new(ms_to_us( - c.network.dht.max_watch_expiration_ms, - )), - min_watch_expiration: TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms)), - } -} - -impl StorageManagerInner { - pub fn new(unlocked_inner: Arc) -> Self { - let set_consensus_count = unlocked_inner.config.get().network.dht.set_value_count as usize; - Self { - unlocked_inner, - initialized: false, - opened_records: Default::default(), - local_record_store: Default::default(), - remote_record_store: Default::default(), - offline_subkey_writes: Default::default(), - metadata_db: Default::default(), - opt_rpc_processor: Default::default(), - //opt_routing_table: Default::default(), - tick_future: Default::default(), - update_callback: None, - deferred_result_processor: DeferredStreamProcessor::default(), - set_consensus_count, - } - } - - pub async fn init( - &mut self, - outer_self: StorageManager, - update_callback: UpdateCallback, - ) -> EyreResult<()> { - let metadata_db = self - .unlocked_inner - .table_store - .open(STORAGE_MANAGER_METADATA, 1) - .await?; - - let local_limits = local_limits_from_config(self.unlocked_inner.config.clone()); - let remote_limits = remote_limits_from_config(self.unlocked_inner.config.clone()); - - let mut local_record_store = RecordStore::new( - self.unlocked_inner.table_store.clone(), - "local", - local_limits, - ); - local_record_store.init().await?; - - let mut remote_record_store = RecordStore::new( - self.unlocked_inner.table_store.clone(), - "remote", - remote_limits, - ); - remote_record_store.init().await?; - - self.metadata_db = Some(metadata_db); - self.local_record_store = Some(local_record_store); - self.remote_record_store = Some(remote_record_store); - - self.load_metadata().await?; - - // Start deferred results processors - self.deferred_result_processor.init().await; - - // Schedule tick - let tick_future = interval("storage manager tick", 1000, move || { - let this = outer_self.clone(); - async move { - if let Err(e) = this.tick().await { - log_stor!(warn "storage manager tick failed: {}", e); - } - } - }); - self.tick_future = Some(tick_future); - self.update_callback = Some(update_callback); - self.initialized = true; - - Ok(()) - } - - pub async fn stop_ticker(&mut self) { - // Stop ticker - let tick_future = self.tick_future.take(); - if let Some(f) = tick_future { - f.await; - } - } - - pub async fn terminate(&mut self) { - self.update_callback = None; - - // Stop deferred result processor - self.deferred_result_processor.terminate().await; - - // Final flush on record stores - if let Some(mut local_record_store) = self.local_record_store.take() { - if let Err(e) = local_record_store.flush().await { - log_stor!(error "termination local record store tick failed: {}", e); - } - } - if let Some(mut remote_record_store) = self.remote_record_store.take() { - if let Err(e) = remote_record_store.flush().await { - log_stor!(error "termination remote record store tick failed: {}", e); - } - } - - // Save metadata - if self.metadata_db.is_some() { - if let Err(e) = self.save_metadata().await { - log_stor!(error "termination metadata save failed: {}", e); - } - self.metadata_db = None; - } - self.offline_subkey_writes.clear(); - - // Mark 
not initialized - self.initialized = false; - } - - async fn save_metadata(&mut self) -> EyreResult<()> { - if let Some(metadata_db) = &self.metadata_db { - let tx = metadata_db.transact(); - tx.store_json(0, OFFLINE_SUBKEY_WRITES, &self.offline_subkey_writes)?; - tx.commit().await.wrap_err("failed to commit")? - } - Ok(()) - } - - async fn load_metadata(&mut self) -> EyreResult<()> { - if let Some(metadata_db) = &self.metadata_db { - self.offline_subkey_writes = match metadata_db.load_json(0, OFFLINE_SUBKEY_WRITES).await - { - Ok(v) => v.unwrap_or_default(), - Err(_) => { - if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await { - log_stor!(debug "offline_subkey_writes format changed, clearing: {}", e); - } - Default::default() - } - } - } - Ok(()) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn create_new_owned_local_record( - &mut self, - kind: CryptoKind, - schema: DHTSchema, - owner: Option, - safety_selection: SafetySelection, - ) -> VeilidAPIResult<(TypedKey, KeyPair)> { - // Get cryptosystem - let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else { - apibail_generic!("unsupported cryptosystem"); - }; - - // Get local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Verify the dht schema does not contain the node id - { - let cfg = self.unlocked_inner.config.get(); - if let Some(node_id) = cfg.network.routing_table.node_id.get(kind) { - if schema.is_member(&node_id.value) { - apibail_invalid_argument!( - "node id can not be schema member", - "schema", - node_id.value - ); - } - } - } - - // Compile the dht schema - let schema_data = schema.compile(); - - // New values require a new owner key if not given - let owner = owner.unwrap_or_else(|| vcrypto.generate_keypair()); - - // Calculate dht key - let dht_key = Self::get_key(vcrypto.clone(), &owner.key, &schema_data); - - // Make a signed value descriptor for this dht value - let signed_value_descriptor = Arc::new(SignedValueDescriptor::make_signature( - owner.key, - schema_data, - vcrypto.clone(), - owner.secret, - )?); - // Add new local value record - let cur_ts = Timestamp::now(); - let local_record_detail = LocalRecordDetail::new(safety_selection); - let record = - Record::::new(cur_ts, signed_value_descriptor, local_record_detail)?; - - local_record_store.new_record(dht_key, record).await?; - - Ok((dht_key, owner)) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - async fn move_remote_record_to_local( - &mut self, - key: TypedKey, - safety_selection: SafetySelection, - ) -> VeilidAPIResult> { - // Get local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Get remote record store - let Some(remote_record_store) = self.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - let rcb = |r: &Record| { - // Return record details - r.clone() - }; - let Some(remote_record) = remote_record_store.with_record(key, rcb) else { - // No local or remote record found, return None - return Ok(None); - }; - - // Make local record - let cur_ts = Timestamp::now(); - let local_record = Record::new( - cur_ts, - remote_record.descriptor().clone(), - LocalRecordDetail::new(safety_selection), - )?; - local_record_store.new_record(key, local_record).await?; - - // Move copy subkey data from remote to local store - for subkey in remote_record.stored_subkeys().iter() { - let Some(get_result) = 
remote_record_store.get_subkey(key, subkey, false).await? else { - // Subkey was missing - warn!("Subkey was missing: {} #{}", key, subkey); - continue; - }; - let Some(subkey_data) = get_result.opt_value else { - // Subkey was missing - warn!("Subkey data was missing: {} #{}", key, subkey); - continue; - }; - local_record_store - .set_subkey(key, subkey, subkey_data, WatchUpdateMode::NoUpdate) - .await?; - } - - // Move watches - local_record_store.move_watches(key, remote_record_store.move_watches(key, None)); - - // Delete remote record from store - remote_record_store.delete_record(key).await?; - - // Return record information as transferred to local record - Ok(Some((*remote_record.owner(), remote_record.schema()))) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn open_existing_record( - &mut self, - key: TypedKey, - writer: Option, - safety_selection: SafetySelection, - ) -> VeilidAPIResult> { - // Get local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // See if we have a local record already or not - let cb = |r: &mut Record| { - // Process local record - - // Keep the safety selection we opened the record with - r.detail_mut().safety_selection = safety_selection; - - // Return record details - (*r.owner(), r.schema()) - }; - let (owner, schema) = match local_record_store.with_record_mut(key, cb) { - Some(v) => v, - None => { - // If we don't have a local record yet, check to see if we have a remote record - // if so, migrate it to a local record - let Some(v) = self - .move_remote_record_to_local(key, safety_selection) - .await? - else { - // No remote record either - return Ok(None); - }; - v - } - }; - // Had local record - - // If the writer we chose is also the owner, we have the owner secret - // Otherwise this is just another subkey writer - let owner_secret = if let Some(writer) = writer { - if writer.key == owner { - Some(writer.secret) - } else { - None - } - } else { - None - }; - - // Write open record - self.opened_records - .entry(key) - .and_modify(|e| { - e.set_writer(writer); - e.set_safety_selection(safety_selection); - }) - .or_insert_with(|| OpenedRecord::new(writer, safety_selection)); - - // Make DHT Record Descriptor to return - let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); - Ok(Some(descriptor)) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub async fn open_new_record( - &mut self, - key: TypedKey, - writer: Option, - subkey: ValueSubkey, - get_result: GetResult, - safety_selection: SafetySelection, - ) -> VeilidAPIResult { - // Ensure the record is closed - if self.opened_records.contains_key(&key) { - panic!("new record should never be opened at this point"); - } - - // Must have descriptor - let Some(signed_value_descriptor) = get_result.opt_descriptor else { - // No descriptor for new record, can't store this - apibail_generic!("no descriptor"); - }; - // Get owner - let owner = *signed_value_descriptor.owner(); - - // If the writer we chose is also the owner, we have the owner secret - // Otherwise this is just another subkey writer - let owner_secret = if let Some(writer) = writer { - if writer.key == owner { - Some(writer.secret) - } else { - None - } - } else { - None - }; - let schema = signed_value_descriptor.schema()?; - - // Get local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Make and store 
a new record for this descriptor - let record = Record::::new( - Timestamp::now(), - signed_value_descriptor, - LocalRecordDetail::new(safety_selection), - )?; - local_record_store.new_record(key, record).await?; - - // If we got a subkey with the getvalue, it has already been validated against the schema, so store it - if let Some(signed_value_data) = get_result.opt_value { - // Write subkey to local store - local_record_store - .set_subkey(key, subkey, signed_value_data, WatchUpdateMode::NoUpdate) - .await?; - } - - // Write open record - self.opened_records - .insert(key, OpenedRecord::new(writer, safety_selection)); - - // Make DHT Record Descriptor to return - let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); - Ok(descriptor) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub fn get_value_nodes(&self, key: TypedKey) -> VeilidAPIResult>> { - // Get local record store - let Some(local_record_store) = self.local_record_store.as_ref() else { - apibail_not_initialized!(); - }; - - // Get routing table to see if we still know about these nodes - let Some(routing_table) = self.opt_rpc_processor.as_ref().map(|r| r.routing_table()) else { - apibail_try_again!("offline, try again later"); - }; - - let opt_value_nodes = local_record_store.peek_record(key, |r| { - let d = r.detail(); - d.nodes - .keys() - .copied() - .filter_map(|x| { - routing_table - .lookup_node_ref(TypedKey::new(key.kind, x)) - .ok() - .flatten() - }) - .collect() - }); - - Ok(opt_value_nodes) - } - - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn process_fanout_results< - 'a, - I: IntoIterator, - >( - &mut self, - key: TypedKey, - subkey_results_iter: I, - is_set: bool, - ) { - // Get local record store - let local_record_store = self.local_record_store.as_mut().unwrap(); - - let cur_ts = Timestamp::now(); - local_record_store.with_record_mut(key, |r| { - let d = r.detail_mut(); - - for (subkey, fanout_result) in subkey_results_iter { - for node_id in fanout_result - .value_nodes - .iter() - .filter_map(|x| x.node_ids().get(key.kind).map(|k| k.value)) - { - let pnd = d.nodes.entry(node_id).or_default(); - if is_set || pnd.last_set == Timestamp::default() { - pnd.last_set = cur_ts; - } - pnd.last_seen = cur_ts; - pnd.subkeys.insert(subkey); - } - } - - // Purge nodes down to the N most recently seen, where N is the consensus count for a set operation - let mut nodes_ts = d - .nodes - .iter() - .map(|kv| (*kv.0, kv.1.last_seen)) - .collect::>(); - nodes_ts.sort_by(|a, b| b.1.cmp(&a.1)); - - for dead_node_key in nodes_ts.iter().skip(self.set_consensus_count) { - d.nodes.remove(&dead_node_key.0); - } - }); - } - - pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult> { - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if local_record_store.peek_record(key, |_| {}).is_none() { - return Err(VeilidAPIError::key_not_found(key)); - } - - Ok(self.opened_records.remove(&key)) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_get_local_value( - &mut self, - key: TypedKey, - subkey: ValueSubkey, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(get_result) = local_record_store - .get_subkey(key, subkey, want_descriptor) - .await? 
- { - return Ok(get_result); - } - - Ok(GetResult { - opt_value: None, - opt_descriptor: None, - }) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_local_value( - &mut self, - key: TypedKey, - subkey: ValueSubkey, - signed_value_data: Arc, - watch_update_mode: WatchUpdateMode, - ) -> VeilidAPIResult<()> { - // See if it's in the local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Write subkey to local store - local_record_store - .set_subkey(key, subkey, signed_value_data, watch_update_mode) - .await?; - - Ok(()) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_inspect_local_value( - &mut self, - key: TypedKey, - subkeys: ValueSubkeyRangeSet, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(local_record_store) = self.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(inspect_result) = local_record_store - .inspect_record(key, subkeys, want_descriptor) - .await? - { - return Ok(inspect_result); - } - - Ok(InspectResult { - subkeys: ValueSubkeyRangeSet::new(), - seqs: vec![], - opt_descriptor: None, - }) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_get_remote_value( - &mut self, - key: TypedKey, - subkey: ValueSubkey, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the remote record store - let Some(remote_record_store) = self.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(get_result) = remote_record_store - .get_subkey(key, subkey, want_descriptor) - .await? - { - return Ok(get_result); - } - - Ok(GetResult { - opt_value: None, - opt_descriptor: None, - }) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_set_remote_value( - &mut self, - key: TypedKey, - subkey: ValueSubkey, - signed_value_data: Arc, - signed_value_descriptor: Arc, - watch_update_mode: WatchUpdateMode, - ) -> VeilidAPIResult<()> { - // See if it's in the remote record store - let Some(remote_record_store) = self.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // See if we have a remote record already or not - if remote_record_store.with_record(key, |_| {}).is_none() { - // record didn't exist, make it - let cur_ts = Timestamp::now(); - let remote_record_detail = RemoteRecordDetail {}; - let record = Record::::new( - cur_ts, - signed_value_descriptor, - remote_record_detail, - )?; - remote_record_store.new_record(key, record).await? - }; - - // Write subkey to remote store - remote_record_store - .set_subkey(key, subkey, signed_value_data, watch_update_mode) - .await?; - - Ok(()) - } - - #[instrument(level = "trace", target = "stor", skip_all, err)] - pub(super) async fn handle_inspect_remote_value( - &mut self, - key: TypedKey, - subkeys: ValueSubkeyRangeSet, - want_descriptor: bool, - ) -> VeilidAPIResult { - // See if it's in the local record store - let Some(remote_record_store) = self.remote_record_store.as_mut() else { - apibail_not_initialized!(); - }; - if let Some(inspect_result) = remote_record_store - .inspect_record(key, subkeys, want_descriptor) - .await? 
- { - return Ok(inspect_result); - } - - Ok(InspectResult { - subkeys: ValueSubkeyRangeSet::new(), - seqs: vec![], - opt_descriptor: None, - }) - } - - pub async fn get_record_key( - &self, - kind: CryptoKind, - owner_key: &PublicKey, - schema: DHTSchema, - ) -> VeilidAPIResult { - // Get cryptosystem - let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else { - apibail_generic!("unsupported cryptosystem"); - }; - - Ok(Self::get_key(vcrypto, owner_key, &schema.compile())) - } - - fn get_key( - vcrypto: CryptoSystemVersion, - owner_key: &PublicKey, - schema_data: &[u8], - ) -> TypedKey { - let mut hash_data = Vec::::with_capacity(PUBLIC_KEY_LENGTH + 4 + schema_data.len()); - hash_data.extend_from_slice(&vcrypto.kind().0); - hash_data.extend_from_slice(&owner_key.bytes); - hash_data.extend_from_slice(schema_data); - let hash = vcrypto.generate_hash(&hash_data); - TypedKey::new(vcrypto.kind(), hash) - } - - #[instrument(level = "trace", target = "stor", skip_all)] - pub(super) fn add_offline_subkey_write( - &mut self, - key: TypedKey, - subkey: ValueSubkey, - safety_selection: SafetySelection, - ) { - self.offline_subkey_writes - .entry(key) - .and_modify(|x| { - x.subkeys.insert(subkey); - }) - .or_insert(OfflineSubkeyWrite { - safety_selection, - subkeys: ValueSubkeyRangeSet::single(subkey), - subkeys_in_flight: ValueSubkeyRangeSet::new(), - }); - } - - #[instrument(level = "trace", target = "stor", skip_all)] - pub fn process_deferred_results( - &mut self, - receiver: flume::Receiver, - handler: impl FnMut(T) -> SendPinBoxFuture + Send + 'static, - ) -> bool { - self.deferred_result_processor - .add(receiver.into_stream(), handler) - } -} diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index d946ff16..a0d088eb 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -12,100 +12,84 @@ impl StorageManager { log_stor!(debug "starting flush record stores task"); { let this = self.clone(); - self.unlocked_inner - .flush_record_stores_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().flush_record_stores_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); + self.flush_record_stores_task.set_routine(move |s, l, t| { + Box::pin(this.clone().flush_record_stores_task_routine( + s, + Timestamp::new(l), + Timestamp::new(t), + )) + }); } // Set offline subkey writes tick task log_stor!(debug "starting offline subkey writes task"); { let this = self.clone(); - self.unlocked_inner - .offline_subkey_writes_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().offline_subkey_writes_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); + self.offline_subkey_writes_task.set_routine(move |s, l, t| { + Box::pin(this.clone().offline_subkey_writes_task_routine( + s, + Timestamp::new(l), + Timestamp::new(t), + )) + }); } // Set send value changes tick task log_stor!(debug "starting send value changes task"); { let this = self.clone(); - self.unlocked_inner - .send_value_changes_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().send_value_changes_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); + self.send_value_changes_task.set_routine(move |s, l, t| { + Box::pin(this.clone().send_value_changes_task_routine( + s, + Timestamp::new(l), + Timestamp::new(t), + )) + }); } // Set check active watches tick task log_stor!(debug "starting check active watches task"); { let this = self.clone(); - self.unlocked_inner - 
-                .check_active_watches_task
-                .set_routine(move |s, l, t| {
-                    Box::pin(this.clone().check_active_watches_task_routine(
-                        s,
-                        Timestamp::new(l),
-                        Timestamp::new(t),
-                    ))
-                });
+            self.check_active_watches_task.set_routine(move |s, l, t| {
+                Box::pin(this.clone().check_active_watches_task_routine(
+                    s,
+                    Timestamp::new(l),
+                    Timestamp::new(t),
+                ))
+            });
         }
 
         // Set check watched records tick task
         log_stor!(debug "starting checked watched records task");
         {
             let this = self.clone();
-            self.unlocked_inner
-                .check_watched_records_task
-                .set_routine(move |s, l, t| {
-                    Box::pin(this.clone().check_watched_records_task_routine(
-                        s,
-                        Timestamp::new(l),
-                        Timestamp::new(t),
-                    ))
-                });
+            self.check_watched_records_task.set_routine(move |s, l, t| {
+                Box::pin(this.clone().check_watched_records_task_routine(
+                    s,
+                    Timestamp::new(l),
+                    Timestamp::new(t),
+                ))
+            });
         }
     }
 
     #[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]
     pub async fn tick(&self) -> EyreResult<()> {
         // Run the flush stores task
-        self.unlocked_inner.flush_record_stores_task.tick().await?;
+        self.flush_record_stores_task.tick().await?;
 
         // Check active watches
-        self.unlocked_inner.check_active_watches_task.tick().await?;
+        self.check_active_watches_task.tick().await?;
 
         // Check watched records
-        self.unlocked_inner
-            .check_watched_records_task
-            .tick()
-            .await?;
+        self.check_watched_records_task.tick().await?;
 
         // Run online-only tasks
-        if self.online_writes_ready().await?.is_some() {
+        if self.online_writes_ready() {
             // Run offline subkey writes task if there's work to be done
-            if self.has_offline_subkey_writes().await? {
-                self.unlocked_inner
-                    .offline_subkey_writes_task
-                    .tick()
-                    .await?;
+            if self.has_offline_subkey_writes().await {
+                self.offline_subkey_writes_task.tick().await?;
             }
             // Send value changed notifications
-            self.unlocked_inner.send_value_changes_task.tick().await?;
+            self.send_value_changes_task.tick().await?;
         }
         Ok(())
     }
@@ -113,23 +97,23 @@ impl StorageManager {
     #[instrument(level = "trace", target = "stor", skip_all)]
     pub(super) async fn cancel_tasks(&self) {
         log_stor!(debug "stopping check watched records task");
-        if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await {
+        if let Err(e) = self.check_watched_records_task.stop().await {
             warn!("check_watched_records_task not stopped: {}", e);
         }
         log_stor!(debug "stopping check active watches task");
-        if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await {
+        if let Err(e) = self.check_active_watches_task.stop().await {
             warn!("check_active_watches_task not stopped: {}", e);
         }
         log_stor!(debug "stopping send value changes task");
-        if let Err(e) = self.unlocked_inner.send_value_changes_task.stop().await {
+        if let Err(e) = self.send_value_changes_task.stop().await {
             warn!("send_value_changes_task not stopped: {}", e);
         }
         log_stor!(debug "stopping flush record stores task");
-        if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
+        if let Err(e) = self.flush_record_stores_task.stop().await {
             warn!("flush_record_stores_task not stopped: {}", e);
         }
         log_stor!(debug "stopping offline subkey writes task");
-        if let Err(e) = self.unlocked_inner.offline_subkey_writes_task.stop().await {
+        if let Err(e) = self.offline_subkey_writes_task.stop().await {
             warn!("offline_subkey_writes_task not stopped: {}", e);
         }
     }
diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
index 680bce8e..a7c66d09 100644
--- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
+++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
@@ -2,6 +2,14 @@ use super::*;
 use futures_util::*;
 use stop_token::future::FutureExt as _;
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct OfflineSubkeyWrite {
+    pub safety_selection: SafetySelection,
+    pub subkeys: ValueSubkeyRangeSet,
+    #[serde(default)]
+    pub subkeys_in_flight: ValueSubkeyRangeSet,
+}
+
 #[derive(Debug)]
 enum OfflineSubkeyWriteResult {
     Finished(set_value::OutboundSetValueResult),
diff --git a/veilid-core/src/storage_manager/types/signed_value_data.rs b/veilid-core/src/storage_manager/types/signed_value_data.rs
index e89354f0..3cfcc989 100644
--- a/veilid-core/src/storage_manager/types/signed_value_data.rs
+++ b/veilid-core/src/storage_manager/types/signed_value_data.rs
@@ -19,7 +19,7 @@ impl SignedValueData {
         &self,
         owner: &PublicKey,
         subkey: ValueSubkey,
-        vcrypto: CryptoSystemVersion,
+        vcrypto: &CryptoSystemGuard<'_>,
     ) -> VeilidAPIResult<bool> {
         let node_info_bytes = Self::make_signature_bytes(&self.value_data, owner, subkey)?;
         // validate signature
@@ -30,7 +30,7 @@ impl SignedValueData {
         value_data: ValueData,
         owner: &PublicKey,
         subkey: ValueSubkey,
-        vcrypto: CryptoSystemVersion,
+        vcrypto: &CryptoSystemGuard<'_>,
         writer_secret: SecretKey,
     ) -> VeilidAPIResult<Self> {
         let node_info_bytes = Self::make_signature_bytes(&value_data, owner, subkey)?;
diff --git a/veilid-core/src/storage_manager/types/signed_value_descriptor.rs b/veilid-core/src/storage_manager/types/signed_value_descriptor.rs
index 10832c38..48bf2a6a 100644
--- a/veilid-core/src/storage_manager/types/signed_value_descriptor.rs
+++ b/veilid-core/src/storage_manager/types/signed_value_descriptor.rs
@@ -17,7 +17,7 @@ impl SignedValueDescriptor {
         }
     }
 
-    pub fn validate(&self, vcrypto: CryptoSystemVersion) -> VeilidAPIResult<()> {
+    pub fn validate(&self, vcrypto: &CryptoSystemGuard<'_>) -> VeilidAPIResult<()> {
         // validate signature
         if !vcrypto.verify(&self.owner, &self.schema_data, &self.signature)? {
             apibail_parse_error!(
@@ -49,7 +49,7 @@ impl SignedValueDescriptor {
     pub fn make_signature(
         owner: PublicKey,
         schema_data: Vec<u8>,
-        vcrypto: CryptoSystemVersion,
+        vcrypto: &CryptoSystemGuard<'_>,
         owner_secret: SecretKey,
     ) -> VeilidAPIResult<Self> {
         // create signature
diff --git a/veilid-core/src/table_store/mod.rs b/veilid-core/src/table_store/mod.rs
index e9bca03d..8490150d 100644
--- a/veilid-core/src/table_store/mod.rs
+++ b/veilid-core/src/table_store/mod.rs
@@ -267,7 +267,8 @@ impl TableStore {
         // Get cryptosystem
         let kind = FourCC::try_from(&dek_bytes[0..4]).unwrap();
-        let Some(vcrypto) = self.crypto().get(kind) else {
+        let crypto = self.crypto();
+        let Some(vcrypto) = crypto.get(kind) else {
             bail!("unsupported cryptosystem '{kind}'");
         };
@@ -322,7 +323,8 @@ impl TableStore {
         }
 
         // Get cryptosystem
-        let Some(vcrypto) = self.crypto().get(dek.kind) else {
+        let crypto = self.crypto();
+        let Some(vcrypto) = crypto.get(dek.kind) else {
             bail!("unsupported cryptosystem '{}'", dek.kind);
         };
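
The `#[serde(default)]` attribute on `subkeys_in_flight` in the new `OfflineSubkeyWrite` struct above is what lets entries that were persisted before that field existed still deserialize. A minimal, self-contained sketch of that behavior follows; it assumes serde_json as the codec and uses stand-in types (`SubkeySet` instead of `ValueSubkeyRangeSet`, with `safety_selection` omitted), not the real veilid-core types.

use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;

// Hypothetical simplification of ValueSubkeyRangeSet for this sketch.
type SubkeySet = BTreeSet<u32>;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
struct OfflineSubkeyWriteSketch {
    subkeys: SubkeySet,
    // Newer field: absent from older serialized entries, so fall back to Default.
    #[serde(default)]
    subkeys_in_flight: SubkeySet,
}

fn main() -> Result<(), serde_json::Error> {
    // Data persisted by an older build, before `subkeys_in_flight` existed.
    let old_blob = r#"{"subkeys":[0,1,4]}"#;
    let migrated: OfflineSubkeyWriteSketch = serde_json::from_str(old_blob)?;
    assert!(migrated.subkeys_in_flight.is_empty());

    // A round trip of a current value keeps both fields intact.
    let blob = serde_json::to_string(&migrated)?;
    let back: OfflineSubkeyWriteSketch = serde_json::from_str(&blob)?;
    assert_eq!(migrated, back);
    println!("migrated entry: {:?}", back);
    Ok(())
}

Note that this forward-compatibility trick relies on a self-describing format; with a positional codec the missing field would instead need an explicit migration step.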
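The `get_key` helper visible in the removed lines earlier in this patch derives a record key purely from the crypto-kind FourCC bytes, the owner public key, and the compiled schema bytes, hashed in that order. A standalone sketch of that pattern is below; `sha2::Sha256` stands in for the selected cryptosystem's `generate_hash`, and the plain byte arrays, `derive_record_key` name, and example inputs are illustrative only.

use sha2::{Digest, Sha256};

// Derive a deterministic record key from (kind, owner, schema) bytes.
fn derive_record_key(kind: [u8; 4], owner_key: &[u8; 32], schema_data: &[u8]) -> Vec<u8> {
    let mut hash_data = Vec::with_capacity(4 + owner_key.len() + schema_data.len());
    hash_data.extend_from_slice(&kind);
    hash_data.extend_from_slice(owner_key);
    hash_data.extend_from_slice(schema_data);
    Sha256::digest(&hash_data).to_vec()
}

fn main() {
    let kind = *b"VLD0";
    let owner = [0u8; 32];
    let schema = b"DFLT\x01\x00\x00\x00"; // placeholder compiled-schema bytes
    let key = derive_record_key(kind, &owner, schema);
    // The same inputs always yield the same key, so a record is addressed by
    // (kind, owner, schema) rather than by whichever node created it.
    assert_eq!(key, derive_record_key(kind, &owner, schema));
    println!("derived key: {:02x?}", key);
}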