diff --git a/Cargo.lock b/Cargo.lock index 356bf366..507882b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2755,15 +2755,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "lru" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6e8aaa3f231bb4bd57b84b2d5dc3ae7f350265df8aa96492e0bc394a1571909" -dependencies = [ - "hashbrown", -] - [[package]] name = "lru-cache" version = "0.1.2" @@ -5505,7 +5496,6 @@ dependencies = [ "futures-util", "generic-array", "getrandom 0.2.8", - "hashbrown", "hashlink 0.8.1", "hex", "ifstructs", @@ -5515,11 +5505,11 @@ dependencies = [ "js-sys", "json", "keyring-manager", + "keyvaluedb", "keyvaluedb-sqlite", "keyvaluedb-web", "lazy_static", "libc", - "lru", "maplit", "ndk 0.6.0", "ndk-glue", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index b59c6c7c..cb828dfc 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -58,7 +58,7 @@ digest = "0.9.0" rtnetlink = { version = "^0", default-features = false, optional = true } async-std-resolver = { version = "^0", optional = true } trust-dns-resolver = { version = "^0", optional = true } - +keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" } # Dependencies for native builds only # Linux, Windows, Mac, iOS, Android @@ -72,7 +72,6 @@ async-tungstenite = { version = "^0", features = ["async-tls"] } maplit = "^1" config = { version = "^0", features = ["yaml"] } keyring-manager = { path = "../external/keyring-manager" } -lru = "^0" async-tls = "^0.11" igd = { path = "../external/rust-igd" } webpki = "^0" @@ -96,8 +95,6 @@ nix = "^0" wasm-bindgen = "^0" js-sys = "^0" wasm-bindgen-futures = "^0" -hashbrown = "^0" -lru = {version = "^0", features = ["hashbrown"] } no-std-net = { path = "../external/no-std-net", features = ["serde"] } keyvaluedb-web = { path = "../external/keyvaluedb/keyvaluedb-web" } data-encoding = { version = "^2", default_features = false, features = ["alloc"] } diff --git a/veilid-core/src/crypto/mod.rs b/veilid-core/src/crypto/mod.rs index c7836445..4184c54b 100644 --- a/veilid-core/src/crypto/mod.rs +++ b/veilid-core/src/crypto/mod.rs @@ -119,12 +119,12 @@ impl Crypto { // load caches if they are valid for this node id let mut db = table_store.open("crypto_caches", 1).await?; - let caches_valid = match db.load(0, b"node_id").await? { + let caches_valid = match db.load(0, b"node_id")? { Some(v) => v.as_slice() == node_id.bytes, None => false, }; if caches_valid { - if let Some(b) = db.load(0, b"dh_cache").await? { + if let Some(b) = db.load(0, b"dh_cache")? { let mut inner = self.inner.lock(); bytes_to_cache(&b, &mut inner.dh_cache); } @@ -132,7 +132,7 @@ impl Crypto { drop(db); table_store.delete("crypto_caches").await?; db = table_store.open("crypto_caches", 1).await?; - db.store(0, b"node_id", &node_id.bytes).await?; + db.store(0, b"node_id", &node_id.bytes)?; } // Schedule flushing @@ -159,7 +159,7 @@ impl Crypto { }; let db = table_store.open("crypto_caches", 1).await?; - db.store(0, b"dh_cache", &cache_bytes).await?; + db.store(0, b"dh_cache", &cache_bytes)?; Ok(()) } diff --git a/veilid-core/src/intf/native/table_store.rs b/veilid-core/src/intf/native/table_store.rs index 041c4182..5d5ceb14 100644 --- a/veilid-core/src/intf/native/table_store.rs +++ b/veilid-core/src/intf/native/table_store.rs @@ -8,6 +8,8 @@ struct TableStoreInner { opened: BTreeMap>>, } +/// Veilid Table Storage +/// Database for storing key value pairs persistently across runs #[derive(Clone)] pub struct TableStore { config: VeilidConfig, @@ -20,31 +22,38 @@ impl TableStore { opened: BTreeMap::new(), } } - pub fn new(config: VeilidConfig) -> Self { + pub(crate) fn new(config: VeilidConfig) -> Self { Self { config, inner: Arc::new(Mutex::new(Self::new_inner())), } } - pub async fn delete_all(&self) -> EyreResult<()> { - // Delete all known keys - self.delete("crypto_caches").await?; + /// Delete all known tables + pub async fn delete_all(&self) { + if let Err(e) = self.delete("crypto_caches").await { + error!("failed to delete 'crypto_caches': {}", e); + } + if let Err(e) = self.delete("RouteSpecStore").await { + error!("failed to delete 'RouteSpecStore': {}", e); + } + if let Err(e) = self.delete("routing_table").await { + error!("failed to delete 'routing_table': {}", e); + } + } + + pub(crate) async fn init(&self) -> EyreResult<()> { Ok(()) } - pub async fn init(&self) -> EyreResult<()> { - Ok(()) - } - - pub async fn terminate(&self) { + pub(crate) async fn terminate(&self) { assert!( self.inner.lock().opened.is_empty(), "all open databases should have been closed" ); } - pub fn on_table_db_drop(&self, table: String) { + pub(crate) fn on_table_db_drop(&self, table: String) { let mut inner = self.inner.lock(); if inner.opened.remove(&table).is_none() { unreachable!("should have removed an item"); @@ -82,6 +91,8 @@ impl TableStore { }) } + /// Get or create a TableDB database table. If the column count is greater than an + /// existing TableDB's column count, the database will be upgraded to add the missing columns pub async fn open(&self, name: &str, column_count: u32) -> EyreResult { let table_name = self.get_table_name(name)?; @@ -121,6 +132,7 @@ impl TableStore { Ok(table_db) } + /// Delete a TableDB table by name pub async fn delete(&self, name: &str) -> EyreResult { let table_name = self.get_table_name(name)?; diff --git a/veilid-core/src/intf/table_db.rs b/veilid-core/src/intf/table_db.rs index d3bdc7d7..09fbe5a3 100644 --- a/veilid-core/src/intf/table_db.rs +++ b/veilid-core/src/intf/table_db.rs @@ -5,8 +5,10 @@ use serde::{Deserialize, Serialize}; cfg_if! { if #[cfg(target_arch = "wasm32")] { use keyvaluedb_web::*; + use keyvaluedb::*; } else { use keyvaluedb_sqlite::*; + use keyvaluedb::*; } } @@ -28,7 +30,7 @@ pub struct TableDB { } impl TableDB { - pub fn new(table: String, table_store: TableStore, database: Database) -> Self { + pub(super) fn new(table: String, table_store: TableStore, database: Database) -> Self { Self { inner: Arc::new(Mutex::new(TableDBInner { table, @@ -38,22 +40,24 @@ impl TableDB { } } - pub fn try_new_from_weak_inner(weak_inner: Weak>) -> Option { + pub(super) fn try_new_from_weak_inner(weak_inner: Weak>) -> Option { weak_inner.upgrade().map(|table_db_inner| Self { inner: table_db_inner, }) } - pub fn weak_inner(&self) -> Weak> { + pub(super) fn weak_inner(&self) -> Weak> { Arc::downgrade(&self.inner) } - pub async fn get_column_count(&self) -> EyreResult { + /// Get the total number of columns in the TableDB + pub fn get_column_count(&self) -> EyreResult { let db = &self.inner.lock().database; db.num_columns().wrap_err("failed to get column count: {}") } - pub async fn get_keys(&self, col: u32) -> EyreResult>> { + /// Get the list of keys in a column of the TableDB + pub fn get_keys(&self, col: u32) -> EyreResult>> { let db = &self.inner.lock().database; let mut out: Vec> = Vec::new(); db.iter(col, None, &mut |kv| { @@ -64,14 +68,25 @@ impl TableDB { Ok(out) } - pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> { + /// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping. + pub fn transact<'a>(&'a self) -> TableDBTransaction<'a> { + let dbt = { + let db = &self.inner.lock().database; + db.transaction() + }; + TableDBTransaction::new(self, dbt) + } + + /// Store a key with a value in a column in the TableDB. Performs a single transaction immediately. + pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> { let db = &self.inner.lock().database; let mut dbt = db.transaction(); dbt.put(col, key, value); db.write(dbt).wrap_err("failed to store key") } - pub async fn store_cbor(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> + /// Store a key in CBOR format with a value in a column in the TableDB. Performs a single transaction immediately. + pub fn store_cbor(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()> where T: Serialize, { @@ -83,12 +98,14 @@ impl TableDB { db.write(dbt).wrap_err("failed to store key") } - pub async fn load(&self, col: u32, key: &[u8]) -> EyreResult>> { + /// Read a key from a column in the TableDB immediately. + pub fn load(&self, col: u32, key: &[u8]) -> EyreResult>> { let db = &self.inner.lock().database; db.get(col, key).wrap_err("failed to get key") } - pub async fn load_cbor(&self, col: u32, key: &[u8]) -> EyreResult> + /// Read a key from a column in the TableDB immediately, in CBOR format. + pub fn load_cbor(&self, col: u32, key: &[u8]) -> EyreResult> where T: for<'de> Deserialize<'de>, { @@ -104,7 +121,8 @@ impl TableDB { Ok(Some(obj)) } - pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult { + /// Delete key with from a column in the TableDB + pub fn delete(&self, col: u32, key: &[u8]) -> EyreResult { let db = &self.inner.lock().database; let found = db.get(col, key).wrap_err("failed to get key")?; match found { @@ -118,3 +136,66 @@ impl TableDB { } } } + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +/// A TableDB transaction +/// Atomically commits a group of writes or deletes to the TableDB +pub struct TableDBTransaction<'a> { + db: &'a TableDB, + dbt: Option, + _phantom: core::marker::PhantomData<&'a ()>, +} + +impl<'a> TableDBTransaction<'a> { + fn new(db: &'a TableDB, dbt: DBTransaction) -> Self { + Self { + db, + dbt: Some(dbt), + _phantom: Default::default(), + } + } + + /// Commit the transaction. Performs all actions atomically. + pub fn commit(mut self) -> EyreResult<()> { + self.db + .inner + .lock() + .database + .write(self.dbt.take().unwrap()) + .wrap_err("commit failed") + } + + /// Rollback the transaction. Does nothing to the TableDB. + pub fn rollback(mut self) { + self.dbt = None; + } + + /// Store a key with a value in a column in the TableDB + pub fn store(&mut self, col: u32, key: &[u8], value: &[u8]) { + self.dbt.as_mut().unwrap().put(col, key, value); + } + + /// Store a key in CBOR format with a value in a column in the TableDB + pub fn store_cbor(&mut self, col: u32, key: &[u8], value: &T) -> EyreResult<()> + where + T: Serialize, + { + let v = serde_cbor::to_vec(value).wrap_err("couldn't store as CBOR")?; + self.dbt.as_mut().unwrap().put(col, key, v.as_slice()); + Ok(()) + } + + /// Delete key with from a column in the TableDB + pub fn delete(&mut self, col: u32, key: &[u8]) { + self.dbt.as_mut().unwrap().delete(col, key); + } +} + +impl<'a> Drop for TableDBTransaction<'a> { + fn drop(&mut self) { + if self.dbt.is_some() { + warn!("Dropped transaction without commit or rollback"); + } + } +} diff --git a/veilid-core/src/intf/wasm/protected_store.rs b/veilid-core/src/intf/wasm/protected_store.rs index 54ba80fa..8dd3f427 100644 --- a/veilid-core/src/intf/wasm/protected_store.rs +++ b/veilid-core/src/intf/wasm/protected_store.rs @@ -3,20 +3,11 @@ use crate::xx::*; use crate::*; use data_encoding::BASE64URL_NOPAD; use js_sys::*; +use send_wrapper::*; +use serde::{Deserialize, Serialize}; use wasm_bindgen_futures::*; use web_sys::*; -#[wasm_bindgen] -extern "C" { - #[wasm_bindgen(catch, js_name = setPassword, js_namespace = ["global", "wasmhost", "keytar"])] - fn keytar_setPassword(service: &str, account: &str, password: &str) - -> Result; - #[wasm_bindgen(catch, js_name = getPassword, js_namespace = ["global", "wasmhost", "keytar"])] - fn keytar_getPassword(service: &str, account: &str) -> Result; - #[wasm_bindgen(catch, js_name = deletePassword, js_namespace = ["global", "wasmhost", "keytar"])] - fn keytar_deletePassword(service: &str, account: &str) -> Result; -} - #[derive(Clone)] pub struct ProtectedStore { config: VeilidConfig, @@ -71,33 +62,9 @@ impl ProtectedStore { } } - #[instrument(level = "trace", skip(self, value), ret, err)] + //#[instrument(level = "trace", skip(self, value), ret, err)] pub async fn save_user_secret_string(&self, key: &str, value: &str) -> EyreResult { - if is_nodejs() { - let prev = match JsFuture::from( - keytar_getPassword(self.keyring_name().as_str(), key) - .map_err(map_jsvalue_error) - .wrap_err("exception thrown")?, - ) - .await - { - Ok(v) => v.is_truthy(), - Err(_) => false, - }; - - match JsFuture::from( - keytar_setPassword(self.keyring_name().as_str(), key, value) - .map_err(map_jsvalue_error) - .wrap_err("exception thrown")?, - ) - .await - { - Ok(_) => {} - Err(_) => bail!("Failed to set password"), - } - - Ok(prev) - } else if is_browser() { + if is_browser() { let win = match window() { Some(w) => w, None => { @@ -139,24 +106,7 @@ impl ProtectedStore { #[instrument(level = "trace", skip(self), err)] pub async fn load_user_secret_string(&self, key: &str) -> EyreResult> { - if is_nodejs() { - let prev = match JsFuture::from( - keytar_getPassword(self.keyring_name().as_str(), key) - .map_err(map_jsvalue_error) - .wrap_err("exception thrown")?, - ) - .await - { - Ok(p) => p, - Err(_) => JsValue::UNDEFINED, - }; - - if prev.is_undefined() || prev.is_null() { - return Ok(None); - } - - Ok(prev.as_string()) - } else if is_browser() { + if is_browser() { let win = match window() { Some(w) => w, None => { @@ -252,18 +202,7 @@ impl ProtectedStore { #[instrument(level = "trace", skip(self), ret, err)] pub async fn remove_user_secret(&self, key: &str) -> EyreResult { - if is_nodejs() { - match JsFuture::from( - keytar_deletePassword(self.keyring_name().as_str(), key) - .map_err(map_jsvalue_error) - .wrap_err("exception thrown")?, - ) - .await - { - Ok(v) => Ok(v.is_truthy()), - Err(_) => bail!("Failed to delete"), - } - } else if is_browser() { + if is_browser() { let win = match window() { Some(w) => w, None => { diff --git a/veilid-core/src/intf/wasm/system.rs b/veilid-core/src/intf/wasm/system.rs index f30f8a81..00158ad8 100644 --- a/veilid-core/src/intf/wasm/system.rs +++ b/veilid-core/src/intf/wasm/system.rs @@ -19,10 +19,8 @@ extern "C" { pub fn get_timestamp() -> u64 { if utils::is_browser() { return (Date::now() * 1000.0f64) as u64; - } else if utils::is_nodejs() { - return (Date::now() * 1000.0f64) as u64; } else { - panic!("WASM requires browser or nodejs environment"); + panic!("WASM requires browser environment"); } } @@ -85,18 +83,22 @@ pub fn spawn(future: impl Future + Send + 'static) -> MustJoi where Out: Send + 'static, { - MustJoinHandle::new(Bindgen - .spawn_handle(future) - .expect("wasm-bindgen-futures spawn should never error out")) + MustJoinHandle::new( + Bindgen + .spawn_handle(future) + .expect("wasm-bindgen-futures spawn should never error out"), + ) } pub fn spawn_local(future: impl Future + 'static) -> MustJoinHandle where Out: 'static, { - MustJoinHandle::new(Bindgen - .spawn_handle_local(future) - .expect("wasm-bindgen-futures spawn_local should never error out")) + MustJoinHandle::new( + Bindgen + .spawn_handle_local(future) + .expect("wasm-bindgen-futures spawn_local should never error out"), + ) } // pub fn spawn_with_local_set( @@ -114,10 +116,10 @@ where { Bindgen .spawn_handle_local(future) - .expect("wasm-bindgen-futures spawn_local should never error out").detach() + .expect("wasm-bindgen-futures spawn_local should never error out") + .detach() } - pub fn interval(freq_ms: u32, callback: F) -> SendPinBoxFuture<()> where F: Fn() -> FUT + Send + Sync + 'static, @@ -160,12 +162,12 @@ pub async fn get_outbound_relay_peer() -> Option { // pub async fn get_pwa_web_server_config() -> { // if utils::is_browser() { - + // let win = window().unwrap(); // let doc = win.document().unwrap(); // let html_document = document.dyn_into::().unwrap(); // let cookie = html_document.cookie().unwrap(); - + // // let wait_millis = if millis > u32::MAX { // // i32::MAX // // } else { @@ -177,22 +179,14 @@ pub async fn get_outbound_relay_peer() -> Option { // // .unwrap(); // // }); -// // JsFuture::from(promise).await.unwrap(); -// } else if utils::is_nodejs() { -// // let promise = Promise::new(&mut |yes, _| { -// // nodejs_global_set_timeout_with_callback_and_timeout_and_arguments_0(&yes, millis) -// // .unwrap(); -// // }); - // // JsFuture::from(promise).await.unwrap(); // } else { -// panic!("WASM requires browser or nodejs environment"); -// } +// panic!("WASM requires browser environment"); +// } // } - pub async fn txt_lookup>(_host: S) -> EyreResult> { - bail!("wasm does not support txt lookup") + bail!("wasm does not support txt lookup") } pub async fn ptr_lookup(_ip_addr: IpAddr) -> EyreResult { diff --git a/veilid-core/src/intf/wasm/table_store.rs b/veilid-core/src/intf/wasm/table_store.rs index 2e0aaa03..b49481ff 100644 --- a/veilid-core/src/intf/wasm/table_store.rs +++ b/veilid-core/src/intf/wasm/table_store.rs @@ -22,7 +22,7 @@ impl TableStore { opened: BTreeMap::new(), } } - pub fn new(config: VeilidConfig) -> Self { + pub(crate) fn new(config: VeilidConfig) -> Self { Self { config, inner: Arc::new(Mutex::new(Self::new_inner())), @@ -30,12 +30,25 @@ impl TableStore { } } - pub async fn init(&self) -> EyreResult<()> { + /// Delete all known tables + pub async fn delete_all(&self) { + if let Err(e) = self.delete("crypto_caches").await { + error!("failed to delete 'crypto_caches': {}", e); + } + if let Err(e) = self.delete("RouteSpecStore").await { + error!("failed to delete 'RouteSpecStore': {}", e); + } + if let Err(e) = self.delete("routing_table").await { + error!("failed to delete 'routing_table': {}", e); + } + } + + pub(crate) async fn init(&self) -> EyreResult<()> { let _async_guard = self.async_lock.lock().await; Ok(()) } - pub async fn terminate(&self) { + pub(crate) async fn terminate(&self) { let _async_guard = self.async_lock.lock().await; assert!( self.inner.lock().opened.len() == 0, @@ -43,7 +56,7 @@ impl TableStore { ); } - pub fn on_table_db_drop(&self, table: String) { + pub(crate) fn on_table_db_drop(&self, table: String) { let mut inner = self.inner.lock(); match inner.opened.remove(&table) { Some(_) => (), @@ -69,12 +82,14 @@ impl TableStore { }) } + /// Get or create a TableDB database table. If the column count is greater than an + /// existing TableDB's column count, the database will be upgraded to add the missing columns pub async fn open(&self, name: &str, column_count: u32) -> EyreResult { let _async_guard = self.async_lock.lock().await; let table_name = self.get_table_name(name)?; { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock(); if let Some(table_db_weak_inner) = inner.opened.get(&table_name) { match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) { Some(tdb) => { @@ -89,7 +104,10 @@ impl TableStore { let db = Database::open(table_name.clone(), column_count) .await .wrap_err("failed to open tabledb")?; - info!("opened table store '{}' with table name '{:?}' with {} columns", name, table_name, column_count); + info!( + "opened table store '{}' with table name '{:?}' with {} columns", + name, table_name, column_count + ); let table_db = TableDB::new(table_name.clone(), self.clone(), db); @@ -101,11 +119,12 @@ impl TableStore { Ok(table_db) } + /// Delete a TableDB table by name pub async fn delete(&self, name: &str) -> EyreResult { let _async_guard = self.async_lock.lock().await; trace!("TableStore::delete {}", name); let table_name = self.get_table_name(name)?; - + { let inner = self.inner.lock(); if inner.opened.contains_key(&table_name) { @@ -117,9 +136,7 @@ impl TableStore { } } - if utils::is_nodejs() { - unimplemented!(); - } else if utils::is_browser() { + if utils::is_browser() { let out = match Database::delete(table_name.clone()).await { Ok(_) => true, Err(_) => false, diff --git a/veilid-core/src/intf/wasm/utils/mod.rs b/veilid-core/src/intf/wasm/utils/mod.rs index 5a4f853f..fcec19f7 100644 --- a/veilid-core/src/intf/wasm/utils/mod.rs +++ b/veilid-core/src/intf/wasm/utils/mod.rs @@ -15,21 +15,6 @@ extern "C" { pub fn alert(s: &str); } -pub fn is_nodejs() -> bool { - static CACHE: AtomicI8 = AtomicI8::new(-1); - let cache = CACHE.load(Ordering::Relaxed); - if cache != -1 { - return cache != 0; - } - - let res = js_sys::eval("process.release.name === 'node'") - .map(|res| res.is_truthy()) - .unwrap_or_default(); - - CACHE.store(res as i8, Ordering::Relaxed); - res -} - pub fn is_browser() -> bool { static CACHE: AtomicI8 = AtomicI8::new(-1); let cache = CACHE.load(Ordering::Relaxed); @@ -60,24 +45,6 @@ pub fn is_browser() -> bool { // res // } -// pub fn node_require(module: &str) -> JsValue { -// if !is_nodejs() { -// return JsValue::UNDEFINED; -// } - -// let mut home = env!("CARGO_MANIFEST_DIR"); -// if home.len() == 0 { -// home = "."; -// } - -// match js_sys::eval(format!("require(\"{}/{}\")", home, module).as_str()) { -// Ok(v) => v, -// Err(e) => { -// panic!("node_require failed: {:?}", e); -// } -// } -// } - #[derive(ThisError, Debug, Clone, Eq, PartialEq)] #[error("JsValue error")] pub struct JsValueError(String); diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 48199bfc..7ffee2f7 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -8,6 +8,8 @@ pub struct Bucket { } pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, Arc>; +type BucketData = (Vec<(DHTKey, Vec)>, Option); + fn state_ordering(state: BucketEntryState) -> usize { match state { BucketEntryState::Dead => 0, @@ -25,6 +27,32 @@ impl Bucket { } } + pub(super) fn load_bucket(&mut self, data: &[u8]) -> EyreResult<()> { + let bucket_data: BucketData = + serde_cbor::from_slice::(data).wrap_err("failed to deserialize bucket")?; + + for (k, d) in bucket_data.0 { + let entryinner = serde_cbor::from_slice::(&d) + .wrap_err("failed to deserialize bucket entry")?; + self.entries + .insert(k, Arc::new(BucketEntry::new_with_inner(entryinner))); + } + + self.newest_entry = bucket_data.1; + + Ok(()) + } + pub(super) fn save_bucket(&self) -> EyreResult> { + let mut entry_vec = Vec::new(); + for (k, v) in &self.entries { + let entry_bytes = v.with_mut_inner(|e| serde_cbor::to_vec(e))?; + entry_vec.push((*k, entry_bytes)); + } + let bucket_data: BucketData = (entry_vec, self.newest_entry.clone()); + let out = serde_cbor::to_vec(&bucket_data)?; + Ok(out) + } + pub(super) fn add_entry(&mut self, node_id: DHTKey) -> NodeRef { log_rtab!("Node added: {}", node_id.encode()); diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 6bad7999..422e030a 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -1,5 +1,6 @@ use super::*; use core::sync::atomic::{AtomicU32, Ordering}; +use serde::{Deserialize, Serialize}; /// Reliable pings are done with increased spacing between pings @@ -42,7 +43,7 @@ pub enum BucketEntryState { struct LastConnectionKey(ProtocolType, AddressType); /// Bucket entry information specific to the LocalNetwork RoutingDomain -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct BucketEntryPublicInternet { /// The PublicInternet node info signed_node_info: Option>, @@ -53,7 +54,7 @@ pub struct BucketEntryPublicInternet { } /// Bucket entry information specific to the LocalNetwork RoutingDomain -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct BucketEntryLocalNetwork { /// The LocalNetwork node info signed_node_info: Option>, @@ -63,19 +64,24 @@ pub struct BucketEntryLocalNetwork { node_status: Option, } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct BucketEntryInner { min_max_version: Option<(u8, u8)>, updated_since_last_network_change: bool, + #[serde(skip)] last_connections: BTreeMap, public_internet: BucketEntryPublicInternet, local_network: BucketEntryLocalNetwork, peer_stats: PeerStats, + #[serde(skip)] latency_stats_accounting: LatencyStatsAccounting, + #[serde(skip)] transfer_stats_accounting: TransferStatsAccounting, #[cfg(feature = "tracking")] + #[serde(skip)] next_track_id: usize, #[cfg(feature = "tracking")] + #[serde(skip)] node_ref_tracks: HashMap, } @@ -657,6 +663,13 @@ impl BucketEntry { } } + pub(super) fn new_with_inner(inner: BucketEntryInner) -> Self { + Self { + ref_count: AtomicU32::new(0), + inner: RwLock::new(inner), + } + } + // Note, that this requires -also- holding the RoutingTable read lock, as an // immutable reference to RoutingTableInner must be passed in to get this // This ensures that an operation on the routing table can not change entries diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index b1d5c4fe..3ab5d8d5 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -154,6 +154,20 @@ impl RoutingTable { pub async fn init(&self) -> EyreResult<()> { debug!("starting routing table init"); + // Set up routing buckets + { + let mut inner = self.inner.write(); + inner.init_buckets(self.clone()); + } + + // Load bucket entries from table db if possible + debug!("loading routing table entries"); + if let Err(e) = self.load_buckets().await { + log_rtab!(warn "Error loading buckets from storage: {}. Resetting.", e); + let mut inner = self.inner.write(); + inner.init_buckets(self.clone()); + } + // Set up routespecstore debug!("starting route spec store init"); let route_spec_store = match RouteSpecStore::load(self.clone()).await { @@ -165,10 +179,10 @@ impl RoutingTable { }; debug!("finished route spec store init"); - let mut inner = self.inner.write(); - inner.init(self.clone())?; - - inner.route_spec_store = Some(route_spec_store); + { + let mut inner = self.inner.write(); + inner.route_spec_store = Some(route_spec_store); + } debug!("finished routing table init"); Ok(()) @@ -188,6 +202,12 @@ impl RoutingTable { error!("kick_buckets_task not stopped: {}", e); } + // Load bucket entries from table db if possible + debug!("saving routing table entries"); + if let Err(e) = self.save_buckets().await { + error!("failed to save routing table entries: {}", e); + } + debug!("saving route spec store"); let rss = { let mut inner = self.inner.write(); @@ -201,12 +221,67 @@ impl RoutingTable { debug!("shutting down routing table"); let mut inner = self.inner.write(); - inner.terminate(); *inner = RoutingTableInner::new(self.unlocked_inner.clone()); debug!("finished routing table terminate"); } + async fn save_buckets(&self) -> EyreResult<()> { + // Serialize all entries + let mut bucketvec: Vec> = Vec::new(); + { + let inner = &*self.inner.read(); + for bucket in &inner.buckets { + bucketvec.push(bucket.save_bucket()?) + } + } + let table_store = self.network_manager().table_store(); + let tdb = table_store.open("routing_table", 1).await?; + let bucket_count = bucketvec.len(); + let mut dbx = tdb.transact(); + if let Err(e) = dbx.store_cbor(0, b"bucket_count", &bucket_count) { + dbx.rollback(); + return Err(e); + } + + for (n, b) in bucketvec.iter().enumerate() { + dbx.store(0, format!("bucket_{}", n).as_bytes(), b) + } + dbx.commit()?; + Ok(()) + } + + async fn load_buckets(&self) -> EyreResult<()> { + // Deserialize all entries + let inner = &mut *self.inner.write(); + + let tstore = self.network_manager().table_store(); + let tdb = tstore.open("routing_table", 1).await?; + let Some(bucket_count): Option = tdb.load_cbor(0, b"bucket_count")? else { + log_rtab!(debug "no bucket count in saved routing table"); + return Ok(()); + }; + if bucket_count != inner.buckets.len() { + // Must have the same number of buckets + warn!("bucket count is different, not loading routing table"); + return Ok(()); + } + let mut bucketdata_vec: Vec> = Vec::new(); + for n in 0..bucket_count { + let Some(bucketdata): Option> = + tdb.load(0, format!("bucket_{}", n).as_bytes())? else { + warn!("bucket data not loading, skipping loading routing table"); + return Ok(()); + }; + bucketdata_vec.push(bucketdata); + } + for n in 0..bucket_count { + inner.buckets[n].load_bucket(&bucketdata_vec[n])?; + } + + Ok(()) + } + /// Set up the local network routing domain with our local routing table configuration pub fn configure_local_network_routing_domain(&self, local_networks: Vec<(IpAddr, IpAddr)>) { log_net!(debug "configure_local_network_routing_domain: {:#?}", local_networks); diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index e0345437..75d4f586 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -225,7 +225,23 @@ impl RouteSpecStore { let table_store = routing_table.network_manager().table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; let mut content: RouteSpecStoreContent = - rsstdb.load_cbor(0, b"content").await?.unwrap_or_default(); + rsstdb.load_cbor(0, b"content")?.unwrap_or_default(); + + // Look up all route hop noderefs since we can't serialize those + let mut dead_keys = Vec::new(); + for (k, rsd) in &mut content.details { + for h in &rsd.hops { + let Some(nr) = routing_table.lookup_node_ref(*h) else { + dead_keys.push(*k); + break; + }; + rsd.hop_node_refs.push(nr); + } + } + for k in dead_keys { + log_rtab!(debug "no entry, killing off private route: {}", k.encode()); + content.details.remove(&k); + } // Load secrets from pstore let pstore = routing_table.network_manager().protected_store(); @@ -280,7 +296,7 @@ impl RouteSpecStore { .network_manager() .table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; - rsstdb.store_cbor(0, b"content", &content).await?; + rsstdb.store_cbor(0, b"content", &content)?; // // Keep secrets in protected store as well let pstore = self @@ -1168,10 +1184,9 @@ impl RouteSpecStore { let mut buffer = vec![]; capnp::serialize_packed::write_message(&mut buffer, &pr_message) + .map_err(RPCError::internal) .wrap_err("failed to convert builder to vec")?; Ok(buffer) - - // builder_to_vec(pr_message).wrap_err("failed to convert builder to vec") } /// Convert binary blob to private route @@ -1180,11 +1195,12 @@ impl RouteSpecStore { blob.as_slice(), capnp::message::ReaderOptions::new(), ) + .map_err(RPCError::internal) .wrap_err("failed to make message reader")?; - //let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default()); let pr_reader = reader .get_root::() + .map_err(RPCError::internal) .wrap_err("failed to make reader for private_route")?; decode_private_route(&pr_reader).wrap_err("failed to decode private route") } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index cab84df6..6435cd1e 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -323,19 +323,16 @@ impl RoutingTableInner { } } - pub fn init(&mut self, routing_table: RoutingTable) -> EyreResult<()> { + pub fn init_buckets(&mut self, routing_table: RoutingTable) { // Size the buckets (one per bit) + self.buckets.clear(); self.buckets.reserve(DHT_KEY_LENGTH * 8); for _ in 0..DHT_KEY_LENGTH * 8 { let bucket = Bucket::new(routing_table.clone()); self.buckets.push(bucket); } - - Ok(()) } - pub fn terminate(&mut self) {} - pub fn configure_local_network_routing_domain( &mut self, local_networks: Vec<(IpAddr, IpAddr)>, diff --git a/veilid-core/src/tests/common/test_table_store.rs b/veilid-core/src/tests/common/test_table_store.rs index 9a4b73a9..f10f322e 100644 --- a/veilid-core/src/tests/common/test_table_store.rs +++ b/veilid-core/src/tests/common/test_table_store.rs @@ -59,78 +59,67 @@ pub async fn test_store_delete_load(ts: TableStore) { ); assert_eq!( - db.load(0, b"foo").await.unwrap(), + db.load(0, b"foo").unwrap(), None, "should not load missing key" ); assert!( - db.store(1, b"foo", b"1234567890").await.is_ok(), + db.store(1, b"foo", b"1234567890").is_ok(), "should store new key" ); assert_eq!( - db.load(0, b"foo").await.unwrap(), + db.load(0, b"foo").unwrap(), None, "should not load missing key" ); - assert_eq!( - db.load(1, b"foo").await.unwrap(), - Some(b"1234567890".to_vec()) - ); + assert_eq!(db.load(1, b"foo").unwrap(), Some(b"1234567890".to_vec())); assert!( - db.store(1, b"bar", b"FNORD").await.is_ok(), + db.store(1, b"bar", b"FNORD").is_ok(), "should store new key" ); assert!( - db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ") - .await - .is_ok(), + db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").is_ok(), "should store new key" ); assert!( - db.store(2, b"bar", b"FNORD").await.is_ok(), + db.store(2, b"bar", b"FNORD").is_ok(), "should store new key" ); assert!( - db.store(2, b"baz", b"QWERTY").await.is_ok(), + db.store(2, b"baz", b"QWERTY").is_ok(), "should store new key" ); assert!( - db.store(2, b"bar", b"QWERTYUIOP").await.is_ok(), + db.store(2, b"bar", b"QWERTYUIOP").is_ok(), "should store new key" ); - assert_eq!(db.load(1, b"bar").await.unwrap(), Some(b"FNORD".to_vec())); + assert_eq!(db.load(1, b"bar").unwrap(), Some(b"FNORD".to_vec())); assert_eq!( - db.load(0, b"bar").await.unwrap(), + db.load(0, b"bar").unwrap(), Some(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec()) ); - assert_eq!( - db.load(2, b"bar").await.unwrap(), - Some(b"QWERTYUIOP".to_vec()) - ); - assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec())); + assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec())); + assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec())); - assert_eq!(db.delete(1, b"bar").await.unwrap(), true); - assert_eq!(db.delete(1, b"bar").await.unwrap(), false); + assert_eq!(db.delete(1, b"bar").unwrap(), true); + assert_eq!(db.delete(1, b"bar").unwrap(), false); assert!( - db.delete(4, b"bar").await.is_err(), + db.delete(4, b"bar").is_err(), "can't delete from column that doesn't exist" ); drop(db); let db = ts.open("test", 3).await.expect("should have opened"); - assert_eq!(db.load(1, b"bar").await.unwrap(), None); + assert_eq!(db.load(1, b"bar").unwrap(), None); assert_eq!( - db.load(0, b"bar").await.unwrap(), + db.load(0, b"bar").unwrap(), Some(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec()) ); - assert_eq!( - db.load(2, b"bar").await.unwrap(), - Some(b"QWERTYUIOP".to_vec()) - ); - assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec())); + assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec())); + assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec())); } pub async fn test_cbor(ts: TableStore) { @@ -140,11 +129,11 @@ pub async fn test_cbor(ts: TableStore) { let db = ts.open("test", 3).await.expect("should have opened"); let (dht_key, _) = generate_secret(); - assert!(db.store_cbor(0, b"asdf", &dht_key).await.is_ok()); + assert!(db.store_cbor(0, b"asdf", &dht_key).is_ok()); - assert_eq!(db.load_cbor::(0, b"qwer").await.unwrap(), None); + assert_eq!(db.load_cbor::(0, b"qwer").unwrap(), None); - let d = match db.load_cbor::(0, b"asdf").await { + let d = match db.load_cbor::(0, b"asdf") { Ok(x) => x, Err(e) => { panic!("couldn't decode cbor: {}", e); @@ -153,12 +142,12 @@ pub async fn test_cbor(ts: TableStore) { assert_eq!(d, Some(dht_key), "keys should be equal"); assert!( - db.store(1, b"foo", b"1234567890").await.is_ok(), + db.store(1, b"foo", b"1234567890").is_ok(), "should store new key" ); assert!( - db.load_cbor::(1, b"foo").await.is_err(), + db.load_cbor::(1, b"foo").is_err(), "should fail to load cbor" ); } diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs index c9f52197..423d93fa 100644 --- a/veilid-core/src/tests/common/test_veilid_config.rs +++ b/veilid-core/src/tests/common/test_veilid_config.rs @@ -203,11 +203,11 @@ fn config_callback(key: String) -> ConfigCallbackReturn { "network.routing_table.limit_attached_good" => Ok(Box::new(8u32)), "network.routing_table.limit_attached_weak" => Ok(Box::new(4u32)), "network.rpc.concurrency" => Ok(Box::new(2u32)), - "network.rpc.queue_size" => Ok(Box::new(128u32)), + "network.rpc.queue_size" => Ok(Box::new(1024u32)), "network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))), "network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))), "network.rpc.timeout_ms" => Ok(Box::new(10_000u32)), - "network.rpc.max_route_hop_count" => Ok(Box::new(7u8)), + "network.rpc.max_route_hop_count" => Ok(Box::new(4u8)), "network.rpc.default_route_hop_count" => Ok(Box::new(2u8)), "network.dht.resolve_node_timeout_ms" => Ok(Box::new(Option::::None)), "network.dht.resolve_node_count" => Ok(Box::new(20u32)), @@ -323,9 +323,9 @@ pub async fn test_config() { assert_eq!(inner.network.bootstrap, Vec::::new()); assert_eq!(inner.network.bootstrap_nodes, Vec::::new()); assert_eq!(inner.network.rpc.concurrency, 2u32); - assert_eq!(inner.network.rpc.queue_size, 128u32); + assert_eq!(inner.network.rpc.queue_size, 1024u32); assert_eq!(inner.network.rpc.timeout_ms, 10_000u32); - assert_eq!(inner.network.rpc.max_route_hop_count, 7u8); + assert_eq!(inner.network.rpc.max_route_hop_count, 4u8); assert_eq!(inner.network.rpc.default_route_hop_count, 2u8); assert_eq!(inner.network.routing_table.limit_over_attached, 64u32); assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32); diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index ee00b36e..9bc78d7d 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -55,10 +55,7 @@ fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option { } fn get_safety_selection(text: &str, rss: RouteSpecStore) -> Option { - if text.len() == 0 { - return None; - } - if &text[0..1] == "-" { + if text.len() != 0 && &text[0..1] == "-" { // Unsafe let text = &text[1..]; let seq = get_sequencing(text).unwrap_or(Sequencing::NoPreference); diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs index d9f48ffd..a4c8e008 100644 --- a/veilid-core/src/xx/mod.rs +++ b/veilid-core/src/xx/mod.rs @@ -41,61 +41,42 @@ pub type PinBoxFutureLifetime<'a, T> = PinBox + 'a>; pub type SendPinBoxFuture = PinBox + Send + 'static>; pub type SendPinBoxFutureLifetime<'a, T> = PinBox + Send + 'a>; +pub use std::borrow::{Cow, ToOwned}; +pub use std::boxed::Box; +pub use std::cell::RefCell; +pub use std::cmp; +pub use std::collections::btree_map::BTreeMap; +pub use std::collections::btree_set::BTreeSet; +pub use std::collections::hash_map::HashMap; +pub use std::collections::hash_set::HashSet; +pub use std::collections::LinkedList; +pub use std::collections::VecDeque; +pub use std::convert::{TryFrom, TryInto}; +pub use std::fmt; +pub use std::future::Future; +pub use std::mem; +pub use std::ops::{Fn, FnMut, FnOnce}; +pub use std::pin::Pin; +pub use std::rc::Rc; +pub use std::string::String; +pub use std::sync::atomic::{AtomicBool, Ordering}; +pub use std::sync::{Arc, Weak}; +pub use std::task; +pub use std::time::Duration; +pub use std::vec::Vec; + cfg_if! { if #[cfg(target_arch = "wasm32")] { - pub use alloc::string::String; - pub use alloc::vec::Vec; - pub use alloc::collections::LinkedList; - pub use alloc::collections::VecDeque; - pub use alloc::collections::btree_map::BTreeMap; - pub use alloc::collections::btree_set::BTreeSet; - pub use hashbrown::hash_map::HashMap; - pub use hashbrown::hash_set::HashSet; - pub use alloc::boxed::Box; - pub use alloc::borrow::{Cow, ToOwned}; pub use wasm_bindgen::prelude::*; - pub use core::cmp; - pub use core::convert::{TryFrom, TryInto}; - pub use core::mem; - pub use core::fmt; - pub use alloc::rc::Rc; - pub use core::cell::RefCell; - pub use core::task; - pub use core::future::Future; - pub use core::time::Duration; - pub use core::pin::Pin; - pub use core::sync::atomic::{Ordering, AtomicBool}; - pub use alloc::sync::{Arc, Weak}; - pub use core::ops::{FnOnce, FnMut, Fn}; + pub use async_lock::Mutex as AsyncMutex; pub use async_lock::MutexGuard as AsyncMutexGuard; pub use async_lock::MutexGuardArc as AsyncMutexGuardArc; - pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub use async_executors::JoinHandle as LowLevelJoinHandle; + + pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; } else { - pub use std::string::String; - pub use std::vec::Vec; - pub use std::collections::LinkedList; - pub use std::collections::VecDeque; - pub use std::collections::btree_map::BTreeMap; - pub use std::collections::btree_set::BTreeSet; - pub use std::collections::hash_map::HashMap; - pub use std::collections::hash_set::HashSet; - pub use std::boxed::Box; - pub use std::borrow::{Cow, ToOwned}; - pub use std::cmp; - pub use std::convert::{TryFrom, TryInto}; - pub use std::mem; - pub use std::fmt; - pub use std::sync::atomic::{Ordering, AtomicBool}; - pub use std::sync::{Arc, Weak}; - pub use std::rc::Rc; - pub use std::cell::RefCell; - pub use std::task; - pub use std::future::Future; - pub use std::time::Duration; - pub use std::pin::Pin; - pub use std::ops::{FnOnce, FnMut, Fn}; + cfg_if! { if #[cfg(feature="rt-async-std")] { pub use async_std::sync::Mutex as AsyncMutex;