diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs
index a5b7e798..7d087436 100644
--- a/veilid-core/src/storage_manager/mod.rs
+++ b/veilid-core/src/storage_manager/mod.rs
@@ -18,6 +18,7 @@ use storage_manager_inner::*;
 pub use types::*;
 
 use super::*;
+use network_manager::*;
 use routing_table::*;
 use rpc_processor::*;
 
@@ -27,6 +28,8 @@ const MAX_SUBKEY_SIZE: usize = ValueData::MAX_LEN;
 const MAX_RECORD_DATA_SIZE: usize = 1_048_576;
 /// Frequency to flush record stores to disk
 const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
+/// Frequency to check for offline subkey writes to send to the network
+const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
 
 struct StorageManagerUnlockedInner {
     config: VeilidConfig,
@@ -37,6 +40,7 @@ struct StorageManagerUnlockedInner {
 
     // Background processes
     flush_record_stores_task: TickTask<EyreReport>,
+    offline_subkey_writes_task: TickTask<EyreReport>,
 }
 
 #[derive(Clone)]
@@ -59,6 +63,7 @@ impl StorageManager {
             #[cfg(feature = "unstable-blockstore")]
             block_store,
             flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
+            offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
         }
     }
     fn new_inner(unlocked_inner: Arc<StorageManagerUnlockedInner>) -> StorageManagerInner {
@@ -127,6 +132,32 @@
         Ok(inner)
     }
 
+    async fn network_is_ready(&self) -> EyreResult<bool> {
+        if let Some(rpc_processor) = {
+            let inner = self.lock().await?;
+            inner.rpc_processor.clone()
+        } {
+            if let Some(network_class) = rpc_processor
+                .routing_table()
+                .get_network_class(RoutingDomain::PublicInternet)
+            {
+                // If our PublicInternet network class is valid we're ready to talk
+                Ok(network_class != NetworkClass::Invalid)
+            } else {
+                // If we haven't gotten a network class yet we shouldn't try to use the DHT
+                Ok(false)
+            }
+        } else {
+            // If we aren't attached, we won't have an rpc processor
+            Ok(false)
+        }
+    }
+
+    async fn has_offline_subkey_writes(&self) -> EyreResult<bool> {
+        let inner = self.lock().await?;
+        Ok(!inner.offline_subkey_writes.is_empty())
+    }
+
     /// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
     pub async fn create_record(
         &self,
@@ -391,7 +422,12 @@
                 .await?;
 
             // Add to offline writes to flush
-            inner.offline_subkey_writes.entry(key).and_modify(|x| { x.insert(subkey); } ).or_insert(ValueSubkeyRangeSet::single(subkey));
+            inner.offline_subkey_writes.entry(key)
+                .and_modify(|x| { x.subkeys.insert(subkey); })
+                .or_insert(OfflineSubkeyWrite {
+                    safety_selection,
+                    subkeys: ValueSubkeyRangeSet::single(subkey),
+                });
             return Ok(None);
         };
 
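A note on the queueing change above: each offline write now remembers the `SafetySelection` that was in force when the write happened, and the `entry()` chain means the first offline write to a record fixes that selection while later writes only extend the subkey set. The following is a minimal, self-contained sketch of that pattern; the types are simplified stand-ins for Veilid's `TypedKey`, `SafetySelection`, and `ValueSubkeyRangeSet`, not the real definitions:

```rust
use std::collections::{BTreeSet, HashMap};

// Simplified stand-ins for illustration only.
type TypedKey = u64;
type ValueSubkey = u32;
type SafetySelection = u8;

struct OfflineSubkeyWrite {
    safety_selection: SafetySelection,
    subkeys: BTreeSet<ValueSubkey>,
}

fn queue_offline_write(
    queue: &mut HashMap<TypedKey, OfflineSubkeyWrite>,
    key: TypedKey,
    subkey: ValueSubkey,
    safety_selection: SafetySelection,
) {
    queue
        .entry(key)
        // Existing entry: extend the subkey set and keep the safety
        // selection captured by the first queued write.
        .and_modify(|w| {
            w.subkeys.insert(subkey);
        })
        // No entry yet: remember the safety selection in force right now.
        .or_insert_with(|| OfflineSubkeyWrite {
            safety_selection,
            subkeys: BTreeSet::from([subkey]),
        });
}

fn main() {
    let mut queue = HashMap::new();
    queue_offline_write(&mut queue, 42, 0, 1);
    queue_offline_write(&mut queue, 42, 3, 2);
    // The second write extended the subkeys but did not change the selection.
    assert_eq!(queue[&42].subkeys, BTreeSet::from([0, 3]));
    assert_eq!(queue[&42].safety_selection, 1);
}
```

One consequence visible in the diff: if the same record is written twice while offline under different safety selections, the eventual replay uses the selection from the first queued write.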
diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs
index 2d74c2b5..9a8815da 100644
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs
+++ b/veilid-core/src/storage_manager/storage_manager_inner.rs
@@ -3,6 +3,12 @@ use super::*;
 const STORAGE_MANAGER_METADATA: &str = "storage_manager_metadata";
 const OFFLINE_SUBKEY_WRITES: &[u8] = b"offline_subkey_writes";
 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub(super) struct OfflineSubkeyWrite {
+    pub safety_selection: SafetySelection,
+    pub subkeys: ValueSubkeyRangeSet,
+}
+
 /// Locked structure for storage manager
 pub(super) struct StorageManagerInner {
     unlocked_inner: Arc<StorageManagerUnlockedInner>,
@@ -15,7 +21,7 @@ pub(super) struct StorageManagerInner {
     /// Records that have been pushed to this node for distribution by other nodes, that we make an effort to republish
     pub remote_record_store: Option<RecordStore<RemoteRecordDetail>>,
     /// Record subkeys that have not been pushed to the network because they were written to offline
-    pub offline_subkey_writes: HashMap<TypedKey, ValueSubkeyRangeSet>,
+    pub offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite>,
     /// Storage manager metadata that is persistent, including copy of offline subkey writes
     pub metadata_db: Option<TableDB>,
     /// RPC processor if it is available
diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs
index cd90a82e..d9f49d7b 100644
--- a/veilid-core/src/storage_manager/tasks/mod.rs
+++ b/veilid-core/src/storage_manager/tasks/mod.rs
@@ -1,10 +1,11 @@
 pub mod flush_record_stores;
+pub mod offline_subkey_writes;
 
 use super::*;
 
 impl StorageManager {
     pub(crate) fn setup_tasks(&self) {
-        // Set rolling transfers tick task
+        // Set flush records tick task
         debug!("starting flush record stores task");
         {
             let this = self.clone();
@@ -25,12 +26,40 @@
                 )
             });
         }
+        // Set offline subkey writes tick task
+        debug!("starting offline subkey writes task");
+        {
+            let this = self.clone();
+            self.unlocked_inner
+                .offline_subkey_writes_task
+                .set_routine(move |s, l, t| {
+                    Box::pin(
+                        this.clone()
+                            .offline_subkey_writes_task_routine(
+                                s,
+                                Timestamp::new(l),
+                                Timestamp::new(t),
+                            )
+                            .instrument(trace_span!(
+                                parent: None,
+                                "StorageManager offline subkey writes task routine"
+                            )),
+                    )
+                });
+        }
     }
 
     pub async fn tick(&self) -> EyreResult<()> {
-        // Run the rolling transfers task
+        // Run the flush record stores task
         self.unlocked_inner.flush_record_stores_task.tick().await?;
 
+        // Run offline subkey writes task if there's work to be done
+        if self.network_is_ready().await? && self.has_offline_subkey_writes().await? {
+            self.unlocked_inner
+                .offline_subkey_writes_task
+                .tick()
+                .await?;
+        }
         Ok(())
     }
 
@@ -39,5 +68,9 @@
         if let Err(e) = self.unlocked_inner.flush_record_stores_task.stop().await {
             warn!("flush_record_stores_task not stopped: {}", e);
         }
+        debug!("stopping offline subkey writes task");
+        if let Err(e) = self.unlocked_inner.offline_subkey_writes_task.stop().await {
+            warn!("offline_subkey_writes_task not stopped: {}", e);
+        }
     }
 }
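The `tick()` change above is the interesting part of this hunk: the flush task still runs on every tick, but the offline subkey writes task is only ticked when the node has a usable `PublicInternet` network class and the queue is non-empty, so a detached node never attempts DHT writes that are guaranteed to fail. A rough sketch of that gating shape, using hypothetical names rather than Veilid's `TickTask` machinery:

```rust
// Hypothetical stand-in for the storage manager, for illustration only.
struct Manager {
    attached: bool,
    offline_writes: Vec<String>,
}

impl Manager {
    fn network_is_ready(&self) -> bool {
        // Veilid additionally requires a known, non-Invalid PublicInternet
        // network class before it will touch the DHT.
        self.attached
    }

    fn has_offline_subkey_writes(&self) -> bool {
        !self.offline_writes.is_empty()
    }

    fn tick(&mut self) {
        // Unconditional maintenance, like flush_record_stores_task.
        self.flush_record_stores();

        // Conditional work: only touch the network when the attempt can
        // plausibly succeed and there is actually something queued.
        if self.network_is_ready() && self.has_offline_subkey_writes() {
            self.push_offline_writes();
        }
    }

    fn flush_record_stores(&mut self) {}

    fn push_offline_writes(&mut self) {
        self.offline_writes.clear();
    }
}

fn main() {
    let mut m = Manager { attached: false, offline_writes: vec!["write".into()] };
    m.tick(); // detached: the queue is left alone
    assert!(!m.offline_writes.is_empty());
    m.attached = true;
    m.tick(); // attached: the queue is drained
    assert!(m.offline_writes.is_empty());
}
```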
diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
new file mode 100644
index 00000000..81b1613a
--- /dev/null
+++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs
@@ -0,0 +1,62 @@
+use super::*;
+use futures_util::*;
+
+impl StorageManager {
+    // Best-effort write subkeys to the network that were written offline
+    #[instrument(level = "trace", skip(self), err)]
+    pub(crate) async fn offline_subkey_writes_task_routine(
+        self,
+        stop_token: StopToken,
+        _last_ts: Timestamp,
+        _cur_ts: Timestamp,
+    ) -> EyreResult<()> {
+        let (rpc_processor, offline_subkey_writes) = {
+            let inner = self.lock().await?;
+
+            let Some(rpc_processor) = inner.rpc_processor.clone() else {
+                return Ok(());
+            };
+
+            (rpc_processor, inner.offline_subkey_writes.clone())
+        };
+
+        // Write each record's subkeys using the safety selection it was stored with
+        for (key, osw) in offline_subkey_writes {
+            if poll!(stop_token.clone()).is_ready() {
+                break;
+            }
+            for subkey in osw.subkeys.iter() {
+                let subkey_result = {
+                    let mut inner = self.lock().await?;
+                    inner.handle_get_local_value(key, subkey, true).await
+                };
+                let Ok(subkey_result) = subkey_result else {
+                    continue;
+                };
+                let Some(value) = subkey_result.value else {
+                    continue;
+                };
+                let Some(descriptor) = subkey_result.descriptor else {
+                    continue;
+                };
+                if let Err(e) = self
+                    .outbound_set_value(
+                        rpc_processor.clone(),
+                        key,
+                        subkey,
+                        osw.safety_selection,
+                        value,
+                        descriptor,
+                    )
+                    .await
+                {
+                    log_stor!(debug "failed to write offline subkey: {}", e);
+                }
+            }
+            let mut inner = self.lock().await?;
+            inner.offline_subkey_writes.remove(&key);
+        }
+
+        Ok(())
+    }
+}
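The routine above clones the queue out from under the lock, then checks the stop token between records with `futures_util::poll!`, which polls a future exactly once without suspending, so a long queue cannot delay shutdown by more than one record's worth of work. Below is a self-contained sketch of that cooperative cancellation pattern; it substitutes a `futures` oneshot receiver for Veilid's `StopToken`, which is an assumption made purely for illustration:

```rust
use futures::{channel::oneshot, poll, FutureExt};

// Process items one at a time, bailing out promptly once a stop is requested.
// The oneshot receiver stands in for Veilid's StopToken.
async fn process_until_stopped(items: Vec<u32>, stop_rx: oneshot::Receiver<()>) -> Vec<u32> {
    let mut stop = stop_rx.fuse();
    let mut done = Vec::new();
    for item in items {
        // poll! checks the stop future once without awaiting it; Ready
        // (whether Ok or a cancellation error) means "shut down now".
        if poll!(&mut stop).is_ready() {
            break;
        }
        done.push(item); // stand-in for the per-subkey network write
    }
    done
}

fn main() {
    let (stop_tx, stop_rx) = oneshot::channel::<()>();
    drop(stop_tx); // dropping the sender resolves the receiver, like a stop signal
    let done = futures::executor::block_on(process_until_stopped(vec![1, 2, 3], stop_rx));
    assert!(done.is_empty()); // the stop was already signaled, so nothing ran
}
```

Checking the token at the top of the per-record loop, as the routine does, keeps shutdown latency bounded by the cost of flushing a single record. Note also that the routine removes a record's queue entry after attempting its subkeys whether or not every `outbound_set_value` succeeded, so a failed push is logged rather than retried on a later tick.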