From f4ef16eef30e2f35ba1a3cc1605ef33f5d3eea31 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Mon, 19 May 2025 14:52:42 -0400 Subject: [PATCH] Inspect watched records for value changes made while offline when coming back online --- CHANGELOG.md | 7 +- Cargo.lock | 102 +++++++++--------- veilid-core/Cargo.toml | 8 +- veilid-core/src/component.rs | 26 ++--- .../src/intf/native/protected_store.rs | 5 +- veilid-core/src/intf/wasm/protected_store.rs | 9 +- veilid-core/src/network_manager/mod.rs | 3 +- .../src/network_manager/types/events.rs | 7 +- .../routing_domains/local_network/mod.rs | 11 +- .../routing_domains/public_internet/mod.rs | 11 +- veilid-core/src/routing_table/types/events.rs | 7 ++ veilid-core/src/routing_table/types/mod.rs | 2 + veilid-core/src/storage_manager/mod.rs | 47 +++++--- .../src/storage_manager/watch_value.rs | 28 +++++ .../src/table_store/tests/test_table_store.rs | 13 ++- 15 files changed, 182 insertions(+), 104 deletions(-) create mode 100644 veilid-core/src/routing_table/types/events.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 32b05d81..2e6ad2be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ -**On Nightly** +**UNRELEASED** + +- veilid-core: + - Update keyvaluedb to 0.1.3 + - Inspect watched records for value changes made while offline when coming back online + - Additional table store unit test - veilid-flutter: - Fix exception handling for WASM diff --git a/Cargo.lock b/Cargo.lock index 99defc40..e59121fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -730,9 +730,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.0" +version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" [[package]] name = "bitmaps" @@ -831,9 +831,9 @@ dependencies = [ [[package]] name = "bosion" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "858150532ced83f59a49236ccade71f0ec46362dbd1632affdf79a5c299462cd" +checksum = "812324ec2142bc1152978f22772cedd56cc617094d9d24fab05643149b4f185f" dependencies = [ "time", ] @@ -882,9 +882,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.22" +version = "1.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32db95edf998450acc7881c932f94cd9b05c87b4b2599e8bab064753da4acfd1" +checksum = "5f4ac86a9e5bc1e2b3449ab9d7d3a6a405e3d1bb28d7b9be8614f55846ae3766" dependencies = [ "shlex", ] @@ -1292,7 +1292,7 @@ version = "0.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "crossterm_winapi", "libc", "mio 0.8.11", @@ -1308,7 +1308,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b9f2e4c67f833b660cdb0a3523065869fb35570177239812ed4c905aeff87b" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "crossterm_winapi", "derive_more", "document-features", @@ -1671,7 +1671,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "objc2", ] @@ -1866,9 +1866,9 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "errno" -version = "0.3.11" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976dd42dc7e85965fe702eb8164f21f450704bdde31faefd6471dba214cb594e" +checksum = "cea14ef9355e3beab063703aa9dab15afd25f0667c341310c1e5274bb1d0da18" dependencies = [ "libc", "windows-sys 0.59.0", @@ -2709,7 +2709,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.61.0", + "windows-core 0.61.2", ] [[package]] @@ -2770,9 +2770,9 @@ checksum = "00210d6893afc98edb752b664b8890f0ef174c8adbb8d0be9710fa66fbbf72d3" [[package]] name = "icu_properties" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2549ca8c7241c82f59c80ba2a6f415d931c5b58d24fb8412caa1a1f02c49139a" +checksum = "016c619c1eeb94efb86809b015c58f479963de65bdb6253345c1a1276f22e32b" dependencies = [ "displaydoc", "icu_collections", @@ -2786,9 +2786,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8197e866e47b68f8f7d95249e172903bec06004b18b2937f1095d40a0c57de04" +checksum = "298459143998310acd25ffe6810ed544932242d3f07083eee1084d83a71bd632" [[package]] name = "icu_provider" @@ -3056,18 +3056,18 @@ dependencies = [ [[package]] name = "keyvaluedb" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3fe4850c4103a92a7bd14a56ecbe413c27f8c89ea642cb4753b310c30dff812" +checksum = "ad0c34a6c4cdaa09c7de9d712b7bbda2a111ebba238751696b522e261b5501e0" dependencies = [ "smallvec", ] [[package]] name = "keyvaluedb-memorydb" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5e8d196e170cdf21ee4fb0ff779278ca24253c10abee1a49914a893dce71097" +checksum = "462f214495626d3245889a431c608f6791a10623735593cccfc9cbf4c72a73f1" dependencies = [ "keyvaluedb", "parking_lot 0.12.3", @@ -3075,9 +3075,9 @@ dependencies = [ [[package]] name = "keyvaluedb-sqlite" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa4d3df76ca45b92891e22fbd0a0928d2ef848a65051d5bd2da43ed1c0dfeb6f" +checksum = "d69f492eb8a28913fcb3b741b94abf3986933efab9a305488237407052e002e8" dependencies = [ "hex", "keyvaluedb", @@ -3088,18 +3088,16 @@ dependencies = [ [[package]] name = "keyvaluedb-web" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "351f750d6c87e6af74cdead18c33bb14fa47882787873bc05faa0002078cb9e5" +checksum = "81293b271c88dc807e3e9eec9e31bc62c2dff6b9938ae2c5fc77f73f451aae8f" dependencies = [ - "async-lock 2.8.0", "flume", "futures", "js-sys", "keyvaluedb", "keyvaluedb-memorydb", "log", - "parking_lot 0.12.3", "send_wrapper 0.6.0", "wasm-bindgen", "web-sys", @@ -3157,7 +3155,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "libc", "redox_syscall 0.5.12", ] @@ -3566,7 +3564,7 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "libc", ] @@ -3577,7 +3575,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "cfg_aliases", "libc", @@ -3589,7 +3587,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "cfg_aliases", "libc", @@ -3757,7 +3755,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6f29f568bec459b0ddff777cec4fe3fd8666d82d5a40ebd0ff7e66134f89bcc" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "objc2", "objc2-core-graphics", "objc2-foundation", @@ -3769,7 +3767,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c10c2894a6fed806ade6027bcd50662746363a9589d3ec9d9bef30a4e4bc166" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "dispatch2", "objc2", ] @@ -3780,7 +3778,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "989c6c68c13021b5c2d6b71456ebb0f9dc78d752e86a98da7c716f4f9470f5a4" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "dispatch2", "objc2", "objc2-core-foundation", @@ -3799,7 +3797,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "900831247d2fe1a09a683278e5384cfb8c80c79fe6b166f9d14bfdde0ea1b03c" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "objc2", "objc2-core-foundation", ] @@ -3810,7 +3808,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7282e9ac92529fa3457ce90ebb15f4ecbc383e8338060960760fa2cf75420c3c" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "objc2", "objc2-core-foundation", ] @@ -3842,7 +3840,7 @@ version = "0.10.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "cfg-if 1.0.0", "foreign-types", "libc", @@ -4100,9 +4098,9 @@ dependencies = [ [[package]] name = "owo-colors" -version = "4.2.0" +version = "4.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1036865bb9422d3300cf723f657c2851d0e9ab12567854b1f4eba3d77decf564" +checksum = "26995317201fa17f3656c36716aed4a7c81743a9634ac4c99c0eeda495db0cec" [[package]] name = "paranoid-android" @@ -4588,7 +4586,7 @@ version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", ] [[package]] @@ -4752,7 +4750,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "fallible-iterator", "fallible-streaming-iterator", "hashlink 0.8.4", @@ -4801,7 +4799,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.4.15", @@ -4814,7 +4812,7 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.9.4", @@ -4994,7 +4992,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "core-foundation", "core-foundation-sys", "libc", @@ -7065,9 +7063,9 @@ dependencies = [ [[package]] name = "windows-core" -version = "0.61.0" +version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", "windows-interface", @@ -7116,9 +7114,9 @@ dependencies = [ [[package]] name = "windows-result" -version = "0.3.2" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ "windows-link", ] @@ -7129,16 +7127,16 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "193cae8e647981c35bc947fdd57ba7928b1fa0d4a79305f6dd2dc55221ac35ac" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", "widestring", "windows-sys 0.59.0", ] [[package]] name = "windows-strings" -version = "0.4.0" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ "windows-link", ] @@ -7455,7 +7453,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.9.1", ] [[package]] @@ -7472,7 +7470,7 @@ checksum = "ed39ff9f8b2eda91bf6390f9f49eee93d655489e15708e3bb638c1c4f07cecb4" dependencies = [ "async-tungstenite 0.28.2", "async_io_stream", - "bitflags 2.9.0", + "bitflags 2.9.1", "futures-core", "futures-io", "futures-sink", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 439446b1..a170762e 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -94,7 +94,7 @@ thiserror = "1.0.69" # Data structures enumset = { version = "1.1.5", features = ["serde"] } -keyvaluedb = "0.1.2" +keyvaluedb = "0.1.3" range-set-blaze = "0.1.16" weak-table = "0.3.2" hashlink = { package = "veilid-hashlink", version = "0.1.1", features = [ @@ -180,7 +180,7 @@ futures-util = { version = "0.3.31", default-features = false, features = [ # Data structures keyring-manager = "0.5.1" -keyvaluedb-sqlite = "0.1.2" +keyvaluedb-sqlite = "0.1.3" # Network async-tungstenite = { version = "0.27.0", features = ["async-tls"] } @@ -213,7 +213,7 @@ send_wrapper = { version = "0.6.0", features = ["futures"] } serde_bytes = { version = "0.11", default-features = false, features = [ "alloc", ] } -tsify = { version = "0.5.5", features = ["js"] } +tsify = { version = "0.5.5", features = ["js"] } serde-wasm-bindgen = "0.6.5" # Network @@ -223,7 +223,7 @@ ws_stream_wasm = "0.7.4" wasm-logger = "0.2.0" # Data Structures -keyvaluedb-web = "0.1.2" +keyvaluedb-web = "0.1.3" ### Configuration for WASM32 'web-sys' crate [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies.web-sys] diff --git a/veilid-core/src/component.rs b/veilid-core/src/component.rs index e7946dd7..3a005e0f 100644 --- a/veilid-core/src/component.rs +++ b/veilid-core/src/component.rs @@ -376,17 +376,17 @@ macro_rules! impl_subscribe_event_bus { pub(crate) use impl_subscribe_event_bus; -// macro_rules! impl_subscribe_event_bus_async { -// ($this:expr, $this_type:ty, $event_handler:ident ) => {{ -// let registry = $this.registry(); -// $this.event_bus().subscribe(move |evt| { -// let registry = registry.clone(); -// Box::pin(async move { -// let this = registry.lookup::<$this_type>().unwrap(); -// this.$event_handler(evt).await; -// }) -// }) -// }}; -// } +macro_rules! impl_subscribe_event_bus_async { + ($this:expr, $this_type:ty, $event_handler:ident ) => {{ + let registry = $this.registry(); + $this.event_bus().subscribe(move |evt| { + let registry = registry.clone(); + Box::pin(async move { + let this = registry.lookup::<$this_type>().unwrap(); + this.$event_handler(evt).await; + }) + }) + }}; +} -// pub(crate) use impl_subscribe_event_bus_async; +pub(crate) use impl_subscribe_event_bus_async; diff --git a/veilid-core/src/intf/native/protected_store.rs b/veilid-core/src/intf/native/protected_store.rs index 2ab97fc2..4b55f3f7 100644 --- a/veilid-core/src/intf/native/protected_store.rs +++ b/veilid-core/src/intf/native/protected_store.rs @@ -41,9 +41,9 @@ impl ProtectedStore { pub fn delete_all(&self) -> EyreResult<()> { for kpsk in &KNOWN_PROTECTED_STORE_KEYS { if let Err(e) = self.remove_user_secret(kpsk) { - error!("failed to delete '{}': {}", kpsk, e); + veilid_log!(self error "failed to delete '{}': {}", kpsk, e); } else { - veilid_log!(self debug "deleted table '{}'", kpsk); + veilid_log!(self debug "deleted protected store key '{}'", kpsk); } } Ok(()) @@ -97,7 +97,6 @@ impl ProtectedStore { ); } if inner.keyring_manager.is_none() { - veilid_log!(self error "QWERQWER"); bail!("Could not initialize the protected store."); } c.protected_store.delete diff --git a/veilid-core/src/intf/wasm/protected_store.rs b/veilid-core/src/intf/wasm/protected_store.rs index c41cafb5..f26cfe48 100644 --- a/veilid-core/src/intf/wasm/protected_store.rs +++ b/veilid-core/src/intf/wasm/protected_store.rs @@ -22,9 +22,9 @@ impl ProtectedStore { pub fn delete_all(&self) -> EyreResult<()> { for kpsk in &KNOWN_PROTECTED_STORE_KEYS { if let Err(e) = self.remove_user_secret(kpsk) { - error!("failed to delete '{}': {}", kpsk, e); + veilid_log!(self error "failed to delete protected store key '{}': {}", kpsk, e); } else { - veilid_log!(self debug "deleted table '{}'", kpsk); + veilid_log!(self debug "deleted protected store key '{}'", kpsk); } } Ok(()) @@ -32,6 +32,10 @@ impl ProtectedStore { #[instrument(level = "debug", skip(self), err)] async fn init_async(&self) -> EyreResult<()> { + if self.config().with(|c| c.protected_store.delete) { + self.delete_all()?; + } + Ok(()) } @@ -124,7 +128,6 @@ impl ProtectedStore { }; let vkey = self.browser_key_name(key.as_ref()); - ls.get_item(&vkey) .map_err(map_jsvalue_error) .wrap_err("exception_thrown") diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 3789186a..c96a27ea 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1268,7 +1268,8 @@ impl NetworkManager { fn peer_info_change_event_handler(&self, evt: Arc) { let mut inner = self.inner.lock(); if let Some(address_check) = inner.address_check.as_mut() { - address_check.report_peer_info_change(evt.routing_domain, evt.opt_peer_info.clone()); + address_check + .report_peer_info_change(evt.routing_domain, evt.opt_new_peer_info.clone()); } } diff --git a/veilid-core/src/network_manager/types/events.rs b/veilid-core/src/network_manager/types/events.rs index 80dc2a51..30d016be 100644 --- a/veilid-core/src/network_manager/types/events.rs +++ b/veilid-core/src/network_manager/types/events.rs @@ -1,11 +1,6 @@ use super::*; -pub(crate) struct PeerInfoChangeEvent { - pub routing_domain: RoutingDomain, - pub opt_peer_info: Option>, -} - -pub(crate) struct SocketAddressChangeEvent { +pub struct SocketAddressChangeEvent { pub routing_domain: RoutingDomain, // the routing domain this flow is over pub socket_address: SocketAddress, // the socket address as seen by the remote peer pub old_socket_address: Option, // the socket address previously for this peer diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs index 54d35019..3536b41c 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs @@ -134,7 +134,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { } fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool { - let opt_peer_info = { + let (opt_old_peer_info, opt_new_peer_info) = { let opt_new_peer_info = { let pi = self.get_peer_info(rti); @@ -156,7 +156,9 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { // Don't publish if the peer info hasnt changed from our previous publication let mut ppi_lock = self.published_peer_info.lock(); - if let Some(old_peer_info) = &*ppi_lock { + let opt_old_peer_info = (*ppi_lock).clone(); + + if let Some(old_peer_info) = &opt_old_peer_info { if let Some(new_peer_info) = &opt_new_peer_info { if new_peer_info.equivalent(old_peer_info) { veilid_log!(rti debug "[LocalNetwork] Not publishing peer info because it is equivalent"); @@ -175,12 +177,13 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { } *ppi_lock = opt_new_peer_info.clone(); - opt_new_peer_info + (opt_old_peer_info, opt_new_peer_info) }; if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { routing_domain: RoutingDomain::LocalNetwork, - opt_peer_info, + opt_old_peer_info, + opt_new_peer_info, }) { veilid_log!(rti debug "Failed to post event: {}", e); } diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs index b641d72b..ae0a82bd 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs @@ -112,7 +112,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { } fn publish_peer_info(&self, rti: &RoutingTableInner) -> bool { - let opt_peer_info = { + let (opt_old_peer_info, opt_new_peer_info) = { let opt_new_peer_info = { let pi = self.get_peer_info(rti); @@ -134,7 +134,9 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { // Don't publish if the peer info hasnt changed from our previous publication let mut ppi_lock = self.published_peer_info.lock(); - if let Some(old_peer_info) = &*ppi_lock { + let opt_old_peer_info = (*ppi_lock).clone(); + + if let Some(old_peer_info) = &opt_old_peer_info { if let Some(new_peer_info) = &opt_new_peer_info { if new_peer_info.equivalent(old_peer_info) { veilid_log!(self debug "[PublicInternet] Not publishing peer info because it is equivalent"); @@ -154,12 +156,13 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { *ppi_lock = opt_new_peer_info.clone(); - opt_new_peer_info + (opt_old_peer_info, opt_new_peer_info) }; if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { routing_domain: RoutingDomain::PublicInternet, - opt_peer_info, + opt_old_peer_info, + opt_new_peer_info, }) { veilid_log!(self debug "Failed to post event: {}", e); } diff --git a/veilid-core/src/routing_table/types/events.rs b/veilid-core/src/routing_table/types/events.rs new file mode 100644 index 00000000..a24519c1 --- /dev/null +++ b/veilid-core/src/routing_table/types/events.rs @@ -0,0 +1,7 @@ +use super::*; + +pub struct PeerInfoChangeEvent { + pub routing_domain: RoutingDomain, + pub opt_old_peer_info: Option>, + pub opt_new_peer_info: Option>, +} diff --git a/veilid-core/src/routing_table/types/mod.rs b/veilid-core/src/routing_table/types/mod.rs index d195273a..ed854de2 100644 --- a/veilid-core/src/routing_table/types/mod.rs +++ b/veilid-core/src/routing_table/types/mod.rs @@ -1,6 +1,7 @@ mod contact_method; mod dial_info_detail; mod direction; +mod events; #[cfg(feature = "geolocation")] mod geolocation_info; mod node_info; @@ -16,6 +17,7 @@ use super::*; pub use contact_method::*; pub use dial_info_detail::*; pub use direction::*; +pub use events::*; #[cfg(feature = "geolocation")] pub use geolocation_info::*; pub use node_info::*; diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 2f8b88ed..e991f988 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -94,6 +94,8 @@ struct StorageManagerInner { pub metadata_db: Option, /// Background processing task (not part of attachment manager tick tree so it happens when detached too) pub tick_future: Option>, + /// PeerInfo subscription + peer_info_change_subscription: Option, } impl fmt::Debug for StorageManagerInner { @@ -107,6 +109,10 @@ impl fmt::Debug for StorageManagerInner { .field("active_subkey_writes", &self.active_subkey_writes) .field("rehydration_requests", &self.rehydration_requests) .field("outbound_watch_manager", &self.outbound_watch_manager) + .field( + "peer_info_change_subscription", + &self.peer_info_change_subscription, + ) //.field("metadata_db", &self.metadata_db) //.field("tick_future", &self.tick_future) .finish() @@ -137,6 +143,9 @@ pub(crate) struct StorageManager { // for offline subkey writes, watch changes, and any other // background operations the storage manager wants to perform background_operation_processor: DeferredStreamProcessor, + + // Online check + is_online: AtomicBool, } impl fmt::Debug for StorageManager { @@ -161,6 +170,7 @@ impl fmt::Debug for StorageManager { &self.background_operation_processor, ) .field("anonymous_watch_keys", &self.anonymous_watch_keys) + .field("is_online", &self.is_online) .finish() } } @@ -216,6 +226,7 @@ impl StorageManager { outbound_watch_lock_table: AsyncTagLockTable::new(), anonymous_watch_keys, background_operation_processor: DeferredStreamProcessor::new(), + is_online: AtomicBool::new(false), }; this.setup_tasks(); @@ -295,6 +306,10 @@ impl StorageManager { #[instrument(level = "trace", target = "tstore", skip_all)] async fn post_init_async(&self) -> EyreResult<()> { + // Register event handlers + let peer_info_change_subscription = + impl_subscribe_event_bus_async!(self, Self, peer_info_change_event_handler); + let mut inner = self.inner.lock().await; // Resolve outbound watch manager noderefs @@ -312,13 +327,14 @@ impl StorageManager { } }); inner.tick_future = Some(tick_future); + inner.peer_info_change_subscription = Some(peer_info_change_subscription); Ok(()) } #[instrument(level = "trace", target = "tstore", skip_all)] async fn pre_terminate_async(&self) { - // Stop the background ticker process + // Stop background operations { let mut inner = self.inner.lock().await; // Stop ticker @@ -326,6 +342,9 @@ impl StorageManager { if let Some(f) = tick_future { f.await; } + if let Some(sub) = inner.peer_info_change_subscription.take() { + self.event_bus().unsubscribe(sub); + } } // Cancel all tasks associated with the tick future @@ -427,17 +446,7 @@ impl StorageManager { } pub(super) fn dht_is_online(&self) -> bool { - // Check if we have published peer info - // Note, this is a best-effort check, subject to race conditions on the network's state - if self - .routing_table() - .get_published_peer_info(RoutingDomain::PublicInternet) - .is_none() - { - return false; - } - - true + self.is_online.load(Ordering::Acquire) } /// Get the set of nodes in our active watches @@ -1890,4 +1899,18 @@ impl StorageManager { self.background_operation_processor .add_stream(receiver.into_stream(), handler) } + + async fn peer_info_change_event_handler(&self, evt: Arc) { + // Note when we have come back online + if evt.routing_domain == RoutingDomain::PublicInternet { + if evt.opt_old_peer_info.is_none() && evt.opt_new_peer_info.is_some() { + self.is_online.store(true, Ordering::Release); + + // Trigger online updates + self.change_inspect_all_watches().await; + } else if evt.opt_old_peer_info.is_some() && evt.opt_new_peer_info.is_none() { + self.is_online.store(false, Ordering::Release); + } + } + } } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 2beab044..b4f0e46a 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -1292,4 +1292,32 @@ impl StorageManager { Ok(NetworkResult::value(())) } + + /// Check all watches for changes + /// Used when we come back online from being offline and may have + /// missed some ValueChanged notifications + #[instrument(level = "trace", target = "watch", skip_all)] + pub async fn change_inspect_all_watches(&self) { + let mut inner = self.inner.lock().await; + + let mut change_inspects = vec![]; + for (record_key, outbound_watch) in &inner.outbound_watch_manager.outbound_watches { + if let Some(state) = outbound_watch.state() { + let reportable_subkeys = state.params().subkeys.clone(); + change_inspects.push((*record_key, reportable_subkeys)); + } + } + + if change_inspects.is_empty() { + return; + } + + veilid_log!(self debug "change inspecting {} watches", change_inspects.len()); + + for change_inspect in change_inspects { + inner + .outbound_watch_manager + .enqueue_change_inspect(change_inspect.0, change_inspect.1); + } + } } diff --git a/veilid-core/src/table_store/tests/test_table_store.rs b/veilid-core/src/table_store/tests/test_table_store.rs index 207b9e48..07c9b970 100644 --- a/veilid-core/src/table_store/tests/test_table_store.rs +++ b/veilid-core/src/table_store/tests/test_table_store.rs @@ -283,6 +283,7 @@ pub async fn test_store_load_json_many(ts: &TableStore) { let mut r = 0; let start_ts = Timestamp::now(); + let mut keys = HashSet::new(); loop { while r < rows && unord.len() < parallel { let key = format!("key_{}", r); @@ -290,6 +291,7 @@ pub async fn test_store_load_json_many(ts: &TableStore) { unord.push(Box::pin(async { let key = key; + db.store_json(0, key.as_bytes(), &value) .await .expect("should store"); @@ -299,12 +301,21 @@ pub async fn test_store_load_json_many(ts: &TableStore) { .expect("should load") .expect("should exist"); assert_eq!(value, value2); + + key.as_bytes().to_vec() })); } - if unord.next().await.is_none() { + if let Some(res) = unord.next().await { + keys.insert(res); + } else { break; } } + + let stored_keys = db.get_keys(0).await.expect("should get keys"); + let stored_keys_set = stored_keys.into_iter().collect::>(); + assert_eq!(stored_keys_set, keys, "should have same keys"); + let end_ts = Timestamp::now(); trace!("test_store_load_json_many duration={}", (end_ts - start_ts)); }