diff --git a/veilid-core/src/storage_manager/do_get_value.rs b/veilid-core/src/storage_manager/do_get_value.rs index 4671fe47..1e1f0138 100644 --- a/veilid-core/src/storage_manager/do_get_value.rs +++ b/veilid-core/src/storage_manager/do_get_value.rs @@ -20,9 +20,8 @@ impl StorageManager { rpc_processor: RPCProcessor, key: TypedKey, subkey: ValueSubkey, - last_value: Option, - last_descriptor: Option, safety_selection: SafetySelection, + last_subkey_result: SubkeyResult, ) -> Result { let routing_table = rpc_processor.routing_table(); @@ -38,15 +37,15 @@ impl StorageManager { }; // Make do-get-value answer context - let schema = if let Some(d) = &last_descriptor { + let schema = if let Some(d) = &last_subkey_result.descriptor { Some(d.schema()?) } else { None }; let context = Arc::new(Mutex::new(DoGetValueContext { - value: last_value, + value: last_subkey_result.value, value_count: 0, - descriptor: last_descriptor.clone(), + descriptor: last_subkey_result.descriptor.clone(), schema, })); @@ -54,7 +53,7 @@ impl StorageManager { let call_routine = |next_node: NodeRef| { let rpc_processor = rpc_processor.clone(); let context = context.clone(); - let last_descriptor = last_descriptor.clone(); + let last_descriptor = last_subkey_result.descriptor.clone(); async move { let vres = rpc_processor .clone() diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 9c87d84d..c390ad80 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -57,8 +57,8 @@ impl StorageManager { flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS), } } - fn new_inner() -> StorageManagerInner { - StorageManagerInner::default() + fn new_inner(unlocked_inner: Arc) -> StorageManagerInner { + StorageManagerInner::new(unlocked_inner) } fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { @@ -96,15 +96,16 @@ impl StorageManager { table_store: TableStore, block_store: BlockStore, ) -> StorageManager { + let unlocked_inner = Arc::new(Self::new_unlocked_inner( + config, + crypto, + protected_store, + table_store, + block_store, + )); let this = StorageManager { - unlocked_inner: Arc::new(Self::new_unlocked_inner( - config, - crypto, - protected_store, - table_store, - block_store, - )), - inner: Arc::new(AsyncMutex::new(Self::new_inner())), + unlocked_inner: unlocked_inner.clone(), + inner: Arc::new(AsyncMutex::new(Self::new_inner(unlocked_inner))), }; this.setup_tasks(); @@ -169,7 +170,7 @@ impl StorageManager { self.cancel_tasks().await; // Release the storage manager - *inner = Self::new_inner(); + *inner = Self::new_inner(self.unlocked_inner.clone()); debug!("finished storage manager shutdown"); } @@ -179,22 +180,6 @@ impl StorageManager { inner.rpc_processor = opt_rpc_processor } - /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] - fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey - where - D: Clone + RkyvArchive + RkyvSerialize, - for<'t> ::Archived: CheckBytes>, - ::Archived: RkyvDeserialize, - { - let compiled = record.descriptor().schema_data(); - let mut hash_data = Vec::::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len()); - hash_data.extend_from_slice(&vcrypto.kind().0); - hash_data.extend_from_slice(&record.owner().bytes); - hash_data.extend_from_slice(compiled); - let hash = vcrypto.generate_hash(&hash_data); - TypedKey::new(vcrypto.kind(), hash) - } - async fn lock(&self) -> Result, VeilidAPIError> { let inner = asyncmutex_lock_arc!(&self.inner); if !inner.initialized { @@ -203,6 +188,7 @@ impl StorageManager { Ok(inner) } + /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor pub async fn create_record( &self, kind: CryptoKind, @@ -211,59 +197,32 @@ impl StorageManager { ) -> Result { let mut inner = self.lock().await?; - // Get cryptosystem - let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else { - apibail_generic!("unsupported cryptosystem"); - }; + // Create a new owned local record from scratch + let (key, owner) = inner + .create_new_owned_local_record(kind, schema, safety_selection) + .await?; - // Get local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Compile the dht schema - let schema_data = schema.compile(); - - // New values require a new owner key - let owner = vcrypto.generate_keypair(); - - // Make a signed value descriptor for this dht value - let signed_value_descriptor = SignedValueDescriptor::make_signature( - owner.key, - schema_data, - vcrypto.clone(), - owner.secret, - )?; - - // Add new local value record - let cur_ts = get_aligned_timestamp(); - let local_record_detail = LocalRecordDetail { safety_selection }; - let record = - Record::::new(cur_ts, signed_value_descriptor, local_record_detail)?; - - let dht_key = Self::get_key(vcrypto.clone(), &record); - local_record_store.new_record(dht_key, record).await?; - - // Open the record - self.open_record_common(inner, dht_key, Some(owner), safety_selection) - .await + // Now that the record is made we should always succeed to open the existing record + // The initial writer is the owner of the record + inner + .open_existing_record(key, Some(owner), safety_selection) + .map(|r| r.unwrap()) } - async fn open_record_common( + /// Open an existing local record if it exists, + /// and if it doesnt exist locally, try to pull it from the network and + /// open it and return the opened descriptor + pub async fn open_record( &self, - mut inner: AsyncMutexGuardArc, key: TypedKey, writer: Option, safety_selection: SafetySelection, ) -> Result { - // Ensure the record is closed - if inner.opened_records.contains_key(&key) { - apibail_generic!("record is already open and should be closed first"); - } + let mut inner = self.lock().await?; // See if we have a local record already or not - if let Some(res) = inner.open_record_check_existing(key, writer, safety_selection) { - return res; + if let Some(res) = inner.open_existing_record(key, writer, safety_selection)? { + return Ok(res); } // No record yet, try to get it from the network @@ -280,86 +239,38 @@ impl StorageManager { // No last descriptor, no last value // Use the safety selection we opened the record with let subkey: ValueSubkey = 0; - let result = self - .do_get_value(rpc_processor, key, subkey, None, None, safety_selection) + let subkey_result = self + .do_get_value( + rpc_processor, + key, + subkey, + safety_selection, + SubkeyResult::default(), + ) .await?; // If we got nothing back, the key wasn't found - if result.value.is_none() && result.descriptor.is_none() { + if subkey_result.value.is_none() && subkey_result.descriptor.is_none() { // No result apibail_key_not_found!(key); }; - // Must have descriptor - let Some(signed_value_descriptor) = result.descriptor else { - // No descriptor for new record, can't store this - apibail_generic!("no descriptor"); - }; - - let owner = signed_value_descriptor.owner().clone(); - // If the writer we chose is also the owner, we have the owner secret - // Otherwise this is just another subkey writer - let owner_secret = if let Some(writer) = writer { - if writer.key == owner { - Some(writer.secret) - } else { - None - } - } else { - None - }; - let schema = signed_value_descriptor.schema()?; - // Reopen inner to store value we just got let mut inner = self.lock().await?; - // Get local record store - let Some(local_record_store) = inner.local_record_store.as_mut() else { - apibail_not_initialized!(); - }; - - // Make and store a new record for this descriptor - let record = Record::::new( - get_aligned_timestamp(), - signed_value_descriptor, - LocalRecordDetail { safety_selection }, - )?; - local_record_store.new_record(key, record).await?; - - // If we got a subkey with the getvalue, it has already been validated against the schema, so store it - if let Some(signed_value_data) = result.value { - // Write subkey to local store - local_record_store - .set_subkey(key, subkey, signed_value_data) - .await?; - } - - // Write open record + // Open the new record inner - .opened_records - .insert(key, OpenedRecord::new(writer, safety_selection)); - - // Make DHT Record Descriptor to return - let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); - Ok(descriptor) - } - - pub async fn open_record( - &self, - key: TypedKey, - writer: Option, - safety_selection: SafetySelection, - ) -> Result { - let inner = self.lock().await?; - self.open_record_common(inner, key, writer, safety_selection) + .open_new_record(key, writer, subkey, subkey_result, safety_selection) .await } + /// Close an opened local record pub async fn close_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { let mut inner = self.lock().await?; inner.close_record(key) } + /// Delete a local record pub async fn delete_record(&self, key: TypedKey) -> Result<(), VeilidAPIError> { let mut inner = self.lock().await?; @@ -376,6 +287,8 @@ impl StorageManager { local_record_store.delete_record(key).await } + /// Get the value of a subkey from an opened local record + /// may refresh the record, and will if it is forced to or the subkey is not available locally yet pub async fn get_value( &self, key: TypedKey, @@ -383,23 +296,23 @@ impl StorageManager { force_refresh: bool, ) -> Result, VeilidAPIError> { let mut inner = self.lock().await?; - - // Get rpc processor and drop mutex so we don't block while getting the value from the network let Some(opened_record) = inner.opened_records.remove(&key) else { apibail_generic!("record not open"); }; // See if the requested subkey is our local record store - let SubkeyResult { value, descriptor } = inner.handle_get_local_value(key, subkey, true)?; + let last_subkey_result = inner.handle_get_local_value(key, subkey, true)?; // Return the existing value if we have one unless we are forcing a refresh if !force_refresh { - if let Some(value) = value { - return Ok(Some(value.into_value_data())); + if let Some(last_subkey_result_value) = last_subkey_result.value { + return Ok(Some(last_subkey_result_value.into_value_data())); } } // Refresh if we can + + // Get rpc processor and drop mutex so we don't block while getting the value from the network let Some(rpc_processor) = inner.rpc_processor.clone() else { // Offline, try again later apibail_try_again!(); @@ -410,32 +323,34 @@ impl StorageManager { // May have last descriptor / value // Use the safety selection we opened the record with - let opt_last_seq = value.as_ref().map(|v| v.value_data().seq()); - let result = self + let opt_last_seq = last_subkey_result + .value + .as_ref() + .map(|v| v.value_data().seq()); + let subkey_result = self .do_get_value( rpc_processor, key, subkey, - value, - descriptor, opened_record.safety_selection(), + last_subkey_result, ) .await?; // See if we got a value back - let Some(result_value) = result.value else { + let Some(subkey_result_value) = subkey_result.value else { // If we got nothing back then we also had nothing beforehand, return nothing return Ok(None); }; // If we got a new value back then write it to the opened record - if Some(result_value.value_data().seq()) != opt_last_seq { + if Some(subkey_result_value.value_data().seq()) != opt_last_seq { let mut inner = self.lock().await?; inner - .handle_set_local_value(key, subkey, result_value.clone()) + .handle_set_local_value(key, subkey, subkey_result_value.clone()) .await?; } - Ok(Some(result_value.into_value_data())) + Ok(Some(subkey_result_value.into_value_data())) } pub async fn set_value( diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs index 994e7b44..c6f8858a 100644 --- a/veilid-core/src/storage_manager/record_store.rs +++ b/veilid-core/src/storage_manager/record_store.rs @@ -31,6 +31,7 @@ where } /// The result of the do_get_value_operation +#[derive(Default, Debug)] pub struct SubkeyResult { /// The subkey value if we got one pub value: Option, diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index d593d7a3..d756cbe8 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -1,8 +1,8 @@ use super::*; /// Locked structure for storage manager -#[derive(Default)] pub(super) struct StorageManagerInner { + unlocked_inner: Arc, /// If we are started up pub initialized: bool, /// Records that have been 'opened' and are not yet closed @@ -18,15 +18,74 @@ pub(super) struct StorageManagerInner { } impl StorageManagerInner { - pub fn open_record_check_existing( + pub fn new(unlocked_inner: Arc) -> Self { + Self { + unlocked_inner, + initialized: false, + opened_records: Default::default(), + local_record_store: Default::default(), + remote_record_store: Default::default(), + rpc_processor: Default::default(), + tick_future: Default::default(), + } + } + + pub async fn create_new_owned_local_record( + &mut self, + kind: CryptoKind, + schema: DHTSchema, + safety_selection: SafetySelection, + ) -> Result<(TypedKey, KeyPair), VeilidAPIError> { + // Get cryptosystem + let Some(vcrypto) = self.unlocked_inner.crypto.get(kind) else { + apibail_generic!("unsupported cryptosystem"); + }; + + // Get local record store + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); + }; + + // Compile the dht schema + let schema_data = schema.compile(); + + // New values require a new owner key + let owner = vcrypto.generate_keypair(); + + // Make a signed value descriptor for this dht value + let signed_value_descriptor = SignedValueDescriptor::make_signature( + owner.key, + schema_data, + vcrypto.clone(), + owner.secret, + )?; + + // Add new local value record + let cur_ts = get_aligned_timestamp(); + let local_record_detail = LocalRecordDetail { safety_selection }; + let record = + Record::::new(cur_ts, signed_value_descriptor, local_record_detail)?; + + let dht_key = Self::get_key(vcrypto.clone(), &record); + local_record_store.new_record(dht_key, record).await?; + + Ok((dht_key, owner)) + } + + pub fn open_existing_record( &mut self, key: TypedKey, writer: Option, safety_selection: SafetySelection, - ) -> Option> { + ) -> Result, VeilidAPIError> { + // Ensure the record is closed + if self.opened_records.contains_key(&key) { + apibail_generic!("record is already open and should be closed first"); + } + // Get local record store let Some(local_record_store) = self.local_record_store.as_mut() else { - return Some(Err(VeilidAPIError::not_initialized())); + apibail_not_initialized!(); }; // See if we have a local record already or not @@ -40,7 +99,7 @@ impl StorageManagerInner { (r.owner().clone(), r.schema()) }; let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else { - return None; + return Ok(None); }; // Had local record @@ -62,17 +121,43 @@ impl StorageManagerInner { // Make DHT Record Descriptor to return let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); - Some(Ok(descriptor)) + Ok(Some(descriptor)) } - pub async fn new_local_record( + pub async fn open_new_record( &mut self, key: TypedKey, + writer: Option, subkey: ValueSubkey, - signed_value_descriptor: SignedValueDescriptor, - signed_value_data: Option, + subkey_result: SubkeyResult, safety_selection: SafetySelection, - ) -> Result<(), VeilidAPIError> { + ) -> Result { + // Ensure the record is closed + if self.opened_records.contains_key(&key) { + panic!("new record should never be opened at this point"); + } + + // Must have descriptor + let Some(signed_value_descriptor) = subkey_result.descriptor else { + // No descriptor for new record, can't store this + apibail_generic!("no descriptor"); + }; + // Get owner + let owner = signed_value_descriptor.owner().clone(); + + // If the writer we chose is also the owner, we have the owner secret + // Otherwise this is just another subkey writer + let owner_secret = if let Some(writer) = writer { + if writer.key == owner { + Some(writer.secret) + } else { + None + } + } else { + None + }; + let schema = signed_value_descriptor.schema()?; + // Get local record store let Some(local_record_store) = self.local_record_store.as_mut() else { apibail_not_initialized!(); @@ -87,17 +172,20 @@ impl StorageManagerInner { local_record_store.new_record(key, record).await?; // If we got a subkey with the getvalue, it has already been validated against the schema, so store it - if let Some(signed_value_data) = signed_value_data { + if let Some(signed_value_data) = subkey_result.value { // Write subkey to local store local_record_store .set_subkey(key, subkey, signed_value_data) .await?; } + // Write open record self.opened_records .insert(key, OpenedRecord::new(writer, safety_selection)); - Ok(()) + // Make DHT Record Descriptor to return + let descriptor = DHTRecordDescriptor::new(key, owner, owner_secret, schema); + Ok(descriptor) } pub fn close_record(&mut self, key: TypedKey) -> Result<(), VeilidAPIError> { @@ -184,4 +272,20 @@ impl StorageManagerInner { Ok(()) } + + /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] + fn get_key(vcrypto: CryptoSystemVersion, record: &Record) -> TypedKey + where + D: Clone + RkyvArchive + RkyvSerialize, + for<'t> ::Archived: CheckBytes>, + ::Archived: RkyvDeserialize, + { + let compiled = record.descriptor().schema_data(); + let mut hash_data = Vec::::with_capacity(PUBLIC_KEY_LENGTH + 4 + compiled.len()); + hash_data.extend_from_slice(&vcrypto.kind().0); + hash_data.extend_from_slice(&record.owner().bytes); + hash_data.extend_from_slice(compiled); + let hash = vcrypto.generate_hash(&hash_data); + TypedKey::new(vcrypto.kind(), hash) + } }