Inspect watched records for value changes made while offline when coming back online

This commit is contained in:
Christien Rioux 2025-05-19 14:52:42 -04:00
parent 49173b9373
commit f4ef16eef3
15 changed files with 182 additions and 104 deletions

View file

@ -94,7 +94,7 @@ thiserror = "1.0.69"
# Data structures
enumset = { version = "1.1.5", features = ["serde"] }
keyvaluedb = "0.1.2"
keyvaluedb = "0.1.3"
range-set-blaze = "0.1.16"
weak-table = "0.3.2"
hashlink = { package = "veilid-hashlink", version = "0.1.1", features = [
@ -180,7 +180,7 @@ futures-util = { version = "0.3.31", default-features = false, features = [
# Data structures
keyring-manager = "0.5.1"
keyvaluedb-sqlite = "0.1.2"
keyvaluedb-sqlite = "0.1.3"
# Network
async-tungstenite = { version = "0.27.0", features = ["async-tls"] }
@ -213,7 +213,7 @@ send_wrapper = { version = "0.6.0", features = ["futures"] }
serde_bytes = { version = "0.11", default-features = false, features = [
"alloc",
] }
tsify = { version = "0.5.5", features = ["js"] }
tsify = { version = "0.5.5", features = ["js"] }
serde-wasm-bindgen = "0.6.5"
# Network
@ -223,7 +223,7 @@ ws_stream_wasm = "0.7.4"
wasm-logger = "0.2.0"
# Data Structures
keyvaluedb-web = "0.1.2"
keyvaluedb-web = "0.1.3"
### Configuration for WASM32 'web-sys' crate
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies.web-sys]

View file

@ -376,17 +376,17 @@ macro_rules! impl_subscribe_event_bus {
pub(crate) use impl_subscribe_event_bus;
// macro_rules! impl_subscribe_event_bus_async {
// ($this:expr, $this_type:ty, $event_handler:ident ) => {{
// let registry = $this.registry();
// $this.event_bus().subscribe(move |evt| {
// let registry = registry.clone();
// Box::pin(async move {
// let this = registry.lookup::<$this_type>().unwrap();
// this.$event_handler(evt).await;
// })
// })
// }};
// }
// Subscribes an async event handler method on a component to the event bus.
// Captures the component registry (not the component itself) at subscription
// time; when an event fires, the handler closure clones the registry, looks
// the component back up by type, and awaits the named handler method on it.
// NOTE(review): `lookup::<$this_type>().unwrap()` presumes the component is
// still registered when the event is delivered — confirm lifetime guarantees.
macro_rules! impl_subscribe_event_bus_async {
    ($this:expr, $this_type:ty, $event_handler:ident ) => {{
        let registry = $this.registry();
        $this.event_bus().subscribe(move |evt| {
            let registry = registry.clone();
            Box::pin(async move {
                let this = registry.lookup::<$this_type>().unwrap();
                this.$event_handler(evt).await;
            })
        })
    }};
}
// pub(crate) use impl_subscribe_event_bus_async;
pub(crate) use impl_subscribe_event_bus_async;

View file

@ -41,9 +41,9 @@ impl ProtectedStore {
pub fn delete_all(&self) -> EyreResult<()> {
for kpsk in &KNOWN_PROTECTED_STORE_KEYS {
if let Err(e) = self.remove_user_secret(kpsk) {
error!("failed to delete '{}': {}", kpsk, e);
veilid_log!(self error "failed to delete '{}': {}", kpsk, e);
} else {
veilid_log!(self debug "deleted table '{}'", kpsk);
veilid_log!(self debug "deleted protected store key '{}'", kpsk);
}
}
Ok(())
@ -97,7 +97,6 @@ impl ProtectedStore {
);
}
if inner.keyring_manager.is_none() {
veilid_log!(self error "QWERQWER");
bail!("Could not initialize the protected store.");
}
c.protected_store.delete

View file

@ -22,9 +22,9 @@ impl ProtectedStore {
pub fn delete_all(&self) -> EyreResult<()> {
for kpsk in &KNOWN_PROTECTED_STORE_KEYS {
if let Err(e) = self.remove_user_secret(kpsk) {
error!("failed to delete '{}': {}", kpsk, e);
veilid_log!(self error "failed to delete protected store key '{}': {}", kpsk, e);
} else {
veilid_log!(self debug "deleted table '{}'", kpsk);
veilid_log!(self debug "deleted protected store key '{}'", kpsk);
}
}
Ok(())
@ -32,6 +32,10 @@ impl ProtectedStore {
#[instrument(level = "debug", skip(self), err)]
async fn init_async(&self) -> EyreResult<()> {
if self.config().with(|c| c.protected_store.delete) {
self.delete_all()?;
}
Ok(())
}
@ -124,7 +128,6 @@ impl ProtectedStore {
};
let vkey = self.browser_key_name(key.as_ref());
ls.get_item(&vkey)
.map_err(map_jsvalue_error)
.wrap_err("exception_thrown")

View file

@ -1268,7 +1268,8 @@ impl NetworkManager {
fn peer_info_change_event_handler(&self, evt: Arc<PeerInfoChangeEvent>) {
let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_peer_info_change(evt.routing_domain, evt.opt_peer_info.clone());
address_check
.report_peer_info_change(evt.routing_domain, evt.opt_new_peer_info.clone());
}
}

View file

@ -1,11 +1,6 @@
use super::*;
pub(crate) struct PeerInfoChangeEvent {
pub routing_domain: RoutingDomain,
pub opt_peer_info: Option<Arc<PeerInfo>>,
}
pub(crate) struct SocketAddressChangeEvent {
pub struct SocketAddressChangeEvent {
pub routing_domain: RoutingDomain, // the routing domain this flow is over
pub socket_address: SocketAddress, // the socket address as seen by the remote peer
pub old_socket_address: Option<SocketAddress>, // the socket address previously for this peer

View file

@ -134,7 +134,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
}
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool {
let opt_peer_info = {
let (opt_old_peer_info, opt_new_peer_info) = {
let opt_new_peer_info = {
let pi = self.get_peer_info(rti);
@ -156,7 +156,9 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
// Don't publish if the peer info hasn't changed from our previous publication
let mut ppi_lock = self.published_peer_info.lock();
if let Some(old_peer_info) = &*ppi_lock {
let opt_old_peer_info = (*ppi_lock).clone();
if let Some(old_peer_info) = &opt_old_peer_info {
if let Some(new_peer_info) = &opt_new_peer_info {
if new_peer_info.equivalent(old_peer_info) {
veilid_log!(rti debug "[LocalNetwork] Not publishing peer info because it is equivalent");
@ -175,12 +177,13 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
}
*ppi_lock = opt_new_peer_info.clone();
opt_new_peer_info
(opt_old_peer_info, opt_new_peer_info)
};
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent {
routing_domain: RoutingDomain::LocalNetwork,
opt_peer_info,
opt_old_peer_info,
opt_new_peer_info,
}) {
veilid_log!(rti debug "Failed to post event: {}", e);
}

View file

@ -112,7 +112,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
}
fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool {
let opt_peer_info = {
let (opt_old_peer_info, opt_new_peer_info) = {
let opt_new_peer_info = {
let pi = self.get_peer_info(rti);
@ -134,7 +134,9 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
// Don't publish if the peer info hasn't changed from our previous publication
let mut ppi_lock = self.published_peer_info.lock();
if let Some(old_peer_info) = &*ppi_lock {
let opt_old_peer_info = (*ppi_lock).clone();
if let Some(old_peer_info) = &opt_old_peer_info {
if let Some(new_peer_info) = &opt_new_peer_info {
if new_peer_info.equivalent(old_peer_info) {
veilid_log!(self debug "[PublicInternet] Not publishing peer info because it is equivalent");
@ -154,12 +156,13 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
*ppi_lock = opt_new_peer_info.clone();
opt_new_peer_info
(opt_old_peer_info, opt_new_peer_info)
};
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent {
routing_domain: RoutingDomain::PublicInternet,
opt_peer_info,
opt_old_peer_info,
opt_new_peer_info,
}) {
veilid_log!(self debug "Failed to post event: {}", e);
}

View file

@ -0,0 +1,7 @@
use super::*;
/// Event posted to the event bus when a routing domain's published peer info
/// changes (e.g. when peer info is first published or is withdrawn).
pub struct PeerInfoChangeEvent {
    /// The routing domain whose published peer info changed.
    pub routing_domain: RoutingDomain,
    /// The previously published peer info; `None` if nothing was published before.
    pub opt_old_peer_info: Option<Arc<PeerInfo>>,
    /// The newly published peer info; `None` if the peer info was withdrawn.
    pub opt_new_peer_info: Option<Arc<PeerInfo>>,
}

View file

@ -1,6 +1,7 @@
mod contact_method;
mod dial_info_detail;
mod direction;
mod events;
#[cfg(feature = "geolocation")]
mod geolocation_info;
mod node_info;
@ -16,6 +17,7 @@ use super::*;
pub use contact_method::*;
pub use dial_info_detail::*;
pub use direction::*;
pub use events::*;
#[cfg(feature = "geolocation")]
pub use geolocation_info::*;
pub use node_info::*;

View file

@ -94,6 +94,8 @@ struct StorageManagerInner {
pub metadata_db: Option<TableDB>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<PinBoxFutureStatic<()>>,
/// PeerInfo subscription
peer_info_change_subscription: Option<EventBusSubscription>,
}
impl fmt::Debug for StorageManagerInner {
@ -107,6 +109,10 @@ impl fmt::Debug for StorageManagerInner {
.field("active_subkey_writes", &self.active_subkey_writes)
.field("rehydration_requests", &self.rehydration_requests)
.field("outbound_watch_manager", &self.outbound_watch_manager)
.field(
"peer_info_change_subscription",
&self.peer_info_change_subscription,
)
//.field("metadata_db", &self.metadata_db)
//.field("tick_future", &self.tick_future)
.finish()
@ -137,6 +143,9 @@ pub(crate) struct StorageManager {
// for offline subkey writes, watch changes, and any other
// background operations the storage manager wants to perform
background_operation_processor: DeferredStreamProcessor,
// Online check
is_online: AtomicBool,
}
impl fmt::Debug for StorageManager {
@ -161,6 +170,7 @@ impl fmt::Debug for StorageManager {
&self.background_operation_processor,
)
.field("anonymous_watch_keys", &self.anonymous_watch_keys)
.field("is_online", &self.is_online)
.finish()
}
}
@ -216,6 +226,7 @@ impl StorageManager {
outbound_watch_lock_table: AsyncTagLockTable::new(),
anonymous_watch_keys,
background_operation_processor: DeferredStreamProcessor::new(),
is_online: AtomicBool::new(false),
};
this.setup_tasks();
@ -295,6 +306,10 @@ impl StorageManager {
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn post_init_async(&self) -> EyreResult<()> {
// Register event handlers
let peer_info_change_subscription =
impl_subscribe_event_bus_async!(self, Self, peer_info_change_event_handler);
let mut inner = self.inner.lock().await;
// Resolve outbound watch manager noderefs
@ -312,13 +327,14 @@ impl StorageManager {
}
});
inner.tick_future = Some(tick_future);
inner.peer_info_change_subscription = Some(peer_info_change_subscription);
Ok(())
}
#[instrument(level = "trace", target = "tstore", skip_all)]
async fn pre_terminate_async(&self) {
// Stop the background ticker process
// Stop background operations
{
let mut inner = self.inner.lock().await;
// Stop ticker
@ -326,6 +342,9 @@ impl StorageManager {
if let Some(f) = tick_future {
f.await;
}
if let Some(sub) = inner.peer_info_change_subscription.take() {
self.event_bus().unsubscribe(sub);
}
}
// Cancel all tasks associated with the tick future
@ -427,17 +446,7 @@ impl StorageManager {
}
pub(super) fn dht_is_online(&self) -> bool {
// Check if we have published peer info
// Note, this is a best-effort check, subject to race conditions on the network's state
if self
.routing_table()
.get_published_peer_info(RoutingDomain::PublicInternet)
.is_none()
{
return false;
}
true
self.is_online.load(Ordering::Acquire)
}
/// Get the set of nodes in our active watches
@ -1890,4 +1899,18 @@ impl StorageManager {
self.background_operation_processor
.add_stream(receiver.into_stream(), handler)
}
/// Event-bus handler that tracks PublicInternet online/offline transitions.
///
/// An offline→online transition (no old peer info, new peer info present)
/// sets the `is_online` flag and triggers a change-inspect of all outbound
/// watches, since ValueChanged notifications may have been missed while
/// offline. An online→offline transition just clears the flag.
async fn peer_info_change_event_handler(&self, evt: Arc<PeerInfoChangeEvent>) {
    // Only PublicInternet peer info determines DHT online state
    if evt.routing_domain != RoutingDomain::PublicInternet {
        return;
    }

    let came_online = evt.opt_old_peer_info.is_none() && evt.opt_new_peer_info.is_some();
    let went_offline = evt.opt_old_peer_info.is_some() && evt.opt_new_peer_info.is_none();

    if came_online {
        self.is_online.store(true, Ordering::Release);
        // Trigger online updates
        self.change_inspect_all_watches().await;
    } else if went_offline {
        self.is_online.store(false, Ordering::Release);
    }
}
}

View file

@ -1292,4 +1292,32 @@ impl StorageManager {
Ok(NetworkResult::value(()))
}
/// Check all watches for changes.
/// Used when we come back online from being offline and may have
/// missed some ValueChanged notifications.
#[instrument(level = "trace", target = "watch", skip_all)]
pub async fn change_inspect_all_watches(&self) {
    let mut inner = self.inner.lock().await;

    // Snapshot (record key, subkey set) for every outbound watch that has an
    // active state; collecting owned pairs ends the immutable borrow of
    // `inner` before we mutate the manager below.
    let change_inspects: Vec<_> = inner
        .outbound_watch_manager
        .outbound_watches
        .iter()
        .filter_map(|(record_key, outbound_watch)| {
            outbound_watch
                .state()
                .map(|state| (*record_key, state.params().subkeys.clone()))
        })
        .collect();

    if change_inspects.is_empty() {
        return;
    }

    veilid_log!(self debug "change inspecting {} watches", change_inspects.len());

    // Queue a change-inspect for each watched record's reportable subkeys
    for (record_key, reportable_subkeys) in change_inspects {
        inner
            .outbound_watch_manager
            .enqueue_change_inspect(record_key, reportable_subkeys);
    }
}
}

View file

@ -283,6 +283,7 @@ pub async fn test_store_load_json_many(ts: &TableStore) {
let mut r = 0;
let start_ts = Timestamp::now();
let mut keys = HashSet::new();
loop {
while r < rows && unord.len() < parallel {
let key = format!("key_{}", r);
@ -290,6 +291,7 @@ pub async fn test_store_load_json_many(ts: &TableStore) {
unord.push(Box::pin(async {
let key = key;
db.store_json(0, key.as_bytes(), &value)
.await
.expect("should store");
@ -299,12 +301,21 @@ pub async fn test_store_load_json_many(ts: &TableStore) {
.expect("should load")
.expect("should exist");
assert_eq!(value, value2);
key.as_bytes().to_vec()
}));
}
if unord.next().await.is_none() {
if let Some(res) = unord.next().await {
keys.insert(res);
} else {
break;
}
}
let stored_keys = db.get_keys(0).await.expect("should get keys");
let stored_keys_set = stored_keys.into_iter().collect::<HashSet<_>>();
assert_eq!(stored_keys_set, keys, "should have same keys");
let end_ts = Timestamp::now();
trace!("test_store_load_json_many duration={}", (end_ts - start_ts));
}