remove nodejs support

This commit is contained in:
John Smith 2022-11-06 16:07:56 -05:00
parent 0e7f3e1c3c
commit a54da97393
18 changed files with 375 additions and 282 deletions

12
Cargo.lock generated
View File

@ -2755,15 +2755,6 @@ dependencies = [
"value-bag",
]
[[package]]
name = "lru"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6e8aaa3f231bb4bd57b84b2d5dc3ae7f350265df8aa96492e0bc394a1571909"
dependencies = [
"hashbrown",
]
[[package]]
name = "lru-cache"
version = "0.1.2"
@ -5505,7 +5496,6 @@ dependencies = [
"futures-util",
"generic-array",
"getrandom 0.2.8",
"hashbrown",
"hashlink 0.8.1",
"hex",
"ifstructs",
@ -5515,11 +5505,11 @@ dependencies = [
"js-sys",
"json",
"keyring-manager",
"keyvaluedb",
"keyvaluedb-sqlite",
"keyvaluedb-web",
"lazy_static",
"libc",
"lru",
"maplit",
"ndk 0.6.0",
"ndk-glue",

View File

@ -58,7 +58,7 @@ digest = "0.9.0"
rtnetlink = { version = "^0", default-features = false, optional = true }
async-std-resolver = { version = "^0", optional = true }
trust-dns-resolver = { version = "^0", optional = true }
keyvaluedb = { path = "../external/keyvaluedb/keyvaluedb" }
# Dependencies for native builds only
# Linux, Windows, Mac, iOS, Android
@ -72,7 +72,6 @@ async-tungstenite = { version = "^0", features = ["async-tls"] }
maplit = "^1"
config = { version = "^0", features = ["yaml"] }
keyring-manager = { path = "../external/keyring-manager" }
lru = "^0"
async-tls = "^0.11"
igd = { path = "../external/rust-igd" }
webpki = "^0"
@ -96,8 +95,6 @@ nix = "^0"
wasm-bindgen = "^0"
js-sys = "^0"
wasm-bindgen-futures = "^0"
hashbrown = "^0"
lru = {version = "^0", features = ["hashbrown"] }
no-std-net = { path = "../external/no-std-net", features = ["serde"] }
keyvaluedb-web = { path = "../external/keyvaluedb/keyvaluedb-web" }
data-encoding = { version = "^2", default_features = false, features = ["alloc"] }

View File

@ -119,12 +119,12 @@ impl Crypto {
// load caches if they are valid for this node id
let mut db = table_store.open("crypto_caches", 1).await?;
let caches_valid = match db.load(0, b"node_id").await? {
let caches_valid = match db.load(0, b"node_id")? {
Some(v) => v.as_slice() == node_id.bytes,
None => false,
};
if caches_valid {
if let Some(b) = db.load(0, b"dh_cache").await? {
if let Some(b) = db.load(0, b"dh_cache")? {
let mut inner = self.inner.lock();
bytes_to_cache(&b, &mut inner.dh_cache);
}
@ -132,7 +132,7 @@ impl Crypto {
drop(db);
table_store.delete("crypto_caches").await?;
db = table_store.open("crypto_caches", 1).await?;
db.store(0, b"node_id", &node_id.bytes).await?;
db.store(0, b"node_id", &node_id.bytes)?;
}
// Schedule flushing
@ -159,7 +159,7 @@ impl Crypto {
};
let db = table_store.open("crypto_caches", 1).await?;
db.store(0, b"dh_cache", &cache_bytes).await?;
db.store(0, b"dh_cache", &cache_bytes)?;
Ok(())
}

View File

@ -8,6 +8,8 @@ struct TableStoreInner {
opened: BTreeMap<String, Weak<Mutex<TableDBInner>>>,
}
/// Veilid Table Storage
/// Database for storing key value pairs persistently across runs
#[derive(Clone)]
pub struct TableStore {
config: VeilidConfig,
@ -20,31 +22,38 @@ impl TableStore {
opened: BTreeMap::new(),
}
}
pub fn new(config: VeilidConfig) -> Self {
pub(crate) fn new(config: VeilidConfig) -> Self {
Self {
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
}
}
pub async fn delete_all(&self) -> EyreResult<()> {
// Delete all known keys
self.delete("crypto_caches").await?;
/// Delete all known tables
pub async fn delete_all(&self) {
if let Err(e) = self.delete("crypto_caches").await {
error!("failed to delete 'crypto_caches': {}", e);
}
if let Err(e) = self.delete("RouteSpecStore").await {
error!("failed to delete 'RouteSpecStore': {}", e);
}
if let Err(e) = self.delete("routing_table").await {
error!("failed to delete 'routing_table': {}", e);
}
}
pub(crate) async fn init(&self) -> EyreResult<()> {
Ok(())
}
pub async fn init(&self) -> EyreResult<()> {
Ok(())
}
pub async fn terminate(&self) {
pub(crate) async fn terminate(&self) {
assert!(
self.inner.lock().opened.is_empty(),
"all open databases should have been closed"
);
}
pub fn on_table_db_drop(&self, table: String) {
pub(crate) fn on_table_db_drop(&self, table: String) {
let mut inner = self.inner.lock();
if inner.opened.remove(&table).is_none() {
unreachable!("should have removed an item");
@ -82,6 +91,8 @@ impl TableStore {
})
}
/// Get or create a TableDB database table. If the column count is greater than an
/// existing TableDB's column count, the database will be upgraded to add the missing columns
pub async fn open(&self, name: &str, column_count: u32) -> EyreResult<TableDB> {
let table_name = self.get_table_name(name)?;
@ -121,6 +132,7 @@ impl TableStore {
Ok(table_db)
}
/// Delete a TableDB table by name
pub async fn delete(&self, name: &str) -> EyreResult<bool> {
let table_name = self.get_table_name(name)?;

View File

@ -5,8 +5,10 @@ use serde::{Deserialize, Serialize};
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use keyvaluedb_web::*;
use keyvaluedb::*;
} else {
use keyvaluedb_sqlite::*;
use keyvaluedb::*;
}
}
@ -28,7 +30,7 @@ pub struct TableDB {
}
impl TableDB {
pub fn new(table: String, table_store: TableStore, database: Database) -> Self {
pub(super) fn new(table: String, table_store: TableStore, database: Database) -> Self {
Self {
inner: Arc::new(Mutex::new(TableDBInner {
table,
@ -38,22 +40,24 @@ impl TableDB {
}
}
pub fn try_new_from_weak_inner(weak_inner: Weak<Mutex<TableDBInner>>) -> Option<Self> {
pub(super) fn try_new_from_weak_inner(weak_inner: Weak<Mutex<TableDBInner>>) -> Option<Self> {
weak_inner.upgrade().map(|table_db_inner| Self {
inner: table_db_inner,
})
}
pub fn weak_inner(&self) -> Weak<Mutex<TableDBInner>> {
pub(super) fn weak_inner(&self) -> Weak<Mutex<TableDBInner>> {
Arc::downgrade(&self.inner)
}
pub async fn get_column_count(&self) -> EyreResult<u32> {
/// Get the total number of columns in the TableDB
pub fn get_column_count(&self) -> EyreResult<u32> {
let db = &self.inner.lock().database;
db.num_columns().wrap_err("failed to get column count: {}")
}
pub async fn get_keys(&self, col: u32) -> EyreResult<Vec<Box<[u8]>>> {
/// Get the list of keys in a column of the TableDB
pub fn get_keys(&self, col: u32) -> EyreResult<Vec<Box<[u8]>>> {
let db = &self.inner.lock().database;
let mut out: Vec<Box<[u8]>> = Vec::new();
db.iter(col, None, &mut |kv| {
@ -64,14 +68,25 @@ impl TableDB {
Ok(out)
}
pub async fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> {
/// Start a TableDB write transaction. The transaction object must be committed or rolled back before dropping.
pub fn transact<'a>(&'a self) -> TableDBTransaction<'a> {
let dbt = {
let db = &self.inner.lock().database;
db.transaction()
};
TableDBTransaction::new(self, dbt)
}
/// Store a key with a value in a column in the TableDB. Performs a single transaction immediately.
pub fn store(&self, col: u32, key: &[u8], value: &[u8]) -> EyreResult<()> {
let db = &self.inner.lock().database;
let mut dbt = db.transaction();
dbt.put(col, key, value);
db.write(dbt).wrap_err("failed to store key")
}
pub async fn store_cbor<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
/// Store a key in CBOR format with a value in a column in the TableDB. Performs a single transaction immediately.
pub fn store_cbor<T>(&self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
where
T: Serialize,
{
@ -83,12 +98,14 @@ impl TableDB {
db.write(dbt).wrap_err("failed to store key")
}
pub async fn load(&self, col: u32, key: &[u8]) -> EyreResult<Option<Vec<u8>>> {
/// Read a key from a column in the TableDB immediately.
pub fn load(&self, col: u32, key: &[u8]) -> EyreResult<Option<Vec<u8>>> {
let db = &self.inner.lock().database;
db.get(col, key).wrap_err("failed to get key")
}
pub async fn load_cbor<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>>
/// Read a key from a column in the TableDB immediately, in CBOR format.
pub fn load_cbor<T>(&self, col: u32, key: &[u8]) -> EyreResult<Option<T>>
where
T: for<'de> Deserialize<'de>,
{
@ -104,7 +121,8 @@ impl TableDB {
Ok(Some(obj))
}
pub async fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> {
/// Delete key with from a column in the TableDB
pub fn delete(&self, col: u32, key: &[u8]) -> EyreResult<bool> {
let db = &self.inner.lock().database;
let found = db.get(col, key).wrap_err("failed to get key")?;
match found {
@ -118,3 +136,66 @@ impl TableDB {
}
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// A TableDB transaction
/// Atomically commits a group of writes or deletes to the TableDB
pub struct TableDBTransaction<'a> {
db: &'a TableDB,
dbt: Option<DBTransaction>,
_phantom: core::marker::PhantomData<&'a ()>,
}
impl<'a> TableDBTransaction<'a> {
fn new(db: &'a TableDB, dbt: DBTransaction) -> Self {
Self {
db,
dbt: Some(dbt),
_phantom: Default::default(),
}
}
/// Commit the transaction. Performs all actions atomically.
pub fn commit(mut self) -> EyreResult<()> {
self.db
.inner
.lock()
.database
.write(self.dbt.take().unwrap())
.wrap_err("commit failed")
}
/// Rollback the transaction. Does nothing to the TableDB.
pub fn rollback(mut self) {
self.dbt = None;
}
/// Store a key with a value in a column in the TableDB
pub fn store(&mut self, col: u32, key: &[u8], value: &[u8]) {
self.dbt.as_mut().unwrap().put(col, key, value);
}
/// Store a key in CBOR format with a value in a column in the TableDB
pub fn store_cbor<T>(&mut self, col: u32, key: &[u8], value: &T) -> EyreResult<()>
where
T: Serialize,
{
let v = serde_cbor::to_vec(value).wrap_err("couldn't store as CBOR")?;
self.dbt.as_mut().unwrap().put(col, key, v.as_slice());
Ok(())
}
/// Delete key with from a column in the TableDB
pub fn delete(&mut self, col: u32, key: &[u8]) {
self.dbt.as_mut().unwrap().delete(col, key);
}
}
impl<'a> Drop for TableDBTransaction<'a> {
fn drop(&mut self) {
if self.dbt.is_some() {
warn!("Dropped transaction without commit or rollback");
}
}
}

View File

@ -3,20 +3,11 @@ use crate::xx::*;
use crate::*;
use data_encoding::BASE64URL_NOPAD;
use js_sys::*;
use send_wrapper::*;
use serde::{Deserialize, Serialize};
use wasm_bindgen_futures::*;
use web_sys::*;
#[wasm_bindgen]
extern "C" {
#[wasm_bindgen(catch, js_name = setPassword, js_namespace = ["global", "wasmhost", "keytar"])]
fn keytar_setPassword(service: &str, account: &str, password: &str)
-> Result<Promise, JsValue>;
#[wasm_bindgen(catch, js_name = getPassword, js_namespace = ["global", "wasmhost", "keytar"])]
fn keytar_getPassword(service: &str, account: &str) -> Result<Promise, JsValue>;
#[wasm_bindgen(catch, js_name = deletePassword, js_namespace = ["global", "wasmhost", "keytar"])]
fn keytar_deletePassword(service: &str, account: &str) -> Result<Promise, JsValue>;
}
#[derive(Clone)]
pub struct ProtectedStore {
config: VeilidConfig,
@ -71,33 +62,9 @@ impl ProtectedStore {
}
}
#[instrument(level = "trace", skip(self, value), ret, err)]
//#[instrument(level = "trace", skip(self, value), ret, err)]
pub async fn save_user_secret_string(&self, key: &str, value: &str) -> EyreResult<bool> {
if is_nodejs() {
let prev = match JsFuture::from(
keytar_getPassword(self.keyring_name().as_str(), key)
.map_err(map_jsvalue_error)
.wrap_err("exception thrown")?,
)
.await
{
Ok(v) => v.is_truthy(),
Err(_) => false,
};
match JsFuture::from(
keytar_setPassword(self.keyring_name().as_str(), key, value)
.map_err(map_jsvalue_error)
.wrap_err("exception thrown")?,
)
.await
{
Ok(_) => {}
Err(_) => bail!("Failed to set password"),
}
Ok(prev)
} else if is_browser() {
if is_browser() {
let win = match window() {
Some(w) => w,
None => {
@ -139,24 +106,7 @@ impl ProtectedStore {
#[instrument(level = "trace", skip(self), err)]
pub async fn load_user_secret_string(&self, key: &str) -> EyreResult<Option<String>> {
if is_nodejs() {
let prev = match JsFuture::from(
keytar_getPassword(self.keyring_name().as_str(), key)
.map_err(map_jsvalue_error)
.wrap_err("exception thrown")?,
)
.await
{
Ok(p) => p,
Err(_) => JsValue::UNDEFINED,
};
if prev.is_undefined() || prev.is_null() {
return Ok(None);
}
Ok(prev.as_string())
} else if is_browser() {
if is_browser() {
let win = match window() {
Some(w) => w,
None => {
@ -252,18 +202,7 @@ impl ProtectedStore {
#[instrument(level = "trace", skip(self), ret, err)]
pub async fn remove_user_secret(&self, key: &str) -> EyreResult<bool> {
if is_nodejs() {
match JsFuture::from(
keytar_deletePassword(self.keyring_name().as_str(), key)
.map_err(map_jsvalue_error)
.wrap_err("exception thrown")?,
)
.await
{
Ok(v) => Ok(v.is_truthy()),
Err(_) => bail!("Failed to delete"),
}
} else if is_browser() {
if is_browser() {
let win = match window() {
Some(w) => w,
None => {

View File

@ -19,10 +19,8 @@ extern "C" {
pub fn get_timestamp() -> u64 {
if utils::is_browser() {
return (Date::now() * 1000.0f64) as u64;
} else if utils::is_nodejs() {
return (Date::now() * 1000.0f64) as u64;
} else {
panic!("WASM requires browser or nodejs environment");
panic!("WASM requires browser environment");
}
}
@ -85,18 +83,22 @@ pub fn spawn<Out>(future: impl Future<Output = Out> + Send + 'static) -> MustJoi
where
Out: Send + 'static,
{
MustJoinHandle::new(Bindgen
MustJoinHandle::new(
Bindgen
.spawn_handle(future)
.expect("wasm-bindgen-futures spawn should never error out"))
.expect("wasm-bindgen-futures spawn should never error out"),
)
}
pub fn spawn_local<Out>(future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out>
where
Out: 'static,
{
MustJoinHandle::new(Bindgen
MustJoinHandle::new(
Bindgen
.spawn_handle_local(future)
.expect("wasm-bindgen-futures spawn_local should never error out"))
.expect("wasm-bindgen-futures spawn_local should never error out"),
)
}
// pub fn spawn_with_local_set<Out>(
@ -114,10 +116,10 @@ where
{
Bindgen
.spawn_handle_local(future)
.expect("wasm-bindgen-futures spawn_local should never error out").detach()
.expect("wasm-bindgen-futures spawn_local should never error out")
.detach()
}
pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SendPinBoxFuture<()>
where
F: Fn() -> FUT + Send + Sync + 'static,
@ -177,20 +179,12 @@ pub async fn get_outbound_relay_peer() -> Option<crate::veilid_api::PeerInfo> {
// // .unwrap();
// // });
// // JsFuture::from(promise).await.unwrap();
// } else if utils::is_nodejs() {
// // let promise = Promise::new(&mut |yes, _| {
// // nodejs_global_set_timeout_with_callback_and_timeout_and_arguments_0(&yes, millis)
// // .unwrap();
// // });
// // JsFuture::from(promise).await.unwrap();
// } else {
// panic!("WASM requires browser or nodejs environment");
// panic!("WASM requires browser environment");
// }
// }
pub async fn txt_lookup<S: AsRef<str>>(_host: S) -> EyreResult<Vec<String>> {
bail!("wasm does not support txt lookup")
}

View File

@ -22,7 +22,7 @@ impl TableStore {
opened: BTreeMap::new(),
}
}
pub fn new(config: VeilidConfig) -> Self {
pub(crate) fn new(config: VeilidConfig) -> Self {
Self {
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
@ -30,12 +30,25 @@ impl TableStore {
}
}
pub async fn init(&self) -> EyreResult<()> {
/// Delete all known tables
pub async fn delete_all(&self) {
if let Err(e) = self.delete("crypto_caches").await {
error!("failed to delete 'crypto_caches': {}", e);
}
if let Err(e) = self.delete("RouteSpecStore").await {
error!("failed to delete 'RouteSpecStore': {}", e);
}
if let Err(e) = self.delete("routing_table").await {
error!("failed to delete 'routing_table': {}", e);
}
}
pub(crate) async fn init(&self) -> EyreResult<()> {
let _async_guard = self.async_lock.lock().await;
Ok(())
}
pub async fn terminate(&self) {
pub(crate) async fn terminate(&self) {
let _async_guard = self.async_lock.lock().await;
assert!(
self.inner.lock().opened.len() == 0,
@ -43,7 +56,7 @@ impl TableStore {
);
}
pub fn on_table_db_drop(&self, table: String) {
pub(crate) fn on_table_db_drop(&self, table: String) {
let mut inner = self.inner.lock();
match inner.opened.remove(&table) {
Some(_) => (),
@ -69,6 +82,8 @@ impl TableStore {
})
}
/// Get or create a TableDB database table. If the column count is greater than an
/// existing TableDB's column count, the database will be upgraded to add the missing columns
pub async fn open(&self, name: &str, column_count: u32) -> EyreResult<TableDB> {
let _async_guard = self.async_lock.lock().await;
let table_name = self.get_table_name(name)?;
@ -89,7 +104,10 @@ impl TableStore {
let db = Database::open(table_name.clone(), column_count)
.await
.wrap_err("failed to open tabledb")?;
info!("opened table store '{}' with table name '{:?}' with {} columns", name, table_name, column_count);
info!(
"opened table store '{}' with table name '{:?}' with {} columns",
name, table_name, column_count
);
let table_db = TableDB::new(table_name.clone(), self.clone(), db);
@ -101,6 +119,7 @@ impl TableStore {
Ok(table_db)
}
/// Delete a TableDB table by name
pub async fn delete(&self, name: &str) -> EyreResult<bool> {
let _async_guard = self.async_lock.lock().await;
trace!("TableStore::delete {}", name);
@ -117,9 +136,7 @@ impl TableStore {
}
}
if utils::is_nodejs() {
unimplemented!();
} else if utils::is_browser() {
if utils::is_browser() {
let out = match Database::delete(table_name.clone()).await {
Ok(_) => true,
Err(_) => false,

View File

@ -15,21 +15,6 @@ extern "C" {
pub fn alert(s: &str);
}
pub fn is_nodejs() -> bool {
static CACHE: AtomicI8 = AtomicI8::new(-1);
let cache = CACHE.load(Ordering::Relaxed);
if cache != -1 {
return cache != 0;
}
let res = js_sys::eval("process.release.name === 'node'")
.map(|res| res.is_truthy())
.unwrap_or_default();
CACHE.store(res as i8, Ordering::Relaxed);
res
}
pub fn is_browser() -> bool {
static CACHE: AtomicI8 = AtomicI8::new(-1);
let cache = CACHE.load(Ordering::Relaxed);
@ -60,24 +45,6 @@ pub fn is_browser() -> bool {
// res
// }
// pub fn node_require(module: &str) -> JsValue {
// if !is_nodejs() {
// return JsValue::UNDEFINED;
// }
// let mut home = env!("CARGO_MANIFEST_DIR");
// if home.len() == 0 {
// home = ".";
// }
// match js_sys::eval(format!("require(\"{}/{}\")", home, module).as_str()) {
// Ok(v) => v,
// Err(e) => {
// panic!("node_require failed: {:?}", e);
// }
// }
// }
#[derive(ThisError, Debug, Clone, Eq, PartialEq)]
#[error("JsValue error")]
pub struct JsValueError(String);

View File

@ -8,6 +8,8 @@ pub struct Bucket {
}
pub(super) type EntriesIter<'a> = alloc::collections::btree_map::Iter<'a, DHTKey, Arc<BucketEntry>>;
type BucketData = (Vec<(DHTKey, Vec<u8>)>, Option<DHTKey>);
fn state_ordering(state: BucketEntryState) -> usize {
match state {
BucketEntryState::Dead => 0,
@ -25,6 +27,32 @@ impl Bucket {
}
}
pub(super) fn load_bucket(&mut self, data: &[u8]) -> EyreResult<()> {
let bucket_data: BucketData =
serde_cbor::from_slice::<BucketData>(data).wrap_err("failed to deserialize bucket")?;
for (k, d) in bucket_data.0 {
let entryinner = serde_cbor::from_slice::<BucketEntryInner>(&d)
.wrap_err("failed to deserialize bucket entry")?;
self.entries
.insert(k, Arc::new(BucketEntry::new_with_inner(entryinner)));
}
self.newest_entry = bucket_data.1;
Ok(())
}
pub(super) fn save_bucket(&self) -> EyreResult<Vec<u8>> {
let mut entry_vec = Vec::new();
for (k, v) in &self.entries {
let entry_bytes = v.with_mut_inner(|e| serde_cbor::to_vec(e))?;
entry_vec.push((*k, entry_bytes));
}
let bucket_data: BucketData = (entry_vec, self.newest_entry.clone());
let out = serde_cbor::to_vec(&bucket_data)?;
Ok(out)
}
pub(super) fn add_entry(&mut self, node_id: DHTKey) -> NodeRef {
log_rtab!("Node added: {}", node_id.encode());

View File

@ -1,5 +1,6 @@
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
use serde::{Deserialize, Serialize};
/// Reliable pings are done with increased spacing between pings
@ -42,7 +43,7 @@ pub enum BucketEntryState {
struct LastConnectionKey(ProtocolType, AddressType);
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryPublicInternet {
/// The PublicInternet node info
signed_node_info: Option<Box<SignedNodeInfo>>,
@ -53,7 +54,7 @@ pub struct BucketEntryPublicInternet {
}
/// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryLocalNetwork {
/// The LocalNetwork node info
signed_node_info: Option<Box<SignedNodeInfo>>,
@ -63,19 +64,24 @@ pub struct BucketEntryLocalNetwork {
node_status: Option<LocalNetworkNodeStatus>,
}
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryInner {
min_max_version: Option<(u8, u8)>,
updated_since_last_network_change: bool,
#[serde(skip)]
last_connections: BTreeMap<LastConnectionKey, (ConnectionDescriptor, u64)>,
public_internet: BucketEntryPublicInternet,
local_network: BucketEntryLocalNetwork,
peer_stats: PeerStats,
#[serde(skip)]
latency_stats_accounting: LatencyStatsAccounting,
#[serde(skip)]
transfer_stats_accounting: TransferStatsAccounting,
#[cfg(feature = "tracking")]
#[serde(skip)]
next_track_id: usize,
#[cfg(feature = "tracking")]
#[serde(skip)]
node_ref_tracks: HashMap<usize, backtrace::Backtrace>,
}
@ -657,6 +663,13 @@ impl BucketEntry {
}
}
pub(super) fn new_with_inner(inner: BucketEntryInner) -> Self {
Self {
ref_count: AtomicU32::new(0),
inner: RwLock::new(inner),
}
}
// Note, that this requires -also- holding the RoutingTable read lock, as an
// immutable reference to RoutingTableInner must be passed in to get this
// This ensures that an operation on the routing table can not change entries

View File

@ -154,6 +154,20 @@ impl RoutingTable {
pub async fn init(&self) -> EyreResult<()> {
debug!("starting routing table init");
// Set up routing buckets
{
let mut inner = self.inner.write();
inner.init_buckets(self.clone());
}
// Load bucket entries from table db if possible
debug!("loading routing table entries");
if let Err(e) = self.load_buckets().await {
log_rtab!(warn "Error loading buckets from storage: {}. Resetting.", e);
let mut inner = self.inner.write();
inner.init_buckets(self.clone());
}
// Set up routespecstore
debug!("starting route spec store init");
let route_spec_store = match RouteSpecStore::load(self.clone()).await {
@ -165,10 +179,10 @@ impl RoutingTable {
};
debug!("finished route spec store init");
{
let mut inner = self.inner.write();
inner.init(self.clone())?;
inner.route_spec_store = Some(route_spec_store);
}
debug!("finished routing table init");
Ok(())
@ -188,6 +202,12 @@ impl RoutingTable {
error!("kick_buckets_task not stopped: {}", e);
}
// Load bucket entries from table db if possible
debug!("saving routing table entries");
if let Err(e) = self.save_buckets().await {
error!("failed to save routing table entries: {}", e);
}
debug!("saving route spec store");
let rss = {
let mut inner = self.inner.write();
@ -201,12 +221,67 @@ impl RoutingTable {
debug!("shutting down routing table");
let mut inner = self.inner.write();
inner.terminate();
*inner = RoutingTableInner::new(self.unlocked_inner.clone());
debug!("finished routing table terminate");
}
async fn save_buckets(&self) -> EyreResult<()> {
// Serialize all entries
let mut bucketvec: Vec<Vec<u8>> = Vec::new();
{
let inner = &*self.inner.read();
for bucket in &inner.buckets {
bucketvec.push(bucket.save_bucket()?)
}
}
let table_store = self.network_manager().table_store();
let tdb = table_store.open("routing_table", 1).await?;
let bucket_count = bucketvec.len();
let mut dbx = tdb.transact();
if let Err(e) = dbx.store_cbor(0, b"bucket_count", &bucket_count) {
dbx.rollback();
return Err(e);
}
for (n, b) in bucketvec.iter().enumerate() {
dbx.store(0, format!("bucket_{}", n).as_bytes(), b)
}
dbx.commit()?;
Ok(())
}
async fn load_buckets(&self) -> EyreResult<()> {
// Deserialize all entries
let inner = &mut *self.inner.write();
let tstore = self.network_manager().table_store();
let tdb = tstore.open("routing_table", 1).await?;
let Some(bucket_count): Option<usize> = tdb.load_cbor(0, b"bucket_count")? else {
log_rtab!(debug "no bucket count in saved routing table");
return Ok(());
};
if bucket_count != inner.buckets.len() {
// Must have the same number of buckets
warn!("bucket count is different, not loading routing table");
return Ok(());
}
let mut bucketdata_vec: Vec<Vec<u8>> = Vec::new();
for n in 0..bucket_count {
let Some(bucketdata): Option<Vec<u8>> =
tdb.load(0, format!("bucket_{}", n).as_bytes())? else {
warn!("bucket data not loading, skipping loading routing table");
return Ok(());
};
bucketdata_vec.push(bucketdata);
}
for n in 0..bucket_count {
inner.buckets[n].load_bucket(&bucketdata_vec[n])?;
}
Ok(())
}
/// Set up the local network routing domain with our local routing table configuration
pub fn configure_local_network_routing_domain(&self, local_networks: Vec<(IpAddr, IpAddr)>) {
log_net!(debug "configure_local_network_routing_domain: {:#?}", local_networks);

View File

@ -225,7 +225,23 @@ impl RouteSpecStore {
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
let mut content: RouteSpecStoreContent =
rsstdb.load_cbor(0, b"content").await?.unwrap_or_default();
rsstdb.load_cbor(0, b"content")?.unwrap_or_default();
// Look up all route hop noderefs since we can't serialize those
let mut dead_keys = Vec::new();
for (k, rsd) in &mut content.details {
for h in &rsd.hops {
let Some(nr) = routing_table.lookup_node_ref(*h) else {
dead_keys.push(*k);
break;
};
rsd.hop_node_refs.push(nr);
}
}
for k in dead_keys {
log_rtab!(debug "no entry, killing off private route: {}", k.encode());
content.details.remove(&k);
}
// Load secrets from pstore
let pstore = routing_table.network_manager().protected_store();
@ -280,7 +296,7 @@ impl RouteSpecStore {
.network_manager()
.table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
rsstdb.store_cbor(0, b"content", &content).await?;
rsstdb.store_cbor(0, b"content", &content)?;
// // Keep secrets in protected store as well
let pstore = self
@ -1168,10 +1184,9 @@ impl RouteSpecStore {
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &pr_message)
.map_err(RPCError::internal)
.wrap_err("failed to convert builder to vec")?;
Ok(buffer)
// builder_to_vec(pr_message).wrap_err("failed to convert builder to vec")
}
/// Convert binary blob to private route
@ -1180,11 +1195,12 @@ impl RouteSpecStore {
blob.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::internal)
.wrap_err("failed to make message reader")?;
//let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default());
let pr_reader = reader
.get_root::<veilid_capnp::private_route::Reader>()
.map_err(RPCError::internal)
.wrap_err("failed to make reader for private_route")?;
decode_private_route(&pr_reader).wrap_err("failed to decode private route")
}

View File

@ -323,19 +323,16 @@ impl RoutingTableInner {
}
}
pub fn init(&mut self, routing_table: RoutingTable) -> EyreResult<()> {
pub fn init_buckets(&mut self, routing_table: RoutingTable) {
// Size the buckets (one per bit)
self.buckets.clear();
self.buckets.reserve(DHT_KEY_LENGTH * 8);
for _ in 0..DHT_KEY_LENGTH * 8 {
let bucket = Bucket::new(routing_table.clone());
self.buckets.push(bucket);
}
Ok(())
}
pub fn terminate(&mut self) {}
pub fn configure_local_network_routing_domain(
&mut self,
local_networks: Vec<(IpAddr, IpAddr)>,

View File

@ -59,78 +59,67 @@ pub async fn test_store_delete_load(ts: TableStore) {
);
assert_eq!(
db.load(0, b"foo").await.unwrap(),
db.load(0, b"foo").unwrap(),
None,
"should not load missing key"
);
assert!(
db.store(1, b"foo", b"1234567890").await.is_ok(),
db.store(1, b"foo", b"1234567890").is_ok(),
"should store new key"
);
assert_eq!(
db.load(0, b"foo").await.unwrap(),
db.load(0, b"foo").unwrap(),
None,
"should not load missing key"
);
assert_eq!(
db.load(1, b"foo").await.unwrap(),
Some(b"1234567890".to_vec())
);
assert_eq!(db.load(1, b"foo").unwrap(), Some(b"1234567890".to_vec()));
assert!(
db.store(1, b"bar", b"FNORD").await.is_ok(),
db.store(1, b"bar", b"FNORD").is_ok(),
"should store new key"
);
assert!(
db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ")
.await
.is_ok(),
db.store(0, b"bar", b"ABCDEFGHIJKLMNOPQRSTUVWXYZ").is_ok(),
"should store new key"
);
assert!(
db.store(2, b"bar", b"FNORD").await.is_ok(),
db.store(2, b"bar", b"FNORD").is_ok(),
"should store new key"
);
assert!(
db.store(2, b"baz", b"QWERTY").await.is_ok(),
db.store(2, b"baz", b"QWERTY").is_ok(),
"should store new key"
);
assert!(
db.store(2, b"bar", b"QWERTYUIOP").await.is_ok(),
db.store(2, b"bar", b"QWERTYUIOP").is_ok(),
"should store new key"
);
assert_eq!(db.load(1, b"bar").await.unwrap(), Some(b"FNORD".to_vec()));
assert_eq!(db.load(1, b"bar").unwrap(), Some(b"FNORD".to_vec()));
assert_eq!(
db.load(0, b"bar").await.unwrap(),
db.load(0, b"bar").unwrap(),
Some(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec())
);
assert_eq!(
db.load(2, b"bar").await.unwrap(),
Some(b"QWERTYUIOP".to_vec())
);
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec()));
assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec()));
assert_eq!(db.delete(1, b"bar").await.unwrap(), true);
assert_eq!(db.delete(1, b"bar").await.unwrap(), false);
assert_eq!(db.delete(1, b"bar").unwrap(), true);
assert_eq!(db.delete(1, b"bar").unwrap(), false);
assert!(
db.delete(4, b"bar").await.is_err(),
db.delete(4, b"bar").is_err(),
"can't delete from column that doesn't exist"
);
drop(db);
let db = ts.open("test", 3).await.expect("should have opened");
assert_eq!(db.load(1, b"bar").await.unwrap(), None);
assert_eq!(db.load(1, b"bar").unwrap(), None);
assert_eq!(
db.load(0, b"bar").await.unwrap(),
db.load(0, b"bar").unwrap(),
Some(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ".to_vec())
);
assert_eq!(
db.load(2, b"bar").await.unwrap(),
Some(b"QWERTYUIOP".to_vec())
);
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
assert_eq!(db.load(2, b"bar").unwrap(), Some(b"QWERTYUIOP".to_vec()));
assert_eq!(db.load(2, b"baz").unwrap(), Some(b"QWERTY".to_vec()));
}
pub async fn test_cbor(ts: TableStore) {
@ -140,11 +129,11 @@ pub async fn test_cbor(ts: TableStore) {
let db = ts.open("test", 3).await.expect("should have opened");
let (dht_key, _) = generate_secret();
assert!(db.store_cbor(0, b"asdf", &dht_key).await.is_ok());
assert!(db.store_cbor(0, b"asdf", &dht_key).is_ok());
assert_eq!(db.load_cbor::<DHTKey>(0, b"qwer").await.unwrap(), None);
assert_eq!(db.load_cbor::<DHTKey>(0, b"qwer").unwrap(), None);
let d = match db.load_cbor::<DHTKey>(0, b"asdf").await {
let d = match db.load_cbor::<DHTKey>(0, b"asdf") {
Ok(x) => x,
Err(e) => {
panic!("couldn't decode cbor: {}", e);
@ -153,12 +142,12 @@ pub async fn test_cbor(ts: TableStore) {
assert_eq!(d, Some(dht_key), "keys should be equal");
assert!(
db.store(1, b"foo", b"1234567890").await.is_ok(),
db.store(1, b"foo", b"1234567890").is_ok(),
"should store new key"
);
assert!(
db.load_cbor::<DHTKey>(1, b"foo").await.is_err(),
db.load_cbor::<DHTKey>(1, b"foo").is_err(),
"should fail to load cbor"
);
}

View File

@ -203,11 +203,11 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.routing_table.limit_attached_good" => Ok(Box::new(8u32)),
"network.routing_table.limit_attached_weak" => Ok(Box::new(4u32)),
"network.rpc.concurrency" => Ok(Box::new(2u32)),
"network.rpc.queue_size" => Ok(Box::new(128u32)),
"network.rpc.queue_size" => Ok(Box::new(1024u32)),
"network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))),
"network.rpc.max_timestamp_ahead_ms" => Ok(Box::new(Some(10_000u32))),
"network.rpc.timeout_ms" => Ok(Box::new(10_000u32)),
"network.rpc.max_route_hop_count" => Ok(Box::new(7u8)),
"network.rpc.max_route_hop_count" => Ok(Box::new(4u8)),
"network.rpc.default_route_hop_count" => Ok(Box::new(2u8)),
"network.dht.resolve_node_timeout_ms" => Ok(Box::new(Option::<u32>::None)),
"network.dht.resolve_node_count" => Ok(Box::new(20u32)),
@ -323,9 +323,9 @@ pub async fn test_config() {
assert_eq!(inner.network.bootstrap, Vec::<String>::new());
assert_eq!(inner.network.bootstrap_nodes, Vec::<String>::new());
assert_eq!(inner.network.rpc.concurrency, 2u32);
assert_eq!(inner.network.rpc.queue_size, 128u32);
assert_eq!(inner.network.rpc.queue_size, 1024u32);
assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);
assert_eq!(inner.network.rpc.max_route_hop_count, 7u8);
assert_eq!(inner.network.rpc.max_route_hop_count, 4u8);
assert_eq!(inner.network.rpc.default_route_hop_count, 2u8);
assert_eq!(inner.network.routing_table.limit_over_attached, 64u32);
assert_eq!(inner.network.routing_table.limit_fully_attached, 32u32);

View File

@ -55,10 +55,7 @@ fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option<DHTKey> {
}
fn get_safety_selection(text: &str, rss: RouteSpecStore) -> Option<SafetySelection> {
if text.len() == 0 {
return None;
}
if &text[0..1] == "-" {
if text.len() != 0 && &text[0..1] == "-" {
// Unsafe
let text = &text[1..];
let seq = get_sequencing(text).unwrap_or(Sequencing::NoPreference);

View File

@ -41,61 +41,42 @@ pub type PinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + 'a>;
pub type SendPinBoxFuture<T> = PinBox<dyn Future<Output = T> + Send + 'static>;
pub type SendPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + Send + 'a>;
pub use std::borrow::{Cow, ToOwned};
pub use std::boxed::Box;
pub use std::cell::RefCell;
pub use std::cmp;
pub use std::collections::btree_map::BTreeMap;
pub use std::collections::btree_set::BTreeSet;
pub use std::collections::hash_map::HashMap;
pub use std::collections::hash_set::HashSet;
pub use std::collections::LinkedList;
pub use std::collections::VecDeque;
pub use std::convert::{TryFrom, TryInto};
pub use std::fmt;
pub use std::future::Future;
pub use std::mem;
pub use std::ops::{Fn, FnMut, FnOnce};
pub use std::pin::Pin;
pub use std::rc::Rc;
pub use std::string::String;
pub use std::sync::atomic::{AtomicBool, Ordering};
pub use std::sync::{Arc, Weak};
pub use std::task;
pub use std::time::Duration;
pub use std::vec::Vec;
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
pub use alloc::string::String;
pub use alloc::vec::Vec;
pub use alloc::collections::LinkedList;
pub use alloc::collections::VecDeque;
pub use alloc::collections::btree_map::BTreeMap;
pub use alloc::collections::btree_set::BTreeSet;
pub use hashbrown::hash_map::HashMap;
pub use hashbrown::hash_set::HashSet;
pub use alloc::boxed::Box;
pub use alloc::borrow::{Cow, ToOwned};
pub use wasm_bindgen::prelude::*;
pub use core::cmp;
pub use core::convert::{TryFrom, TryInto};
pub use core::mem;
pub use core::fmt;
pub use alloc::rc::Rc;
pub use core::cell::RefCell;
pub use core::task;
pub use core::future::Future;
pub use core::time::Duration;
pub use core::pin::Pin;
pub use core::sync::atomic::{Ordering, AtomicBool};
pub use alloc::sync::{Arc, Weak};
pub use core::ops::{FnOnce, FnMut, Fn};
pub use async_lock::Mutex as AsyncMutex;
pub use async_lock::MutexGuard as AsyncMutexGuard;
pub use async_lock::MutexGuardArc as AsyncMutexGuardArc;
pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr };
pub use async_executors::JoinHandle as LowLevelJoinHandle;
pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr };
} else {
pub use std::string::String;
pub use std::vec::Vec;
pub use std::collections::LinkedList;
pub use std::collections::VecDeque;
pub use std::collections::btree_map::BTreeMap;
pub use std::collections::btree_set::BTreeSet;
pub use std::collections::hash_map::HashMap;
pub use std::collections::hash_set::HashSet;
pub use std::boxed::Box;
pub use std::borrow::{Cow, ToOwned};
pub use std::cmp;
pub use std::convert::{TryFrom, TryInto};
pub use std::mem;
pub use std::fmt;
pub use std::sync::atomic::{Ordering, AtomicBool};
pub use std::sync::{Arc, Weak};
pub use std::rc::Rc;
pub use std::cell::RefCell;
pub use std::task;
pub use std::future::Future;
pub use std::time::Duration;
pub use std::pin::Pin;
pub use std::ops::{FnOnce, FnMut, Fn};
cfg_if! {
if #[cfg(feature="rt-async-std")] {
pub use async_std::sync::Mutex as AsyncMutex;