watch maintenance tasks

This commit is contained in:
Christien Rioux 2023-12-05 20:00:08 -05:00
parent a67bfde1f7
commit b46fd7690f
9 changed files with 172 additions and 14 deletions

View File

@ -276,6 +276,12 @@ impl RoutingTable {
inner.route_spec_store = Some(route_spec_store); inner.route_spec_store = Some(route_spec_store);
} }
// Inform storage manager we are up
self.network_manager
.storage_manager()
.set_routing_table(Some(self.clone()))
.await;
debug!("finished routing table init"); debug!("finished routing table init");
Ok(()) Ok(())
} }
@ -284,6 +290,12 @@ impl RoutingTable {
pub async fn terminate(&self) { pub async fn terminate(&self) {
debug!("starting routing table terminate"); debug!("starting routing table terminate");
// Stop storage manager from using us
self.network_manager
.storage_manager()
.set_routing_table(None)
.await;
// Stop tasks // Stop tasks
self.cancel_tasks().await; self.cancel_tasks().await;

View File

@ -33,6 +33,8 @@ const FLUSH_RECORD_STORES_INTERVAL_SECS: u32 = 1;
const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1; const OFFLINE_SUBKEY_WRITES_INTERVAL_SECS: u32 = 1;
/// Frequency to send ValueChanged notifications to the network /// Frequency to send ValueChanged notifications to the network
const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1; const SEND_VALUE_CHANGES_INTERVAL_SECS: u32 = 1;
/// Frequence to check for dead nodes and routes for active watches
const CHECK_ACTIVE_WATCHES_INTERVAL_SECS: u32 = 1;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
/// A single 'value changed' message to send /// A single 'value changed' message to send
@ -55,6 +57,7 @@ struct StorageManagerUnlockedInner {
flush_record_stores_task: TickTask<EyreReport>, flush_record_stores_task: TickTask<EyreReport>,
offline_subkey_writes_task: TickTask<EyreReport>, offline_subkey_writes_task: TickTask<EyreReport>,
send_value_changes_task: TickTask<EyreReport>, send_value_changes_task: TickTask<EyreReport>,
check_active_watches_task: TickTask<EyreReport>,
// Anonymous watch keys // Anonymous watch keys
anonymous_watch_keys: TypedKeyPairGroup, anonymous_watch_keys: TypedKeyPairGroup,
@ -90,6 +93,7 @@ impl StorageManager {
flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS), flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS),
offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS), offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS),
send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS), send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS),
check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS),
anonymous_watch_keys, anonymous_watch_keys,
} }
@ -149,7 +153,12 @@ impl StorageManager {
pub async fn set_rpc_processor(&self, opt_rpc_processor: Option<RPCProcessor>) { pub async fn set_rpc_processor(&self, opt_rpc_processor: Option<RPCProcessor>) {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
inner.rpc_processor = opt_rpc_processor inner.opt_rpc_processor = opt_rpc_processor
}
pub async fn set_routing_table(&self, opt_routing_table: Option<RoutingTable>) {
let mut inner = self.inner.lock().await;
inner.opt_routing_table = opt_routing_table
} }
async fn lock(&self) -> VeilidAPIResult<AsyncMutexGuardArc<StorageManagerInner>> { async fn lock(&self) -> VeilidAPIResult<AsyncMutexGuardArc<StorageManagerInner>> {
@ -161,7 +170,7 @@ impl StorageManager {
} }
fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> { fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option<RPCProcessor> {
if let Some(rpc_processor) = { inner.rpc_processor.clone() } { if let Some(rpc_processor) = { inner.opt_rpc_processor.clone() } {
if let Some(network_class) = rpc_processor if let Some(network_class) = rpc_processor
.routing_table() .routing_table()
.get_network_class(RoutingDomain::PublicInternet) .get_network_class(RoutingDomain::PublicInternet)
@ -234,7 +243,7 @@ impl StorageManager {
// No record yet, try to get it from the network // No record yet, try to get it from the network
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else { let Some(rpc_processor) = inner.opt_rpc_processor.clone() else {
apibail_try_again!("offline, try again later"); apibail_try_again!("offline, try again later");
}; };
@ -284,7 +293,7 @@ impl StorageManager {
pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
let (opened_record, opt_rpc_processor) = { let (opened_record, opt_rpc_processor) = {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
(inner.close_record(key)?, inner.rpc_processor.clone()) (inner.close_record(key)?, inner.opt_rpc_processor.clone())
}; };
// Send a one-time cancel request for the watch if we have one and we're online // Send a one-time cancel request for the watch if we have one and we're online
@ -364,7 +373,7 @@ impl StorageManager {
// Refresh if we can // Refresh if we can
// Get rpc processor and drop mutex so we don't block while getting the value from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else { let Some(rpc_processor) = inner.opt_rpc_processor.clone() else {
// Return the existing value if we have one if we aren't online // Return the existing value if we have one if we aren't online
if let Some(last_subkey_result_value) = last_subkey_result.value { if let Some(last_subkey_result_value) = last_subkey_result.value {
return Ok(Some(last_subkey_result_value.value_data().clone())); return Ok(Some(last_subkey_result_value.value_data().clone()));
@ -562,7 +571,7 @@ impl StorageManager {
}; };
// Get rpc processor and drop mutex so we don't block while requesting the watch from the network // Get rpc processor and drop mutex so we don't block while requesting the watch from the network
let Some(rpc_processor) = inner.rpc_processor.clone() else { let Some(rpc_processor) = inner.opt_rpc_processor.clone() else {
apibail_try_again!("offline, try again later"); apibail_try_again!("offline, try again later");
}; };
@ -684,7 +693,7 @@ impl StorageManager {
async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> { async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> {
let rpc_processor = { let rpc_processor = {
let inner = self.inner.lock().await; let inner = self.inner.lock().await;
if let Some(rpc_processor) = &inner.rpc_processor { if let Some(rpc_processor) = &inner.opt_rpc_processor {
rpc_processor.clone() rpc_processor.clone()
} else { } else {
apibail_try_again!("network is not available"); apibail_try_again!("network is not available");

View File

@ -859,7 +859,7 @@ where
} }
let mut evcis = vec![]; let mut evcis = vec![];
let mut empty_watched_records = vec![];
for rtk in self.changed_watched_values.drain() { for rtk in self.changed_watched_values.drain() {
if let Some(watch) = self.watched_records.get_mut(&rtk) { if let Some(watch) = self.watched_records.get_mut(&rtk) {
// Process watch notifications // Process watch notifications
@ -888,9 +888,15 @@ where
// Remove in reverse so we don't have to offset the index to remove the right key // Remove in reverse so we don't have to offset the index to remove the right key
for dw in dead_watchers.iter().rev().copied() { for dw in dead_watchers.iter().rev().copied() {
watch.watchers.remove(dw); watch.watchers.remove(dw);
if watch.watchers.is_empty() {
empty_watched_records.push(rtk);
} }
} }
} }
}
for ewr in empty_watched_records {
self.watched_records.remove(&ewr);
}
for evci in evcis { for evci in evcis {
// Get the first subkey data // Get the first subkey data

View File

@ -25,7 +25,9 @@ pub(super) struct StorageManagerInner {
/// Storage manager metadata that is persistent, including copy of offline subkey writes /// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>, pub metadata_db: Option<TableDB>,
/// RPC processor if it is available /// RPC processor if it is available
pub rpc_processor: Option<RPCProcessor>, pub opt_rpc_processor: Option<RPCProcessor>,
/// Routing table if it is available
pub opt_routing_table: Option<RoutingTable>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too) /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<SendPinBoxFuture<()>>, pub tick_future: Option<SendPinBoxFuture<()>>,
/// Update callback to send ValueChanged notification to /// Update callback to send ValueChanged notification to
@ -78,7 +80,8 @@ impl StorageManagerInner {
remote_record_store: Default::default(), remote_record_store: Default::default(),
offline_subkey_writes: Default::default(), offline_subkey_writes: Default::default(),
metadata_db: Default::default(), metadata_db: Default::default(),
rpc_processor: Default::default(), opt_rpc_processor: Default::default(),
opt_routing_table: Default::default(),
tick_future: Default::default(), tick_future: Default::default(),
update_callback: None, update_callback: None,
} }
@ -437,7 +440,7 @@ impl StorageManagerInner {
}; };
// Get routing table to see if we still know about these nodes // Get routing table to see if we still know about these nodes
let Some(routing_table) = self.rpc_processor.as_ref().map(|r| r.routing_table()) else { let Some(routing_table) = self.opt_rpc_processor.as_ref().map(|r| r.routing_table()) else {
apibail_try_again!("offline, try again later"); apibail_try_again!("offline, try again later");
}; };

View File

@ -0,0 +1,66 @@
use super::*;
impl StorageManager {
// Flush records stores to disk and remove dead records and send watch notifications
#[instrument(level = "trace", skip(self), err)]
pub(super) async fn check_active_watches_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
{
let mut inner = self.inner.lock().await;
let Some(routing_table) = inner.opt_routing_table.clone() else {
return Ok(());
};
let rss = routing_table.route_spec_store();
let opt_update_callback = inner.update_callback.clone();
let cur_ts = get_aligned_timestamp();
for (k, v) in inner.opened_records.iter_mut() {
// If no active watch, then skip this
let Some(active_watch) = v.active_watch() else {
continue;
};
// See if the active watch's node is dead
let mut is_dead = false;
if matches!(
active_watch.watch_node.state(cur_ts),
BucketEntryState::Dead
) {
// Watched node is dead
is_dead = true;
}
// See if the private route we're using is dead
if !is_dead {
if let Some(value_changed_route) = active_watch.opt_value_changed_route {
if rss.get_route_id_for_key(&value_changed_route).is_none() {
// Route we would receive value changes on is dead
is_dead = true;
}
}
}
if is_dead {
if let Some(update_callback) = opt_update_callback.clone() {
// Send valuechange with dead count and no subkeys
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
key: *k,
subkeys: ValueSubkeyRangeSet::new(),
count: 0,
value: ValueData::default(),
})));
}
v.clear_active_watch();
}
}
}
Ok(())
}
}

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
impl StorageManager { impl StorageManager {
// Flush records stores to disk and remove dead records and send watch notifications // Flush records stores to disk and remove dead records
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn flush_record_stores_task_routine( pub(crate) async fn flush_record_stores_task_routine(
self, self,

View File

@ -1,3 +1,4 @@
pub mod check_active_watches;
pub mod flush_record_stores; pub mod flush_record_stores;
pub mod offline_subkey_writes; pub mod offline_subkey_writes;
pub mod send_value_changes; pub mod send_value_changes;
@ -69,12 +70,36 @@ impl StorageManager {
) )
}); });
} }
// Set check active watches tick task
debug!("starting check active watches task");
{
let this = self.clone();
self.unlocked_inner
.check_active_watches_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.check_active_watches_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
)
.instrument(trace_span!(
parent: None,
"StorageManager check active watches task routine"
)),
)
});
}
} }
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
// Run the flush stores task // Run the flush stores task
self.unlocked_inner.flush_record_stores_task.tick().await?; self.unlocked_inner.flush_record_stores_task.tick().await?;
// Check active watches
self.unlocked_inner.check_active_watches_task.tick().await?;
// Run online-only tasks // Run online-only tasks
if self.online_writes_ready().await?.is_some() { if self.online_writes_ready().await?.is_some() {
// Run offline subkey writes task if there's work to be done // Run offline subkey writes task if there's work to be done
@ -92,6 +117,10 @@ impl StorageManager {
} }
pub(crate) async fn cancel_tasks(&self) { pub(crate) async fn cancel_tasks(&self) {
debug!("stopping check active watches task");
if let Err(e) = self.unlocked_inner.check_active_watches_task.stop().await {
warn!("check_active_watches_task not stopped: {}", e);
}
debug!("stopping send value changes task"); debug!("stopping send value changes task");
if let Err(e) = self.unlocked_inner.send_value_changes_task.stop().await { if let Err(e) = self.unlocked_inner.send_value_changes_task.stop().await {
warn!("send_value_changes_task not stopped: {}", e); warn!("send_value_changes_task not stopped: {}", e);

View File

@ -3,7 +3,7 @@ use futures_util::StreamExt;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
impl StorageManager { impl StorageManager {
// Flush records stores to disk and remove dead records and send watch notifications // Send value change notifications across the network
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(super) async fn send_value_changes_task_routine( pub(super) async fn send_value_changes_task_routine(
self, self,

View File

@ -219,7 +219,7 @@ impl StorageManager {
&self, &self,
key: TypedKey, key: TypedKey,
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, mut count: u32,
value: Arc<SignedValueData>, value: Arc<SignedValueData>,
) -> VeilidAPIResult<()> { ) -> VeilidAPIResult<()> {
// Update local record store with new value // Update local record store with new value
@ -233,8 +233,40 @@ impl StorageManager {
} else { } else {
VeilidAPIResult::Ok(()) VeilidAPIResult::Ok(())
}; };
let Some(opened_record) = inner.opened_records.get_mut(&key) else {
// Don't send update or update the ActiveWatch if this record is closed
return res;
};
let Some(mut active_watch) = opened_record.active_watch() else {
// No active watch means no callback
return res;
};
if count > active_watch.count {
// If count is greater than our requested count then this is invalid, cancel the watch
log_stor!(debug "watch count went backward: {}: {}/{}", key, count, active_watch.count);
// Force count to zero
count = 0;
opened_record.clear_active_watch();
} else if count == 0 {
// If count is zero, we're done, cancel the watch and the app can renew it if it wants
log_stor!(debug "watch count finished: {}", key);
opened_record.clear_active_watch();
} else {
log_stor!(
"watch count decremented: {}: {}/{}",
key,
count,
active_watch.count
);
active_watch.count = count;
opened_record.set_active_watch(active_watch);
}
(res, inner.update_callback.clone()) (res, inner.update_callback.clone())
}; };
// Announce ValueChanged VeilidUpdate // Announce ValueChanged VeilidUpdate
if let Some(update_callback) = opt_update_callback { if let Some(update_callback) = opt_update_callback {
update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange { update_callback(VeilidUpdate::ValueChange(Box::new(VeilidValueChange {
@ -244,6 +276,7 @@ impl StorageManager {
value: value.value_data().clone(), value: value.value_data().clone(),
}))); })));
} }
res res
} }
} }