[skip ci] refactor checkpoint

Christien Rioux 2025-01-26 21:29:12 -05:00
parent bd111ac73b
commit d196c934cd
13 changed files with 334 additions and 236 deletions

View File

@ -16,6 +16,8 @@ pub trait VeilidComponent: AsAnyArcSendSync + core::fmt::Debug {
fn registry(&self) -> VeilidComponentRegistry;
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>;
fn post_init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>;
fn pre_terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>;
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>;
// Registry shortcuts
@ -116,6 +118,40 @@ impl VeilidComponentRegistry {
Ok(())
}
pub async fn post_init(&self) -> EyreResult<()> {
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
bail!("init should only happen one at a time");
};
if !*_init_guard {
bail!("not initialized");
}
let init_order = self.get_init_order();
let mut post_initialized = vec![];
for component in init_order {
if let Err(e) = component.post_init().await {
self.pre_terminate_inner(post_initialized).await;
return Err(e);
}
post_initialized.push(component)
}
Ok(())
}
pub async fn pre_terminate(&self) {
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
panic!("terminate should only happen one at a time");
};
if !*_init_guard {
panic!("not initialized");
}
let init_order = self.get_init_order();
self.pre_terminate_inner(init_order).await;
*_init_guard = false;
}
pub async fn terminate(&self) {
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
panic!("terminate should only happen one at a time");
@ -130,6 +166,14 @@ impl VeilidComponentRegistry {
*_init_guard = false;
}
async fn pre_terminate_inner(
&self,
pre_initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>,
) {
for component in pre_initialized.iter().rev() {
component.pre_terminate().await;
}
}
async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
for component in initialized.iter().rev() {
component.terminate().await;
@ -185,6 +229,14 @@ macro_rules! impl_veilid_component {
Box::pin(async { self.init_async().await })
}
fn post_init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>> {
Box::pin(async { self.post_init_async().await })
}
fn pre_terminate(&self) -> SendPinBoxFutureLifetime<'_, ()> {
Box::pin(async { self.pre_terminate_async().await })
}
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()> {
Box::pin(async { self.terminate_async().await })
}
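
For reference, here is a minimal self-contained model of the lifecycle forwarding this hunk adds. The PinBoxFut alias, the String error type, and ExampleComponent are stand-ins invented for the sketch rather than veilid-core types; the real impl_veilid_component! macro generates the same Box::pin forwarding onto each component's *_async methods.

use std::future::Future;
use std::pin::Pin;

// Simplified stand-ins for veilid-core's aliases (assumptions for this sketch only).
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
type Result<T> = std::result::Result<T, String>;

struct ExampleComponent;

impl ExampleComponent {
    // The *_async methods a component author writes.
    async fn init_async(&self) -> Result<()> { Ok(()) }
    async fn post_init_async(&self) -> Result<()> { Ok(()) }
    async fn pre_terminate_async(&self) {}
    async fn terminate_async(&self) {}

    // Hand-written equivalents of what the macro appears to generate.
    fn init(&self) -> PinBoxFut<'_, Result<()>> {
        Box::pin(async { self.init_async().await })
    }
    fn post_init(&self) -> PinBoxFut<'_, Result<()>> {
        Box::pin(async { self.post_init_async().await })
    }
    fn pre_terminate(&self) -> PinBoxFut<'_, ()> {
        Box::pin(async { self.pre_terminate_async().await })
    }
    fn terminate(&self) -> PinBoxFut<'_, ()> {
        Box::pin(async { self.terminate_async().await })
    }
}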

View File

@ -65,19 +65,27 @@ impl VeilidCoreContext {
// Register all components
registry.register(ProtectedStore::new);
// Initialize table store first, so crypto code can load caches
// Tablestore can use crypto during init, just not any cached operations or things
// that require flushing back to the tablestore
registry.register(TableStore::new);
registry.register(Crypto::new);
registry.register(TableStore::new);
#[cfg(feature = "unstable-blockstore")]
registry.register(BlockStore::new);
registry.register(StorageManager::new);
registry.register(AttachmentManager::new);
// Run initialization
// This should make the majority of subsystems functional
registry.init().await.map_err(VeilidAPIError::internal)?;
// Run post-initialization
// This should resolve any inter-subsystem dependencies
// required for background processes that utilize multiple subsystems
// Background processes also often require registry lookup of the
// current subsystem, which is not available until after init succeeds
if let Err(e) = registry.post_init().await {
registry.terminate().await;
return Err(VeilidAPIError::internal(e));
}
info!("Veilid API startup complete");
Ok(Self { registry })
@ -97,6 +105,13 @@ impl VeilidCoreContext {
)
};
// Run pre-termination
// This should shut down background processes that may require the existence of
// other subsystems that may not exist during final termination
self.registry.pre_terminate().await;
// Run termination
// This should finish any shutdown operations for the subsystems
self.registry.terminate().await;
if let Err(e) = ApiTracingLayer::remove_callback(program_name, namespace).await {
@ -118,11 +133,14 @@ pub trait RegisteredComponents: VeilidComponent {
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> {
self.registry().lookup::<ProtectedStore>().unwrap()
}
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
self.registry().lookup::<Crypto>().unwrap()
}
fn table_store(&self) -> VeilidComponentGuard<'_, TableStore> {
self.registry().lookup::<TableStore>().unwrap()
}
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
self.registry().lookup::<Crypto>().unwrap()
}
fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager> {
self.registry().lookup::<StorageManager>().unwrap()
}
}
impl<T: VeilidComponent> RegisteredComponents for T {}
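
A rough standalone sketch of the ordering contract the startup and shutdown code above relies on. Component, startup, and shutdown are illustrative stand-ins for the registry and its Arc<dyn VeilidComponent> handles, not veilid-core API: init and post_init run in registration order, pre_terminate and terminate run in reverse, and a post_init failure unwinds by pre-terminating the post-initialized subset and then terminating everything.

struct Component;

impl Component {
    async fn init(&self) -> Result<(), String> { Ok(()) }
    async fn post_init(&self) -> Result<(), String> { Ok(()) }
    async fn pre_terminate(&self) {}
    async fn terminate(&self) {}
}

// Startup: init everything, then post_init in the same order.
async fn startup(components: &[Component]) -> Result<(), String> {
    for c in components {
        c.init().await?; // cleanup of partially-initialized components elided here
    }
    let mut post_initialized: Vec<&Component> = Vec::new();
    for c in components {
        if let Err(e) = c.post_init().await {
            // Approximates the error path above: pre-terminate what was
            // post-initialized, then terminate everything in reverse order.
            for p in post_initialized.iter().rev() {
                p.pre_terminate().await;
            }
            for c in components.iter().rev() {
                c.terminate().await;
            }
            return Err(e);
        }
        post_initialized.push(c);
    }
    Ok(())
}

// Shutdown: both phases run in reverse registration order.
async fn shutdown(components: &[Component]) {
    for c in components.iter().rev() {
        c.pre_terminate().await;
    }
    for c in components.iter().rev() {
        c.terminate().await;
    }
}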

View File

@ -98,7 +98,6 @@ impl fmt::Debug for CryptoInner {
}
/// Crypto factory implementation
#[derive(Debug)]
pub struct Crypto {
registry: VeilidComponentRegistry,
inner: Arc<Mutex<CryptoInner>>,
@ -110,6 +109,17 @@ pub struct Crypto {
impl_veilid_component!(Crypto);
impl fmt::Debug for Crypto {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Crypto")
//.field("registry", &self.registry)
.field("inner", &self.inner)
// .field("crypto_vld0", &self.crypto_vld0)
// .field("crypto_none", &self.crypto_none)
.finish()
}
}
impl Crypto {
fn new_inner() -> CryptoInner {
CryptoInner {
@ -131,8 +141,15 @@ impl Crypto {
#[instrument(level = "trace", target = "crypto", skip_all, err)]
async fn init_async(&self) -> EyreResult<()> {
// Nothing to initialize at this time
Ok(())
}
// Setup called by table store after it gets initialized
#[instrument(level = "trace", target = "crypto", skip_all, err)]
pub(crate) async fn table_store_setup(&self, table_store: &TableStore) -> EyreResult<()> {
// Init node id from config
if let Err(e) = self.init_node_ids().await {
if let Err(e) = self.setup_node_ids(table_store).await {
return Err(e).wrap_err("init node id failed");
}
@ -147,8 +164,6 @@ impl Crypto {
});
// load caches if they are valid for this node id
let table_store = self.table_store();
let mut db = table_store
.open("crypto_caches", 1)
.await
@ -169,7 +184,11 @@ impl Crypto {
db.store(0, b"cache_validity_key", &cache_validity_key)
.await?;
}
Ok(())
}
#[instrument(level = "trace", target = "crypto", skip_all, err)]
async fn post_init_async(&self) -> EyreResult<()> {
// Schedule flushing
let registry = self.registry();
let flush_future = interval("crypto flush", 60000, move || {
@ -196,7 +215,7 @@ impl Crypto {
Ok(())
}
async fn terminate_async(&self) {
async fn pre_terminate_async(&self) {
let flush_future = self.inner.lock().flush_future.take();
if let Some(f) = flush_future {
f.await;
@ -212,18 +231,21 @@ impl Crypto {
};
}
async fn terminate_async(&self) {
// Nothing to terminate at this time
}
/// Factory method to get a specific crypto version
pub fn get(&self, kind: CryptoKind) -> Option<CryptoSystemGuard<'_>> {
let inner = self.inner.lock();
match kind {
#[cfg(feature = "enable-crypto-vld0")]
CRYPTO_KIND_VLD0 => Some(CryptoSystemGuard {
crypto_system: inner.crypto_vld0.clone().unwrap(),
crypto_system: self.crypto_vld0.clone(),
_phantom: PhantomData {},
}),
#[cfg(feature = "enable-crypto-none")]
CRYPTO_KIND_NONE => Some(CryptoSystemGuard {
crypto_system: inner.crypto_none.clone().unwrap(),
crypto_system: self.crypto_none.clone(),
_phantom: PhantomData {},
}),
_ => None,
@ -329,9 +351,10 @@ impl Crypto {
}
#[cfg(not(test))]
async fn init_node_id(
async fn setup_node_id(
&self,
vcrypto: CryptoSystemGuard<'_>,
table_store: &TableStore,
) -> VeilidAPIResult<(TypedKey, TypedSecret)> {
let config = self.config();
let ck = vcrypto.kind();
@ -343,37 +366,35 @@ impl Crypto {
});
// See if node id was previously stored in the table store
let table_store = self.table_store();
let config_table = table_store.open("__veilid_config", 1).await?;
let table_key_node_id = format!("node_id_{}", ck);
let table_key_node_id_secret = format!("node_id_secret_{}", ck);
if node_id.is_none() {
log_tstore!(debug "pulling {} from storage", table_key_node_id);
log_crypto!(debug "pulling {} from storage", table_key_node_id);
if let Ok(Some(stored_node_id)) = config_table
.load_json::<TypedKey>(0, table_key_node_id.as_bytes())
.await
{
log_tstore!(debug "{} found in storage", table_key_node_id);
log_crypto!(debug "{} found in storage", table_key_node_id);
node_id = Some(stored_node_id);
} else {
log_tstore!(debug "{} not found in storage", table_key_node_id);
log_crypto!(debug "{} not found in storage", table_key_node_id);
}
}
// See if node id secret was previously stored in the protected store
if node_id_secret.is_none() {
log_tstore!(debug "pulling {} from storage", table_key_node_id_secret);
log_crypto!(debug "pulling {} from storage", table_key_node_id_secret);
if let Ok(Some(stored_node_id_secret)) = config_table
.load_json::<TypedSecret>(0, table_key_node_id_secret.as_bytes())
.await
{
log_tstore!(debug "{} found in storage", table_key_node_id_secret);
log_crypto!(debug "{} found in storage", table_key_node_id_secret);
node_id_secret = Some(stored_node_id_secret);
} else {
log_tstore!(debug "{} not found in storage", table_key_node_id_secret);
log_crypto!(debug "{} not found in storage", table_key_node_id_secret);
}
}
@ -390,7 +411,7 @@ impl Crypto {
(node_id, node_id_secret)
} else {
// If we still don't have a valid node id, generate one
log_tstore!(debug "generating new node_id_{}", ck);
log_crypto!(debug "generating new node_id_{}", ck);
let kp = vcrypto.generate_keypair();
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};
@ -408,9 +429,9 @@ impl Crypto {
}
/// Get the node id from config if one is specified.
/// Must be done -after- protected store startup.
/// Must be done -after- protected store is initialized, during table store init
#[cfg_attr(test, allow(unused_variables))]
pub(crate) async fn init_node_ids(&self) -> VeilidAPIResult<()> {
async fn setup_node_ids(&self, table_store: &TableStore) -> VeilidAPIResult<()> {
let mut out_node_id = TypedKeyGroup::new();
let mut out_node_id_secret = TypedSecretGroup::new();
@ -425,7 +446,7 @@ impl Crypto {
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};
#[cfg(not(test))]
let (node_id, node_id_secret) = self.init_node_id(vcrypto).await?;
let (node_id, node_id_secret) = self.setup_node_id(vcrypto, table_store).await?;
// Save for config
out_node_id.add(node_id);
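
The net effect of this file's changes is a dependency inversion: instead of Crypto pulling the table store during its own init (the old init_node_ids), the table store calls back into Crypto once its tables are usable. A simplified sketch, with TableStore and Crypto reduced to empty stand-ins and the registry lookup replaced by a plain parameter:

struct TableStore;
struct Crypto;

impl Crypto {
    // Init is now empty: node id and cache setup no longer happen here.
    async fn init_async(&self) -> Result<(), String> {
        Ok(())
    }

    // Called by the table store once its tables are usable; this is where
    // setup_node_ids(table_store) and the cache load now run.
    async fn table_store_setup(&self, _table_store: &TableStore) -> Result<(), String> {
        Ok(())
    }
}

impl TableStore {
    async fn init_async(&self, crypto: &Crypto) -> Result<(), String> {
        // ... open internal tables, load the device encryption key ...
        crypto.table_store_setup(self).await?;
        Ok(())
    }
}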

View File

@ -385,6 +385,14 @@ macro_rules! log_crypto {
(warn $fmt:literal, $($arg:expr),+) => {
warn!(target:"crypto", $fmt, $($arg),+);
};
(debug $text:expr) => { debug!(
target: "crypto",
"{}",
$text,
)};
(debug $fmt:literal, $($arg:expr),+) => {
debug!(target:"crypto", $fmt, $($arg),+);
};
($text:expr) => {trace!(
target: "crypto",
"{}",

View File

@ -542,9 +542,17 @@ impl RoutingTable {
self.inner.read().routing_domain_for_address(address)
}
pub fn route_spec_store(&self) -> RouteSpecStore {
pub fn route_spec_store(&self) -> RwLockReadGuard<'_, RouteSpecStore> {
self.inner.read().route_spec_store.as_ref().unwrap().clone()
}
pub fn route_spec_store_mut(&self) -> RwLockReadGuard<'_, RouteSpecStore> {
self.inner
.write()
.route_spec_store
.as_ref()
.unwrap()
.clone()
}
pub fn relay_node(&self, domain: RoutingDomain) -> Option<FilteredNodeRef> {
self.inner.read().relay_node(domain)
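
The checkpoint changes the accessor signature to return a read guard while the body shown still clones. One common way to finish an accessor like this is parking_lot's mapped guards, sketched below under the assumption that the inner lock is a parking_lot RwLock; the type names and the _mut variant are illustrative, not what this commit currently implements.

use parking_lot::{
    MappedRwLockReadGuard, MappedRwLockWriteGuard, RwLock, RwLockReadGuard, RwLockWriteGuard,
};

struct RouteSpecStore;

struct Inner {
    route_spec_store: Option<RouteSpecStore>,
}

struct RoutingTableSketch {
    inner: RwLock<Inner>,
}

impl RoutingTableSketch {
    // Hand back a guard borrowing the store in place of cloning it.
    fn route_spec_store(&self) -> MappedRwLockReadGuard<'_, RouteSpecStore> {
        RwLockReadGuard::map(self.inner.read(), |i| i.route_spec_store.as_ref().unwrap())
    }

    // Mutable access goes through the write lock with the same shape.
    fn route_spec_store_mut(&self) -> MappedRwLockWriteGuard<'_, RouteSpecStore> {
        RwLockWriteGuard::map(self.inner.write(), |i| i.route_spec_store.as_mut().unwrap())
    }
}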

View File

@ -219,11 +219,10 @@ impl StorageManager {
let local_limits = Self::local_limits_from_config(config.clone());
let remote_limits = Self::remote_limits_from_config(config.clone());
let mut local_record_store = RecordStore::new(self.registry(), "local", local_limits);
local_record_store.setup().await?;
let mut remote_record_store = RecordStore::new(self.registry(), "remote", remote_limits);
remote_record_store.setup().await?;
let local_record_store =
RecordStore::try_create(&table_store, "local", local_limits).await?;
let remote_record_store =
RecordStore::try_create(&table_store, "remote", remote_limits).await?;
let mut inner = self.inner.lock().await;
inner.metadata_db = Some(metadata_db);
@ -234,6 +233,13 @@ impl StorageManager {
// Start deferred results processors
inner.deferred_result_processor.init().await;
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn post_init_async(&self) -> EyreResult<()> {
let mut inner = self.inner.lock().await;
// Schedule tick
let tick_future = interval("storage manager tick", 1000, move || {
let registry = self.registry();
@ -249,10 +255,8 @@ impl StorageManager {
Ok(())
}
#[instrument(level = "debug", skip_all)]
async fn terminate_async(&self) {
log_stor!(debug "starting storage manager shutdown");
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn pre_terminate_async(&self) {
// Stop the background ticker process
{
let mut inner = self.inner.lock().await;
@ -263,8 +267,13 @@ impl StorageManager {
}
}
// Cancel all tasks
// Cancel all tasks associated with the tick future
self.cancel_tasks().await;
}
#[instrument(level = "debug", skip_all)]
async fn terminate_async(&self) {
log_stor!(debug "starting storage manager shutdown");
// Terminate and release the storage manager
{
@ -324,7 +333,7 @@ impl StorageManager {
Ok(())
}
fn get_ready_rpc_processor(&self) -> Option<VeilidComponentGuard<'_, RPCProcessor>> {
pub(super) fn get_ready_rpc_processor(&self) -> Option<VeilidComponentGuard<'_, RPCProcessor>> {
let Some(rpc_processor) = self.registry().lookup::<RPCProcessor>() else {
return None;
};
@ -340,11 +349,11 @@ impl StorageManager {
Some(rpc_processor)
}
async fn has_offline_subkey_writes(&self) -> bool {
pub(super) async fn has_offline_subkey_writes(&self) -> bool {
!self.inner.lock().await.offline_subkey_writes.is_empty()
}
fn online_writes_ready(&self) -> bool {
pub(super) fn online_writes_ready(&self) -> bool {
self.get_ready_rpc_processor().is_some()
}
@ -382,8 +391,9 @@ impl StorageManager {
// Validate schema
schema.validate()?;
let schema_data = schema.compile();
Ok(Self::get_key(&vcrypto, owner_key, schema))
Ok(Self::get_key(&vcrypto, owner_key, &schema_data))
}
/// Create a local record from scratch with a new owner key, open it, and return the opened descriptor
@ -1358,8 +1368,8 @@ impl StorageManager {
None => {
// If we don't have a local record yet, check to see if we have a remote record
// if so, migrate it to a local record
let Some(v) = self
.move_remote_record_to_local(key, safety_selection)
let Some(v) = inner
.move_remote_record_to_local_inner(key, safety_selection)
.await?
else {
// No remote record either
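
A standalone approximation of the post_init_async/pre_terminate_async split introduced above: background ticking starts only after every component exists and stops before any component goes away. The sketch assumes tokio for the task and timer in place of veilid-core's interval() and MustJoinHandle helpers.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

struct TickerSketch {
    stop: Arc<AtomicBool>,
    tick_task: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}

impl TickerSketch {
    // post_init: start periodic background work once all components exist.
    async fn post_init(&self) {
        let stop = self.stop.clone();
        let handle = tokio::spawn(async move {
            while !stop.load(Ordering::Relaxed) {
                tokio::time::sleep(Duration::from_millis(1000)).await;
                // tick work: flush record stores, retry offline subkey writes, ...
            }
        });
        *self.tick_task.lock().await = Some(handle);
    }

    // pre_terminate: stop the background work while the other components still exist.
    async fn pre_terminate(&self) {
        self.stop.store(true, Ordering::Relaxed);
        if let Some(handle) = self.tick_task.lock().await.take() {
            let _ = handle.await;
        }
    }
}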

View File

@ -50,14 +50,13 @@ pub(super) struct RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
registry: VeilidComponentRegistry,
name: String,
limits: RecordStoreLimits,
/// The tabledb used for record data
record_table: Option<TableDB>,
record_table: TableDB,
/// The tabledb used for subkey data
subkey_table: Option<TableDB>,
subkey_table: TableDB,
/// The in-memory index that keeps track of what records are in the tabledb
record_index: LruCache<RecordTableKey, Record<D>>,
/// The in-memory cache of commonly accessed subkey data so we don't have to keep hitting the db
@ -86,7 +85,6 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RecordStore")
//.field("table_store", &self.table_store)
.field("name", &self.name)
.field("limits", &self.limits)
.field("record_table", &self.record_table)
@ -129,7 +127,11 @@ impl<D> RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
pub fn new(registry: VeilidComponentRegistry, name: &str, limits: RecordStoreLimits) -> Self {
pub async fn try_create(
table_store: &TableStore,
name: &str,
limits: RecordStoreLimits,
) -> EyreResult<Self> {
let subkey_cache_size = limits.subkey_cache_size;
let limit_subkey_cache_total_size = limits
.max_subkey_cache_memory_mb
@ -138,12 +140,14 @@ where
.max_storage_space_mb
.map(|mb| mb as u64 * 1_048_576u64);
Self {
registry,
let record_table = table_store.open(&format!("{}_records", name), 1).await?;
let subkey_table = table_store.open(&format!("{}_subkeys", name), 1).await?;
let mut out = Self {
name: name.to_owned(),
limits,
record_table: None,
subkey_table: None,
record_table,
subkey_table,
record_index: LruCache::new(limits.max_records.unwrap_or(usize::MAX)),
subkey_cache: LruCache::new(subkey_cache_size),
inspect_cache: InspectCache::new(subkey_cache_size),
@ -162,25 +166,20 @@ where
watched_records: HashMap::new(),
purge_dead_records_mutex: Arc::new(AsyncMutex::new(())),
changed_watched_values: HashSet::new(),
}
};
out.setup().await?;
Ok(out)
}
pub async fn setup(&mut self) -> EyreResult<()> {
let record_table = self
.table_store
.open(&format!("{}_records", self.name), 1)
.await?;
let subkey_table = self
.table_store
.open(&format!("{}_subkeys", self.name), 1)
.await?;
async fn setup(&mut self) -> EyreResult<()> {
// Pull record index from table into a vector to ensure we sort them
let record_table_keys = record_table.get_keys(0).await?;
let record_table_keys = self.record_table.get_keys(0).await?;
let mut record_index_saved: Vec<(RecordTableKey, Record<D>)> =
Vec::with_capacity(record_table_keys.len());
for rtk in record_table_keys {
if let Some(vr) = record_table.load_json::<Record<D>>(0, &rtk).await? {
if let Some(vr) = self.record_table.load_json::<Record<D>>(0, &rtk).await? {
let rik = RecordTableKey::try_from(rtk.as_ref())?;
record_index_saved.push((rik, vr));
}
@ -229,8 +228,6 @@ where
self.dead_records.push(dr);
}
self.record_table = Some(record_table);
self.subkey_table = Some(subkey_table);
Ok(())
}
@ -309,11 +306,8 @@ where
return;
}
let record_table = self.record_table.clone().unwrap();
let subkey_table = self.subkey_table.clone().unwrap();
let rt_xact = record_table.transact();
let st_xact = subkey_table.transact();
let rt_xact = self.record_table.transact();
let st_xact = self.subkey_table.transact();
let dead_records = mem::take(&mut self.dead_records);
for dr in dead_records {
// Record should already be gone from index
@ -375,9 +369,7 @@ where
return;
}
let record_table = self.record_table.clone().unwrap();
let rt_xact = record_table.transact();
let rt_xact = self.record_table.transact();
let changed_records = mem::take(&mut self.changed_records);
for rtk in changed_records {
// Get the changed record and save it to the table
@ -406,11 +398,6 @@ where
apibail_internal!("record already exists");
}
// Get record table
let Some(record_table) = self.record_table.clone() else {
apibail_internal!("record store not initialized");
};
// If over size limit, dont create record
self.total_storage_space
.add((mem::size_of::<RecordTableKey>() + record.total_size()) as u64)
@ -421,7 +408,7 @@ where
}
// Save to record table
record_table
self.record_table
.store_json(0, &rtk.bytes(), &record)
.await
.map_err(VeilidAPIError::internal)?;
@ -577,11 +564,6 @@ where
}));
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey { key, subkey };
if let Some(record_data) = self.subkey_cache.get(&stk) {
@ -593,7 +575,8 @@ where
}));
}
// If not in cache, try to pull from table store if it is in our stored subkey set
let Some(record_data) = subkey_table
let Some(record_data) = self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
@ -649,11 +632,6 @@ where
}));
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey { key, subkey };
if let Some(record_data) = self.subkey_cache.peek(&stk) {
@ -665,7 +643,8 @@ where
}));
}
// If not in cache, try to pull from table store if it is in our stored subkey set
let Some(record_data) = subkey_table
let Some(record_data) = self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?
@ -749,11 +728,6 @@ where
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// Get the previous subkey and ensure we aren't going over the record size limit
let mut prior_subkey_size = 0usize;
@ -765,7 +739,8 @@ where
prior_subkey_size = record_data.data_size();
} else {
// If not in cache, try to pull from table store
if let Some(record_data) = subkey_table
if let Some(record_data) = self
.subkey_table
.load_json::<RecordData>(0, &stk_bytes)
.await
.map_err(VeilidAPIError::internal)?
@ -796,7 +771,7 @@ where
}
// Write subkey
subkey_table
self.subkey_table
.store_json(0, &stk_bytes, &subkey_record_data)
.await
.map_err(VeilidAPIError::internal)?;
@ -835,11 +810,6 @@ where
subkeys: ValueSubkeyRangeSet,
want_descriptor: bool,
) -> VeilidAPIResult<Option<InspectResult>> {
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// Get record from index
let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| {
// Get number of subkeys from schema and ensure we are getting the
@ -884,7 +854,8 @@ where
} else {
// If not in cache, try to pull from table store if it is in our stored subkey set
// XXX: This would be better if it didn't have to pull the whole record data to get the seq.
if let Some(record_data) = subkey_table
if let Some(record_data) = self
.subkey_table
.load_json::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)?

View File

@ -28,7 +28,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_set_value(
&self,
rpc_processor: RPCProcessor,
rpc_processor: &RPCProcessor,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
@ -333,7 +333,7 @@ impl StorageManager {
// If more partial results show up, don't send an update until we're done
return true;
}
// If we processed the final result, possibly send an update
// If we processed the final result, possibly send an update
// if the sequence number changed since our first partial update
// Send with a max count as this is not attached to any watch
let changed = {

View File

@ -4,19 +4,18 @@ impl StorageManager {
// Check if client-side watches on opened records either have dead nodes or if the watch has expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_active_watches_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
{
let mut inner = self.inner.lock().await;
let Some(routing_table) = inner.opt_routing_table.clone() else {
return Ok(());
};
let routing_table = self.routing_table();
let rss = routing_table.route_spec_store();
let opt_update_callback = inner.update_callback.clone();
let update_callback = self.update_callback();
let cur_ts = Timestamp::now();
for (k, v) in inner.opened_records.iter_mut() {
@ -50,15 +49,13 @@ impl StorageManager {
if is_dead {
v.clear_active_watch();
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key: *k,
subkeys: ValueSubkeyRangeSet::new(),
count: 0,
value: None,
})));
}
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key: *k,
subkeys: ValueSubkeyRangeSet::new(),
count: 0,
value: None,
})));
}
}
}

View File

@ -4,7 +4,7 @@ impl StorageManager {
// Flush records stores to disk and remove dead records
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn flush_record_stores_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,

View File

@ -11,61 +11,61 @@ impl StorageManager {
// Set flush records tick task
log_stor!(debug "starting flush record stores task");
{
let this = self.clone();
let registry = self.registry();
self.flush_record_stores_task.set_routine(move |s, l, t| {
Box::pin(this.clone().flush_record_stores_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.flush_record_stores_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
// Set offline subkey writes tick task
log_stor!(debug "starting offline subkey writes task");
{
let this = self.clone();
let registry = self.registry();
self.offline_subkey_writes_task.set_routine(move |s, l, t| {
Box::pin(this.clone().offline_subkey_writes_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.offline_subkey_writes_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
// Set send value changes tick task
log_stor!(debug "starting send value changes task");
{
let this = self.clone();
let registry = self.registry();
self.send_value_changes_task.set_routine(move |s, l, t| {
Box::pin(this.clone().send_value_changes_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.send_value_changes_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
// Set check active watches tick task
log_stor!(debug "starting check active watches task");
{
let this = self.clone();
let registry = self.registry();
self.check_active_watches_task.set_routine(move |s, l, t| {
Box::pin(this.clone().check_active_watches_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.check_active_watches_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
// Set check watched records tick task
log_stor!(debug "starting checked watched records task");
{
let this = self.clone();
let registry = self.registry();
self.check_watched_records_task.set_routine(move |s, l, t| {
Box::pin(this.clone().check_watched_records_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.check_watched_records_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
}
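
The routines above switch from capturing a clone of the StorageManager to capturing the registry and looking the component up on every tick, which keeps the component from holding a strong reference to itself through its own tasks. A self-contained approximation of that shape, using a Weak handle in place of the registry lookup (the names and the simplified closure signature are illustrative):

use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Weak};

type PinBoxFut = Pin<Box<dyn Future<Output = ()> + Send>>;

struct StorageManagerSketch;

impl StorageManagerSketch {
    async fn flush_record_stores_task_routine(&self) {
        // periodic flush work would go here
    }

    fn make_flush_routine(self: &Arc<Self>) -> impl Fn() -> PinBoxFut {
        // Capture only a lookup handle, never a strong reference to self.
        let weak = Arc::downgrade(self);
        move || -> PinBoxFut {
            let weak = weak.clone();
            Box::pin(async move {
                // Look the component up on every tick; skip the tick if it is gone.
                if let Some(this) = weak.upgrade() {
                    this.flush_record_stores_task_routine().await;
                }
            })
        }
    }
}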

View File

@ -35,19 +35,19 @@ impl StorageManager {
// Write a single offline subkey
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn write_single_offline_subkey(
self,
&self,
stop_token: StopToken,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
) -> EyreResult<OfflineSubkeyWriteResult> {
let Some(rpc_processor) = self.online_writes_ready().await? else {
let Some(rpc_processor) = self.get_ready_rpc_processor() else {
// Cancel this operation because we're offline
return Ok(OfflineSubkeyWriteResult::Cancelled);
};
let get_result = {
let mut inner = self.lock().await?;
inner.handle_get_local_value(key, subkey, true).await
let mut inner = self.inner.lock().await;
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await
};
let Ok(get_result) = get_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
@ -66,7 +66,7 @@ impl StorageManager {
log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len());
let osvres = self
.outbound_set_value(
rpc_processor,
&rpc_processor,
key,
subkey,
safety_selection,
@ -88,15 +88,16 @@ impl StorageManager {
// Set the new value if it differs from what was asked to set
if result.signed_value_data.value_data() != value.value_data() {
// Record the newer value and send and update since it is different than what we just set
let mut inner = self.lock().await?;
inner
.handle_set_local_value(
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
let mut inner = self.inner.lock().await;
Self::handle_set_local_value_inner(
&mut *inner,
key,
subkey,
result.signed_value_data.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
}
return Ok(OfflineSubkeyWriteResult::Finished(result));
@ -120,7 +121,7 @@ impl StorageManager {
// Write a set of subkeys of the same key
#[instrument(level = "trace", target = "stor", skip_all, err)]
async fn process_work_item(
self,
&self,
stop_token: StopToken,
work_item: WorkItem,
) -> EyreResult<WorkItemResult> {
@ -133,7 +134,6 @@ impl StorageManager {
}
let result = match self
.clone()
.write_single_offline_subkey(
stop_token.clone(),
work_item.key,
@ -217,7 +217,8 @@ impl StorageManager {
}
// Keep the list of nodes that returned a value for later reference
inner.process_fanout_results(
Self::process_fanout_results_inner(
inner,
result.key,
result.fanout_results.iter().map(|x| (x.0, &x.1)),
true,
@ -226,7 +227,7 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn process_offline_subkey_writes(
self,
&self,
stop_token: StopToken,
work_items: Arc<Mutex<VecDeque<WorkItem>>>,
) -> EyreResult<()> {
@ -236,11 +237,10 @@ impl StorageManager {
break;
};
let result = self
.clone()
.process_work_item(stop_token.clone(), work_item)
.await?;
{
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
Self::process_single_result_inner(&mut inner, result);
}
}
@ -251,7 +251,7 @@ impl StorageManager {
// Best-effort write subkeys to the network that were written offline
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn offline_subkey_writes_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
@ -272,7 +272,6 @@ impl StorageManager {
// Process everything
let res = self
.clone()
.process_offline_subkey_writes(stop_token, work_items)
.await;
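
Several changes in this file follow the same pattern: helpers renamed with an _inner suffix take the already-locked inner state as a parameter instead of locking it themselves, so a caller holding the lock across several steps never double-locks. A small standalone sketch of that shape, assuming tokio's async Mutex as a stand-in for the real inner lock:

use std::collections::HashMap;

struct Inner {
    local_values: HashMap<u32, String>,
}

struct OfflineWritesSketch {
    inner: tokio::sync::Mutex<Inner>,
}

impl OfflineWritesSketch {
    // "_inner" helpers take the already-locked state; they never lock.
    fn handle_get_local_value_inner(inner: &mut Inner, key: u32) -> Option<String> {
        inner.local_values.get(&key).cloned()
    }

    fn handle_set_local_value_inner(inner: &mut Inner, key: u32, value: String) {
        inner.local_values.insert(key, value);
    }

    // The caller takes the lock once and threads &mut Inner through each step.
    async fn write_subkey(&self, key: u32, value: String) -> Option<String> {
        let mut inner = self.inner.lock().await;
        let previous = Self::handle_get_local_value_inner(&mut inner, key);
        Self::handle_set_local_value_inner(&mut inner, key, value);
        previous
    }
}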

View File

@ -126,7 +126,7 @@ impl TableStore {
}
}
// Flush internal control state (must not use crypto)
// Flush internal control state
async fn flush(&self) {
let (all_table_names_value, all_tables_db) = {
let inner = self.inner.lock();
@ -421,80 +421,94 @@ impl TableStore {
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn init_async(&self) -> EyreResult<()> {
let _async_guard = self.async_lock.lock().await;
{
let _async_guard = self.async_lock.lock().await;
// Get device encryption key from protected store
let mut device_encryption_key = self.load_device_encryption_key().await?;
let mut device_encryption_key_changed = false;
if let Some(device_encryption_key) = device_encryption_key {
// If encryption in current use is not the best encryption, then run table migration
let best_kind = best_crypto_kind();
if device_encryption_key.kind != best_kind {
// XXX: Run migration. See issue #209
// Get device encryption key from protected store
let mut device_encryption_key = self.load_device_encryption_key().await?;
let mut device_encryption_key_changed = false;
if let Some(device_encryption_key) = device_encryption_key {
// If encryption in current use is not the best encryption, then run table migration
let best_kind = best_crypto_kind();
if device_encryption_key.kind != best_kind {
// XXX: Run migration. See issue #209
}
} else {
// If we don't have an encryption key yet, then make one with the best cryptography and save it
let best_kind = best_crypto_kind();
let mut shared_secret = SharedSecret::default();
random_bytes(&mut shared_secret.bytes);
device_encryption_key = Some(TypedSharedSecret::new(best_kind, shared_secret));
device_encryption_key_changed = true;
}
} else {
// If we don't have an encryption key yet, then make one with the best cryptography and save it
let best_kind = best_crypto_kind();
let mut shared_secret = SharedSecret::default();
random_bytes(&mut shared_secret.bytes);
device_encryption_key = Some(TypedSharedSecret::new(best_kind, shared_secret));
device_encryption_key_changed = true;
}
// Check for password change
let changing_password = self.config().with(|c| {
c.protected_store
.new_device_encryption_key_password
.is_some()
});
// Check for password change
let changing_password = self.config().with(|c| {
c.protected_store
.new_device_encryption_key_password
.is_some()
});
// Save encryption key if it has changed or if the protecting password wants to change
if device_encryption_key_changed || changing_password {
self.save_device_encryption_key(device_encryption_key)
.await?;
}
// Save encryption key if it has changed or if the protecting password wants to change
if device_encryption_key_changed || changing_password {
self.save_device_encryption_key(device_encryption_key)
.await?;
}
// Deserialize all table names
let all_tables_db = self
.table_store_driver
.open("__veilid_all_tables", 1)
.await
.wrap_err("failed to create all tables table")?;
match all_tables_db.get(0, ALL_TABLE_NAMES).await {
Ok(Some(v)) => match deserialize_json_bytes::<HashMap<String, String>>(&v) {
Ok(all_table_names) => {
let mut inner = self.inner.lock();
inner.all_table_names = all_table_names;
// Deserialize all table names
let all_tables_db = self
.table_store_driver
.open("__veilid_all_tables", 1)
.await
.wrap_err("failed to create all tables table")?;
match all_tables_db.get(0, ALL_TABLE_NAMES).await {
Ok(Some(v)) => match deserialize_json_bytes::<HashMap<String, String>>(&v) {
Ok(all_table_names) => {
let mut inner = self.inner.lock();
inner.all_table_names = all_table_names;
}
Err(e) => {
error!("could not deserialize __veilid_all_tables: {}", e);
}
},
Ok(None) => {
// No table names yet, that's okay
log_tstore!("__veilid_all_tables is empty");
}
Err(e) => {
error!("could not deserialize __veilid_all_tables: {}", e);
error!("could not get __veilid_all_tables: {}", e);
}
},
Ok(None) => {
// No table names yet, that's okay
log_tstore!("__veilid_all_tables is empty");
}
Err(e) => {
error!("could not get __veilid_all_tables: {}", e);
}
};
};
{
let mut inner = self.inner.lock();
inner.encryption_key = device_encryption_key;
inner.all_tables_db = Some(all_tables_db);
{
let mut inner = self.inner.lock();
inner.encryption_key = device_encryption_key;
inner.all_tables_db = Some(all_tables_db);
}
let do_delete = self.config().with(|c| c.table_store.delete);
if do_delete {
self.delete_all().await;
}
}
let do_delete = self.config().with(|c| c.table_store.delete);
if do_delete {
self.delete_all().await;
}
// Set up crypto
let crypto = self.crypto();
crypto.table_store_setup(self).await?;
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn post_init_async(&self) -> EyreResult<()> {
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn pre_terminate_async(&self) {}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn terminate_async(&self) {
let _async_guard = self.async_lock.lock().await;