[skip ci] checkpoint. refactor.

This commit is contained in:
Christien Rioux 2025-01-25 20:17:43 -05:00
parent 6337d445ed
commit ed4bbceb17
36 changed files with 1072 additions and 957 deletions

View File

@ -0,0 +1,195 @@
use std::marker::PhantomData;
use super::*;
trait AsAnyArcSendSync {
fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync>;
}
impl<T: Send + Sync + 'static> AsAnyArcSendSync for T {
fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync> {
self
}
}
pub trait VeilidComponent: AsAnyArcSendSync + core::fmt::Debug {
fn registry(&self) -> VeilidComponentRegistry;
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>;
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>;
// Registry shortcuts
fn config(&self) -> VeilidConfig {
self.registry().config()
}
fn update_callback(&self) -> UpdateCallback {
self.registry().update_callback()
}
fn event_bus(&self) -> EventBus {
self.registry().event_bus()
}
}
pub struct VeilidComponentGuard<'a, T: VeilidComponent + Send + Sync + 'static> {
component: Arc<T>,
_phantom: core::marker::PhantomData<&'a T>,
}
impl<'a, T> core::ops::Deref for VeilidComponentGuard<'a, T>
where
T: VeilidComponent + Send + Sync + 'static,
{
type Target = T;
fn deref(&self) -> &Self::Target {
&self.component
}
}
#[derive(Debug)]
struct VeilidComponentRegistryInner {
type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
init_order: Vec<core::any::TypeId>,
}
#[derive(Clone, Debug)]
pub struct VeilidComponentRegistry {
inner: Arc<Mutex<VeilidComponentRegistryInner>>,
config: VeilidConfig,
event_bus: EventBus,
init_lock: Arc<AsyncMutex<bool>>,
}
impl VeilidComponentRegistry {
pub fn new(config: VeilidConfig) -> Self {
Self {
inner: Arc::new(Mutex::new(VeilidComponentRegistryInner {
type_map: HashMap::new(),
init_order: Vec::new(),
})),
config,
event_bus: EventBus::new(),
init_lock: Arc::new(AsyncMutex::new(false)),
}
}
pub fn register<
T: VeilidComponent + Send + Sync + 'static,
F: FnOnce(VeilidComponentRegistry) -> T,
>(
&self,
component_constructor: F,
) {
let component = Arc::new(component_constructor(self.clone()));
let component_type_id = core::any::TypeId::of::<T>();
let mut inner = self.inner.lock();
assert!(
inner
.type_map
.insert(component_type_id, component)
.is_none(),
"should not register same component twice"
);
inner.init_order.push(component_type_id);
}
pub async fn init(&self) -> EyreResult<()> {
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
bail!("init should only happen one at a time");
};
if *_init_guard {
bail!("already initialized");
}
let init_order = self.get_init_order();
let mut initialized = vec![];
for component in init_order {
if let Err(e) = component.init().await {
self.terminate_inner(initialized).await;
return Err(e);
}
initialized.push(component);
}
*_init_guard = true;
Ok(())
}
pub async fn terminate(&self) {
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
panic!("terminate should only happen one at a time");
};
if !*_init_guard {
panic!("not initialized");
}
let init_order = self.get_init_order();
self.terminate_inner(init_order).await;
*_init_guard = false;
}
async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
for component in initialized.iter().rev() {
component.terminate().await;
}
}
fn get_init_order(&self) -> Vec<Arc<dyn VeilidComponent + Send + Sync>> {
let inner = self.inner.lock();
inner
.init_order
.iter()
.map(|id| inner.type_map.get(id).unwrap().clone())
.collect::<Vec<_>>()
}
//////////////////////////////////////////////////////////////
pub fn config(&self) -> VeilidConfig {
self.config.clone()
}
pub fn update_callback(&self) -> UpdateCallback {
self.config.update_callback()
}
pub fn event_bus(&self) -> EventBus {
self.event_bus.clone()
}
pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
&self,
) -> Option<VeilidComponentGuard<'a, T>> {
let inner = self.inner.lock();
let component_type_id = core::any::TypeId::of::<T>();
let component_dyn = inner.type_map.get(&component_type_id)?.clone();
let component = component_dyn
.as_any_arc_send_sync()
.downcast::<T>()
.unwrap();
Some(VeilidComponentGuard {
component,
_phantom: PhantomData {},
})
}
}
macro_rules! impl_veilid_component {
($component_name:ident) => {
impl VeilidComponent for $component_name {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>> {
Box::pin(async { self.init_async().await })
}
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()> {
Box::pin(async { self.terminate_async().await })
}
}
};
}
pub(crate) use impl_veilid_component;

View File

@ -8,234 +8,11 @@ use crate::*;
pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>; pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
/// Internal services startup mechanism. type InitKey = (String, String);
/// Ensures that everything is started up, and shut down in the right order
/// and provides an atomic state for if the system is properly operational.
struct StartupShutdownContext {
pub config: VeilidConfig,
pub update_callback: UpdateCallback,
pub event_bus: Option<EventBus>,
pub protected_store: Option<ProtectedStore>,
pub table_store: Option<TableStore>,
#[cfg(feature = "unstable-blockstore")]
pub block_store: Option<BlockStore>,
pub crypto: Option<Crypto>,
pub attachment_manager: Option<AttachmentManager>,
pub storage_manager: Option<StorageManager>,
}
impl StartupShutdownContext {
pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self {
Self {
config,
update_callback,
event_bus: None,
protected_store: None,
table_store: None,
#[cfg(feature = "unstable-blockstore")]
block_store: None,
crypto: None,
attachment_manager: None,
storage_manager: None,
}
}
#[allow(clippy::too_many_arguments)]
pub fn new_full(context: VeilidCoreContext) -> Self {
Self {
config: context.config,
update_callback: context.update_callback,
event_bus: Some(context.event_bus),
protected_store: Some(context.protected_store),
table_store: Some(context.table_store),
#[cfg(feature = "unstable-blockstore")]
block_store: Some(context.block_store),
crypto: Some(context.crypto),
attachment_manager: Some(context.attachment_manager),
storage_manager: Some(context.storage_manager),
}
}
#[instrument(level = "trace", target = "core_context", err, skip_all)]
pub async fn startup(mut self) -> EyreResult<VeilidCoreContext> {
info!("Veilid API starting up");
info!("init api tracing");
let (program_name, namespace) = {
let config = self.config.get();
(config.program_name.clone(), config.namespace.clone())
};
ApiTracingLayer::add_callback(program_name, namespace, self.update_callback.clone())
.await?;
// Add the event bus
let event_bus = EventBus::new();
if let Err(e) = event_bus.startup().await {
error!("failed to start up event bus: {}", e);
self.shutdown().await;
return Err(e.into());
}
self.event_bus = Some(event_bus.clone());
// Set up protected store
let protected_store = ProtectedStore::new(event_bus.clone(), self.config.clone());
if let Err(e) = protected_store.init().await {
error!("failed to init protected store: {}", e);
self.shutdown().await;
return Err(e);
}
self.protected_store = Some(protected_store.clone());
// Set up tablestore and crypto system
let table_store = TableStore::new(
event_bus.clone(),
self.config.clone(),
protected_store.clone(),
);
let crypto = Crypto::new(event_bus.clone(), self.config.clone(), table_store.clone());
table_store.set_crypto(crypto.clone());
// Initialize table store first, so crypto code can load caches
// Tablestore can use crypto during init, just not any cached operations or things
// that require flushing back to the tablestore
if let Err(e) = table_store.init().await {
error!("failed to init table store: {}", e);
self.shutdown().await;
return Err(e);
}
self.table_store = Some(table_store.clone());
// Set up crypto
if let Err(e) = crypto.init().await {
error!("failed to init crypto: {}", e);
self.shutdown().await;
return Err(e);
}
self.crypto = Some(crypto.clone());
// Set up block store
#[cfg(feature = "unstable-blockstore")]
{
let block_store = BlockStore::new(event_bus.clone(), self.config.clone());
if let Err(e) = block_store.init().await {
error!("failed to init block store: {}", e);
self.shutdown().await;
return Err(e);
}
self.block_store = Some(block_store.clone());
}
// Set up storage manager
let update_callback = self.update_callback.clone();
let storage_manager = StorageManager::new(
event_bus.clone(),
self.config.clone(),
self.crypto.clone().unwrap(),
self.table_store.clone().unwrap(),
#[cfg(feature = "unstable-blockstore")]
self.block_store.clone().unwrap(),
);
if let Err(e) = storage_manager.init(update_callback).await {
error!("failed to init storage manager: {}", e);
self.shutdown().await;
return Err(e);
}
self.storage_manager = Some(storage_manager.clone());
// Set up attachment manager
let update_callback = self.update_callback.clone();
let attachment_manager = AttachmentManager::new(
event_bus.clone(),
self.config.clone(),
storage_manager,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
crypto,
);
if let Err(e) = attachment_manager.init(update_callback).await {
error!("failed to init attachment manager: {}", e);
self.shutdown().await;
return Err(e);
}
self.attachment_manager = Some(attachment_manager);
info!("Veilid API startup complete");
Ok(VeilidCoreContext {
config: self.config,
update_callback: self.update_callback,
event_bus: self.event_bus.unwrap(),
storage_manager: self.storage_manager.unwrap(),
protected_store: self.protected_store.unwrap(),
table_store: self.table_store.unwrap(),
#[cfg(feature = "unstable-blockstore")]
block_store: self.block_store.unwrap(),
crypto: self.crypto.unwrap(),
attachment_manager: self.attachment_manager.unwrap(),
})
}
#[instrument(level = "trace", target = "core_context", skip_all)]
pub async fn shutdown(mut self) {
info!("Veilid API shutting down");
if let Some(attachment_manager) = self.attachment_manager.take() {
attachment_manager.terminate().await;
}
if let Some(storage_manager) = self.storage_manager.take() {
storage_manager.terminate().await;
}
#[cfg(feature = "unstable-blockstore")]
if let Some(block_store) = self.block_store.take() {
block_store.terminate().await;
}
if let Some(crypto) = self.crypto.take() {
crypto.terminate().await;
}
if let Some(table_store) = self.table_store.take() {
table_store.terminate().await;
}
if let Some(protected_store) = self.protected_store.take() {
protected_store.terminate().await;
}
if let Some(event_bus) = self.event_bus.take() {
event_bus.shutdown().await;
}
info!("Veilid API shutdown complete");
// api logger terminate is idempotent
let (program_name, namespace) = {
let config = self.config.get();
(config.program_name.clone(), config.namespace.clone())
};
if let Err(e) = ApiTracingLayer::remove_callback(program_name, namespace).await {
error!("Error removing callback from ApiTracingLayer: {}", e);
}
// send final shutdown update
(self.update_callback)(VeilidUpdate::Shutdown);
}
}
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
pub struct VeilidCoreContext { pub(crate) struct VeilidCoreContext {
pub config: VeilidConfig, registry: VeilidComponentRegistry,
pub update_callback: UpdateCallback,
// Event bus
pub event_bus: EventBus,
// Services
pub crypto: Crypto,
pub protected_store: ProtectedStore,
pub table_store: TableStore,
#[cfg(feature = "unstable-blockstore")]
pub block_store: BlockStore,
pub storage_manager: StorageManager,
pub attachment_manager: AttachmentManager,
} }
impl VeilidCoreContext { impl VeilidCoreContext {
@ -245,10 +22,9 @@ impl VeilidCoreContext {
config_callback: ConfigCallback, config_callback: ConfigCallback,
) -> VeilidAPIResult<VeilidCoreContext> { ) -> VeilidAPIResult<VeilidCoreContext> {
// Set up config from callback // Set up config from callback
let mut config = VeilidConfig::new(); let config = VeilidConfig::new_from_callback(config_callback, update_callback)?;
config.setup(config_callback, update_callback.clone())?;
Self::new_common(update_callback, config).await Self::new_common(config).await
} }
#[instrument(level = "trace", target = "core_context", err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
@ -257,16 +33,12 @@ impl VeilidCoreContext {
config_inner: VeilidConfigInner, config_inner: VeilidConfigInner,
) -> VeilidAPIResult<VeilidCoreContext> { ) -> VeilidAPIResult<VeilidCoreContext> {
// Set up config from json // Set up config from json
let mut config = VeilidConfig::new(); let config = VeilidConfig::new_from_config(config_inner, update_callback);
config.setup_from_config(config_inner, update_callback.clone())?; Self::new_common(config).await
Self::new_common(update_callback, config).await
} }
#[instrument(level = "trace", target = "core_context", err, skip_all)] #[instrument(level = "trace", target = "core_context", err, skip_all)]
async fn new_common( async fn new_common(config: VeilidConfig) -> VeilidAPIResult<VeilidCoreContext> {
update_callback: UpdateCallback,
config: VeilidConfig,
) -> VeilidAPIResult<VeilidCoreContext> {
cfg_if! { cfg_if! {
if #[cfg(target_os = "android")] { if #[cfg(target_os = "android")] {
if !crate::intf::android::is_android_ready() { if !crate::intf::android::is_android_ready() {
@ -275,19 +47,88 @@ impl VeilidCoreContext {
} }
} }
let sc = StartupShutdownContext::new_empty(config.clone(), update_callback); info!("Veilid API starting up");
sc.startup().await.map_err(VeilidAPIError::generic)
let (program_name, namespace, update_callback) = {
let cfginner = config.get();
(
cfginner.program_name.clone(),
cfginner.namespace.clone(),
config.update_callback(),
)
};
ApiTracingLayer::add_callback(program_name, namespace, update_callback.clone()).await?;
// Create component registry
let registry = VeilidComponentRegistry::new(config);
// Register all components
registry.register(ProtectedStore::new);
// Initialize table store first, so crypto code can load caches
// Tablestore can use crypto during init, just not any cached operations or things
// that require flushing back to the tablestore
registry.register(TableStore::new);
registry.register(Crypto::new);
#[cfg(feature = "unstable-blockstore")]
registry.register(BlockStore::new);
registry.register(StorageManager::new);
registry.register(AttachmentManager::new);
// Run initialization
registry.init().await.map_err(VeilidAPIError::internal)?;
info!("Veilid API startup complete");
Ok(Self { registry })
} }
#[instrument(level = "trace", target = "core_context", skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
async fn shutdown(self) { async fn shutdown(self) {
let sc = StartupShutdownContext::new_full(self); info!("Veilid API shutdown complete");
sc.shutdown().await;
let (program_name, namespace, update_callback) = {
let config = self.registry.config();
let cfginner = config.get();
(
cfginner.program_name.clone(),
cfginner.namespace.clone(),
config.update_callback(),
)
};
self.registry.terminate().await;
if let Err(e) = ApiTracingLayer::remove_callback(program_name, namespace).await {
error!("Error removing callback from ApiTracingLayer: {}", e);
}
// send final shutdown update
update_callback(VeilidUpdate::Shutdown);
}
pub fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
} }
} }
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
pub trait RegisteredComponents: VeilidComponent {
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> {
self.registry().lookup::<ProtectedStore>().unwrap()
}
fn table_store(&self) -> VeilidComponentGuard<'_, TableStore> {
self.registry().lookup::<TableStore>().unwrap()
}
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
self.registry().lookup::<Crypto>().unwrap()
}
}
impl<T: VeilidComponent> RegisteredComponents for T {}
/////////////////////////////////////////////////////////////////////////////
lazy_static::lazy_static! { lazy_static::lazy_static! {
static ref INITIALIZED: Mutex<HashSet<(String,String)>> = Mutex::new(HashSet::new()); static ref INITIALIZED: Mutex<HashSet<(String,String)>> = Mutex::new(HashSet::new());
static ref STARTUP_TABLE: AsyncTagLockTable<(String, String)> = AsyncTagLockTable::new(); static ref STARTUP_TABLE: AsyncTagLockTable<(String, String)> = AsyncTagLockTable::new();
@ -384,7 +225,6 @@ pub async fn api_startup_config(
// Get the program_name and namespace we're starting up in // Get the program_name and namespace we're starting up in
let program_name = config.program_name.clone(); let program_name = config.program_name.clone();
let namespace = config.namespace.clone(); let namespace = config.namespace.clone();
let init_key = (program_name, namespace); let init_key = (program_name, namespace);
// Only allow one startup/shutdown per program_name+namespace combination simultaneously // Only allow one startup/shutdown per program_name+namespace combination simultaneously
@ -410,8 +250,10 @@ pub async fn api_startup_config(
#[instrument(level = "trace", target = "core_context", skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
pub(crate) async fn api_shutdown(context: VeilidCoreContext) { pub(crate) async fn api_shutdown(context: VeilidCoreContext) {
let init_key = { let init_key = {
let config = context.config.get(); let registry = context.registry();
(config.program_name.clone(), config.namespace.clone()) let config = registry.config();
let cfginner = config.get();
(cfginner.program_name.clone(), cfginner.namespace.clone())
}; };
// Only allow one startup/shutdown per program_name+namespace combination simultaneously // Only allow one startup/shutdown per program_name+namespace combination simultaneously

View File

@ -5,7 +5,7 @@ const VEILID_DOMAIN_API: &[u8] = b"VEILID_API";
pub trait CryptoSystem { pub trait CryptoSystem {
// Accessors // Accessors
fn kind(&self) -> CryptoKind; fn kind(&self) -> CryptoKind;
fn crypto(&self) -> Crypto; fn crypto(&self) -> VeilidComponentGuard<'_, Crypto>;
// Cached Operations // Cached Operations
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret>; fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret>;

View File

@ -67,7 +67,7 @@ impl Envelope {
#[instrument(level = "trace", target = "envelope", skip_all)] #[instrument(level = "trace", target = "envelope", skip_all)]
pub fn from_signed_data( pub fn from_signed_data(
crypto: Crypto, crypto: &Crypto,
data: &[u8], data: &[u8],
network_key: &Option<SharedSecret>, network_key: &Option<SharedSecret>,
) -> VeilidAPIResult<Envelope> { ) -> VeilidAPIResult<Envelope> {
@ -193,7 +193,7 @@ impl Envelope {
#[instrument(level = "trace", target = "envelope", skip_all)] #[instrument(level = "trace", target = "envelope", skip_all)]
pub fn decrypt_body( pub fn decrypt_body(
&self, &self,
crypto: Crypto, crypto: &Crypto,
data: &[u8], data: &[u8],
node_id_secret: &SecretKey, node_id_secret: &SecretKey,
network_key: &Option<SharedSecret>, network_key: &Option<SharedSecret>,
@ -226,7 +226,7 @@ impl Envelope {
#[instrument(level = "trace", target = "envelope", skip_all, err)] #[instrument(level = "trace", target = "envelope", skip_all, err)]
pub fn to_encrypted_data( pub fn to_encrypted_data(
&self, &self,
crypto: Crypto, crypto: &Crypto,
body: &[u8], body: &[u8],
node_id_secret: &SecretKey, node_id_secret: &SecretKey,
network_key: &Option<SharedSecret>, network_key: &Option<SharedSecret>,

View File

@ -29,9 +29,21 @@ use core::convert::TryInto;
use dh_cache::*; use dh_cache::*;
use hashlink::linked_hash_map::Entry; use hashlink::linked_hash_map::Entry;
use hashlink::LruCache; use hashlink::LruCache;
use std::marker::PhantomData;
/// Handle to a particular cryptosystem /// Guard to access a particular cryptosystem
pub type CryptoSystemVersion = Arc<dyn CryptoSystem + Send + Sync>; pub struct CryptoSystemGuard<'a> {
crypto_system: Arc<dyn CryptoSystem + Send + Sync>,
_phantom: core::marker::PhantomData<&'a (dyn CryptoSystem + Send + Sync)>,
}
impl<'a> core::ops::Deref for CryptoSystemGuard<'a> {
type Target = dyn CryptoSystem + Send + Sync;
fn deref(&self) -> &Self::Target {
self.crypto_system.as_ref()
}
}
cfg_if! { cfg_if! {
if #[cfg(all(feature = "enable-crypto-none", feature = "enable-crypto-vld0"))] { if #[cfg(all(feature = "enable-crypto-none", feature = "enable-crypto-vld0"))] {
@ -72,89 +84,71 @@ pub fn best_envelope_version() -> EnvelopeVersion {
struct CryptoInner { struct CryptoInner {
dh_cache: DHCache, dh_cache: DHCache,
flush_future: Option<SendPinBoxFuture<()>>, flush_future: Option<SendPinBoxFuture<()>>,
#[cfg(feature = "enable-crypto-vld0")]
crypto_vld0: Option<Arc<dyn CryptoSystem + Send + Sync>>,
#[cfg(feature = "enable-crypto-none")]
crypto_none: Option<Arc<dyn CryptoSystem + Send + Sync>>,
} }
struct CryptoUnlockedInner { impl fmt::Debug for CryptoInner {
_event_bus: EventBus, fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
config: VeilidConfig, f.debug_struct("CryptoInner")
table_store: TableStore, //.field("dh_cache", &self.dh_cache)
// .field("flush_future", &self.flush_future)
// .field("crypto_vld0", &self.crypto_vld0)
// .field("crypto_none", &self.crypto_none)
.finish()
}
} }
/// Crypto factory implementation /// Crypto factory implementation
#[derive(Clone)] #[derive(Debug)]
pub struct Crypto { pub struct Crypto {
unlocked_inner: Arc<CryptoUnlockedInner>, registry: VeilidComponentRegistry,
inner: Arc<Mutex<CryptoInner>>, inner: Arc<Mutex<CryptoInner>>,
#[cfg(feature = "enable-crypto-vld0")]
crypto_vld0: Arc<dyn CryptoSystem + Send + Sync>,
#[cfg(feature = "enable-crypto-none")]
crypto_none: Arc<dyn CryptoSystem + Send + Sync>,
} }
impl_veilid_component!(Crypto);
impl Crypto { impl Crypto {
fn new_inner() -> CryptoInner { fn new_inner() -> CryptoInner {
CryptoInner { CryptoInner {
dh_cache: DHCache::new(DH_CACHE_SIZE), dh_cache: DHCache::new(DH_CACHE_SIZE),
flush_future: None, flush_future: None,
#[cfg(feature = "enable-crypto-vld0")]
crypto_vld0: None,
#[cfg(feature = "enable-crypto-none")]
crypto_none: None,
} }
} }
pub fn new(event_bus: EventBus, config: VeilidConfig, table_store: TableStore) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
let out = Self { Self {
unlocked_inner: Arc::new(CryptoUnlockedInner { registry: registry.clone(),
_event_bus: event_bus,
config,
table_store,
}),
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
}; #[cfg(feature = "enable-crypto-vld0")]
crypto_vld0: Arc::new(vld0::CryptoSystemVLD0::new(registry.clone())),
#[cfg(feature = "enable-crypto-vld0")] #[cfg(feature = "enable-crypto-none")]
{ crypto_none: Arc::new(none::CryptoSystemNONE::new(registry.clone())),
out.inner.lock().crypto_vld0 = Some(Arc::new(vld0::CryptoSystemVLD0::new(out.clone())));
} }
#[cfg(feature = "enable-crypto-none")]
{
out.inner.lock().crypto_none = Some(Arc::new(none::CryptoSystemNONE::new(out.clone())));
}
out
}
pub fn config(&self) -> VeilidConfig {
self.unlocked_inner.config.clone()
} }
#[instrument(level = "trace", target = "crypto", skip_all, err)] #[instrument(level = "trace", target = "crypto", skip_all, err)]
pub async fn init(&self) -> EyreResult<()> { async fn init_async(&self) -> EyreResult<()> {
let table_store = self.unlocked_inner.table_store.clone();
// Init node id from config // Init node id from config
if let Err(e) = self if let Err(e) = self.init_node_ids().await {
.unlocked_inner
.config
.init_node_ids(self.clone(), table_store.clone())
.await
{
return Err(e).wrap_err("init node id failed"); return Err(e).wrap_err("init node id failed");
} }
// make local copy of node id for easy access // make local copy of node id for easy access
let mut cache_validity_key: Vec<u8> = Vec::new(); let mut cache_validity_key: Vec<u8> = Vec::new();
{ self.config().with(|c| {
let c = self.unlocked_inner.config.get();
for ck in VALID_CRYPTO_KINDS { for ck in VALID_CRYPTO_KINDS {
if let Some(nid) = c.network.routing_table.node_id.get(ck) { if let Some(nid) = c.network.routing_table.node_id.get(ck) {
cache_validity_key.append(&mut nid.value.bytes.to_vec()); cache_validity_key.append(&mut nid.value.bytes.to_vec());
} }
} }
}; });
// load caches if they are valid for this node id // load caches if they are valid for this node id
let table_store = self.table_store();
let mut db = table_store let mut db = table_store
.open("crypto_caches", 1) .open("crypto_caches", 1)
.await .await
@ -177,11 +171,11 @@ impl Crypto {
} }
// Schedule flushing // Schedule flushing
let this = self.clone(); let registry = self.registry();
let flush_future = interval("crypto flush", 60000, move || { let flush_future = interval("crypto flush", 60000, move || {
let this = this.clone(); let crypto = registry.lookup::<Crypto>().unwrap();
async move { async move {
if let Err(e) = this.flush().await { if let Err(e) = crypto.flush().await {
warn!("flush failed: {}", e); warn!("flush failed: {}", e);
} }
} }
@ -197,16 +191,12 @@ impl Crypto {
cache_to_bytes(&inner.dh_cache) cache_to_bytes(&inner.dh_cache)
}; };
let db = self let db = self.table_store().open("crypto_caches", 1).await?;
.unlocked_inner
.table_store
.open("crypto_caches", 1)
.await?;
db.store(0, b"dh_cache", &cache_bytes).await?; db.store(0, b"dh_cache", &cache_bytes).await?;
Ok(()) Ok(())
} }
pub async fn terminate(&self) { async fn terminate_async(&self) {
let flush_future = self.inner.lock().flush_future.take(); let flush_future = self.inner.lock().flush_future.take();
if let Some(f) = flush_future { if let Some(f) = flush_future {
f.await; f.await;
@ -223,19 +213,25 @@ impl Crypto {
} }
/// Factory method to get a specific crypto version /// Factory method to get a specific crypto version
pub fn get(&self, kind: CryptoKind) -> Option<CryptoSystemVersion> { pub fn get(&self, kind: CryptoKind) -> Option<CryptoSystemGuard<'_>> {
let inner = self.inner.lock(); let inner = self.inner.lock();
match kind { match kind {
#[cfg(feature = "enable-crypto-vld0")] #[cfg(feature = "enable-crypto-vld0")]
CRYPTO_KIND_VLD0 => Some(inner.crypto_vld0.clone().unwrap()), CRYPTO_KIND_VLD0 => Some(CryptoSystemGuard {
crypto_system: inner.crypto_vld0.clone().unwrap(),
_phantom: PhantomData {},
}),
#[cfg(feature = "enable-crypto-none")] #[cfg(feature = "enable-crypto-none")]
CRYPTO_KIND_NONE => Some(inner.crypto_none.clone().unwrap()), CRYPTO_KIND_NONE => Some(CryptoSystemGuard {
crypto_system: inner.crypto_none.clone().unwrap(),
_phantom: PhantomData {},
}),
_ => None, _ => None,
} }
} }
// Factory method to get the best crypto version // Factory method to get the best crypto version
pub fn best(&self) -> CryptoSystemVersion { pub fn best(&self) -> CryptoSystemGuard<'_> {
self.get(best_crypto_kind()).unwrap() self.get(best_crypto_kind()).unwrap()
} }
@ -331,4 +327,118 @@ impl Crypto {
} }
Ok(()) Ok(())
} }
#[cfg(not(test))]
async fn init_node_id(
&self,
vcrypto: CryptoSystemGuard<'_>,
) -> VeilidAPIResult<(TypedKey, TypedSecret)> {
let config = self.config();
let ck = vcrypto.kind();
let (mut node_id, mut node_id_secret) = config.with(|c| {
(
c.network.routing_table.node_id.get(ck),
c.network.routing_table.node_id_secret.get(ck),
)
});
// See if node id was previously stored in the table store
let table_store = self.table_store();
let config_table = table_store.open("__veilid_config", 1).await?;
let table_key_node_id = format!("node_id_{}", ck);
let table_key_node_id_secret = format!("node_id_secret_{}", ck);
if node_id.is_none() {
log_tstore!(debug "pulling {} from storage", table_key_node_id);
if let Ok(Some(stored_node_id)) = config_table
.load_json::<TypedKey>(0, table_key_node_id.as_bytes())
.await
{
log_tstore!(debug "{} found in storage", table_key_node_id);
node_id = Some(stored_node_id);
} else {
log_tstore!(debug "{} not found in storage", table_key_node_id);
}
}
// See if node id secret was previously stored in the protected store
if node_id_secret.is_none() {
log_tstore!(debug "pulling {} from storage", table_key_node_id_secret);
if let Ok(Some(stored_node_id_secret)) = config_table
.load_json::<TypedSecret>(0, table_key_node_id_secret.as_bytes())
.await
{
log_tstore!(debug "{} found in storage", table_key_node_id_secret);
node_id_secret = Some(stored_node_id_secret);
} else {
log_tstore!(debug "{} not found in storage", table_key_node_id_secret);
}
}
// If we have a node id from storage, check it
let (node_id, node_id_secret) =
if let (Some(node_id), Some(node_id_secret)) = (node_id, node_id_secret) {
// Validate node id
if !vcrypto.validate_keypair(&node_id.value, &node_id_secret.value) {
apibail_generic!(format!(
"node_id_secret_{} and node_id_key_{} don't match",
ck, ck
));
}
(node_id, node_id_secret)
} else {
// If we still don't have a valid node id, generate one
log_tstore!(debug "generating new node_id_{}", ck);
let kp = vcrypto.generate_keypair();
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};
info!("Node Id: {}", node_id);
// Save the node id / secret in storage
config_table
.store_json(0, table_key_node_id.as_bytes(), &node_id)
.await?;
config_table
.store_json(0, table_key_node_id_secret.as_bytes(), &node_id_secret)
.await?;
Ok((node_id, node_id_secret))
}
/// Get the node id from config if one is specified.
/// Must be done -after- protected store startup.
#[cfg_attr(test, allow(unused_variables))]
pub(crate) async fn init_node_ids(&self) -> VeilidAPIResult<()> {
let mut out_node_id = TypedKeyGroup::new();
let mut out_node_id_secret = TypedSecretGroup::new();
for ck in VALID_CRYPTO_KINDS {
let vcrypto = self
.get(ck)
.expect("Valid crypto kind is not actually valid.");
#[cfg(test)]
let (node_id, node_id_secret) = {
let kp = vcrypto.generate_keypair();
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};
#[cfg(not(test))]
let (node_id, node_id_secret) = self.init_node_id(vcrypto).await?;
// Save for config
out_node_id.add(node_id);
out_node_id_secret.add(node_id_secret);
}
// Commit back to config
self.config().try_with_mut(|c| {
c.network.routing_table.node_id = out_node_id;
c.network.routing_table.node_id_secret = out_node_id_secret;
Ok(())
})?;
Ok(())
}
} }

View File

@ -49,14 +49,13 @@ fn is_bytes_eq_32(a: &[u8], v: u8) -> bool {
} }
/// None CryptoSystem /// None CryptoSystem
#[derive(Clone)]
pub struct CryptoSystemNONE { pub struct CryptoSystemNONE {
crypto: Crypto, registry: VeilidComponentRegistry,
} }
impl CryptoSystemNONE { impl CryptoSystemNONE {
pub fn new(crypto: Crypto) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
Self { crypto } Self { registry }
} }
} }
@ -66,13 +65,13 @@ impl CryptoSystem for CryptoSystemNONE {
CRYPTO_KIND_NONE CRYPTO_KIND_NONE
} }
fn crypto(&self) -> Crypto { fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
self.crypto.clone() self.registry().lookup::<Crypto>().unwrap()
} }
// Cached Operations // Cached Operations
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> { fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> {
self.crypto self.crypto()
.cached_dh_internal::<CryptoSystemNONE>(self, key, secret) .cached_dh_internal::<CryptoSystemNONE>(self, key, secret)
} }

View File

@ -68,7 +68,7 @@ impl Receipt {
} }
#[instrument(level = "trace", target = "receipt", skip_all, err)] #[instrument(level = "trace", target = "receipt", skip_all, err)]
pub fn from_signed_data(crypto: Crypto, data: &[u8]) -> VeilidAPIResult<Receipt> { pub fn from_signed_data(crypto: &Crypto, data: &[u8]) -> VeilidAPIResult<Receipt> {
// Ensure we are at least the length of the envelope // Ensure we are at least the length of the envelope
if data.len() < MIN_RECEIPT_SIZE { if data.len() < MIN_RECEIPT_SIZE {
apibail_parse_error!("receipt too small", data.len()); apibail_parse_error!("receipt too small", data.len());
@ -157,7 +157,7 @@ impl Receipt {
} }
#[instrument(level = "trace", target = "receipt", skip_all, err)] #[instrument(level = "trace", target = "receipt", skip_all, err)]
pub fn to_signed_data(&self, crypto: Crypto, secret: &SecretKey) -> VeilidAPIResult<Vec<u8>> { pub fn to_signed_data(&self, crypto: &Crypto, secret: &SecretKey) -> VeilidAPIResult<Vec<u8>> {
// Ensure extra data isn't too long // Ensure extra data isn't too long
let receipt_size: usize = self.extra_data.len() + MIN_RECEIPT_SIZE; let receipt_size: usize = self.extra_data.len() + MIN_RECEIPT_SIZE;
if receipt_size > MAX_RECEIPT_SIZE { if receipt_size > MAX_RECEIPT_SIZE {

View File

@ -2,7 +2,7 @@ use super::*;
static LOREM_IPSUM:&[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. "; static LOREM_IPSUM:&[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. ";
pub async fn test_aead(vcrypto: CryptoSystemVersion) { pub async fn test_aead(vcrypto: &CryptoSystemGuard<'_>) {
trace!("test_aead"); trace!("test_aead");
let n1 = vcrypto.random_nonce(); let n1 = vcrypto.random_nonce();
@ -82,7 +82,7 @@ pub async fn test_aead(vcrypto: CryptoSystemVersion) {
assert_eq!(body5, body7); assert_eq!(body5, body7);
} }
pub async fn test_no_auth(vcrypto: CryptoSystemVersion) { pub async fn test_no_auth(vcrypto: &CryptoSystemGuard<'_>) {
trace!("test_no_auth"); trace!("test_no_auth");
let n1 = vcrypto.random_nonce(); let n1 = vcrypto.random_nonce();
@ -136,7 +136,7 @@ pub async fn test_no_auth(vcrypto: CryptoSystemVersion) {
assert_eq!(body5, body7); assert_eq!(body5, body7);
} }
pub async fn test_dh(vcrypto: CryptoSystemVersion) { pub async fn test_dh(vcrypto: &CryptoSystemGuard<'_>) {
trace!("test_dh"); trace!("test_dh");
let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split(); let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split();
assert!(vcrypto.validate_keypair(&dht_key, &dht_key_secret)); assert!(vcrypto.validate_keypair(&dht_key, &dht_key_secret));
@ -164,7 +164,7 @@ pub async fn test_dh(vcrypto: CryptoSystemVersion) {
trace!("cached_dh: {:?}", r5); trace!("cached_dh: {:?}", r5);
} }
pub async fn test_generation(vcrypto: CryptoSystemVersion) { pub async fn test_generation(vcrypto: &CryptoSystemGuard<'_>) {
let b1 = vcrypto.random_bytes(32); let b1 = vcrypto.random_bytes(32);
let b2 = vcrypto.random_bytes(32); let b2 = vcrypto.random_bytes(32);
assert_ne!(b1, b2); assert_ne!(b1, b2);
@ -231,10 +231,10 @@ pub async fn test_all() {
// Test versions // Test versions
for v in VALID_CRYPTO_KINDS { for v in VALID_CRYPTO_KINDS {
let vcrypto = crypto.get(v).unwrap(); let vcrypto = crypto.get(v).unwrap();
test_aead(vcrypto.clone()).await; test_aead(&vcrypto).await;
test_no_auth(vcrypto.clone()).await; test_no_auth(&vcrypto).await;
test_dh(vcrypto.clone()).await; test_dh(&vcrypto).await;
test_generation(vcrypto).await; test_generation(&vcrypto).await;
} }
crypto_tests_shutdown(api.clone()).await; crypto_tests_shutdown(api.clone()).await;

View File

@ -2,9 +2,10 @@ use super::*;
pub async fn test_envelope_round_trip( pub async fn test_envelope_round_trip(
envelope_version: EnvelopeVersion, envelope_version: EnvelopeVersion,
vcrypto: CryptoSystemVersion, vcrypto: &CryptoSystemGuard<'_>,
network_key: Option<SharedSecret>, network_key: Option<SharedSecret>,
) { ) {
let crypto = vcrypto.crypto();
if network_key.is_some() { if network_key.is_some() {
info!( info!(
"--- test envelope round trip {} w/network key ---", "--- test envelope round trip {} w/network key ---",
@ -33,15 +34,15 @@ pub async fn test_envelope_round_trip(
// Serialize to bytes // Serialize to bytes
let enc_data = envelope let enc_data = envelope
.to_encrypted_data(vcrypto.crypto(), body, &sender_secret, &network_key) .to_encrypted_data(&crypto, body, &sender_secret, &network_key)
.expect("failed to encrypt data"); .expect("failed to encrypt data");
// Deserialize from bytes // Deserialize from bytes
let envelope2 = Envelope::from_signed_data(vcrypto.crypto(), &enc_data, &network_key) let envelope2 = Envelope::from_signed_data(&crypto, &enc_data, &network_key)
.expect("failed to deserialize envelope from data"); .expect("failed to deserialize envelope from data");
let body2 = envelope2 let body2 = envelope2
.decrypt_body(vcrypto.crypto(), &enc_data, &recipient_secret, &network_key) .decrypt_body(&crypto, &enc_data, &recipient_secret, &network_key)
.expect("failed to decrypt envelope body"); .expect("failed to decrypt envelope body");
// Compare envelope and body // Compare envelope and body
@ -53,21 +54,22 @@ pub async fn test_envelope_round_trip(
let mut mod_enc_data = enc_data.clone(); let mut mod_enc_data = enc_data.clone();
mod_enc_data[enc_data_len - 1] ^= 0x80u8; mod_enc_data[enc_data_len - 1] ^= 0x80u8;
assert!( assert!(
Envelope::from_signed_data(vcrypto.crypto(), &mod_enc_data, &network_key).is_err(), Envelope::from_signed_data(&crypto, &mod_enc_data, &network_key).is_err(),
"should have failed to decode envelope with modified signature" "should have failed to decode envelope with modified signature"
); );
let mut mod_enc_data2 = enc_data.clone(); let mut mod_enc_data2 = enc_data.clone();
mod_enc_data2[enc_data_len - 65] ^= 0x80u8; mod_enc_data2[enc_data_len - 65] ^= 0x80u8;
assert!( assert!(
Envelope::from_signed_data(vcrypto.crypto(), &mod_enc_data2, &network_key).is_err(), Envelope::from_signed_data(&crypto, &mod_enc_data2, &network_key).is_err(),
"should have failed to decode envelope with modified data" "should have failed to decode envelope with modified data"
); );
} }
pub async fn test_receipt_round_trip( pub async fn test_receipt_round_trip(
envelope_version: EnvelopeVersion, envelope_version: EnvelopeVersion,
vcrypto: CryptoSystemVersion, vcrypto: &CryptoSystemGuard<'_>,
) { ) {
let crypto = vcrypto.crypto();
info!("--- test receipt round trip ---"); info!("--- test receipt round trip ---");
// Create arbitrary body // Create arbitrary body
let body = b"This is an arbitrary body"; let body = b"This is an arbitrary body";
@ -80,16 +82,16 @@ pub async fn test_receipt_round_trip(
// Serialize to bytes // Serialize to bytes
let mut enc_data = receipt let mut enc_data = receipt
.to_signed_data(vcrypto.crypto(), &sender_secret) .to_signed_data(&crypto, &sender_secret)
.expect("failed to make signed data"); .expect("failed to make signed data");
// Deserialize from bytes // Deserialize from bytes
let receipt2 = Receipt::from_signed_data(vcrypto.crypto(), &enc_data) let receipt2 = Receipt::from_signed_data(&crypto, &enc_data)
.expect("failed to deserialize envelope from data"); .expect("failed to deserialize envelope from data");
// Should not validate even when a single bit is changed // Should not validate even when a single bit is changed
enc_data[5] = 0x01; enc_data[5] = 0x01;
Receipt::from_signed_data(vcrypto.crypto(), &enc_data) Receipt::from_signed_data(&crypto, &enc_data)
.expect_err("should have failed to decrypt using wrong secret"); .expect_err("should have failed to decrypt using wrong secret");
// Compare receipts // Compare receipts
@ -105,10 +107,9 @@ pub async fn test_all() {
for v in VALID_CRYPTO_KINDS { for v in VALID_CRYPTO_KINDS {
let vcrypto = crypto.get(v).unwrap(); let vcrypto = crypto.get(v).unwrap();
test_envelope_round_trip(ev, vcrypto.clone(), None).await; test_envelope_round_trip(ev, &vcrypto, None).await;
test_envelope_round_trip(ev, vcrypto.clone(), Some(vcrypto.random_shared_secret())) test_envelope_round_trip(ev, &vcrypto, Some(vcrypto.random_shared_secret())).await;
.await; test_receipt_round_trip(ev, &vcrypto).await;
test_receipt_round_trip(ev, vcrypto).await;
} }
} }

View File

@ -6,7 +6,7 @@ static CHEEZBURGER: &str = "I can has cheezburger";
static EMPTY_KEY: [u8; PUBLIC_KEY_LENGTH] = [0u8; PUBLIC_KEY_LENGTH]; static EMPTY_KEY: [u8; PUBLIC_KEY_LENGTH] = [0u8; PUBLIC_KEY_LENGTH];
static EMPTY_KEY_SECRET: [u8; SECRET_KEY_LENGTH] = [0u8; SECRET_KEY_LENGTH]; static EMPTY_KEY_SECRET: [u8; SECRET_KEY_LENGTH] = [0u8; SECRET_KEY_LENGTH];
pub async fn test_generate_secret(vcrypto: CryptoSystemVersion) { pub async fn test_generate_secret(vcrypto: &CryptoSystemGuard<'_>) {
// Verify keys generate // Verify keys generate
let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split(); let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split();
let (dht_key2, dht_key_secret2) = vcrypto.generate_keypair().into_split(); let (dht_key2, dht_key_secret2) = vcrypto.generate_keypair().into_split();
@ -20,7 +20,7 @@ pub async fn test_generate_secret(vcrypto: CryptoSystemVersion) {
assert_ne!(dht_key_secret, dht_key_secret2); assert_ne!(dht_key_secret, dht_key_secret2);
} }
pub async fn test_sign_and_verify(vcrypto: CryptoSystemVersion) { pub async fn test_sign_and_verify(vcrypto: &CryptoSystemGuard<'_>) {
// Make two keys // Make two keys
let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split(); let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split();
let (dht_key2, dht_key_secret2) = vcrypto.generate_keypair().into_split(); let (dht_key2, dht_key_secret2) = vcrypto.generate_keypair().into_split();
@ -115,7 +115,7 @@ pub async fn test_sign_and_verify(vcrypto: CryptoSystemVersion) {
); );
} }
pub async fn test_key_conversions(vcrypto: CryptoSystemVersion) { pub async fn test_key_conversions(vcrypto: &CryptoSystemGuard<'_>) {
// Test default key // Test default key
let (dht_key, dht_key_secret) = (PublicKey::default(), SecretKey::default()); let (dht_key, dht_key_secret) = (PublicKey::default(), SecretKey::default());
assert_eq!(dht_key.bytes, EMPTY_KEY); assert_eq!(dht_key.bytes, EMPTY_KEY);
@ -185,7 +185,7 @@ pub async fn test_key_conversions(vcrypto: CryptoSystemVersion) {
.is_err()); .is_err());
} }
pub async fn test_encode_decode(vcrypto: CryptoSystemVersion) { pub async fn test_encode_decode(vcrypto: &CryptoSystemGuard<'_>) {
let dht_key = PublicKey::try_decode("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); let dht_key = PublicKey::try_decode("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap();
let dht_key_secret = let dht_key_secret =
SecretKey::try_decode("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap(); SecretKey::try_decode("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap();
@ -229,7 +229,7 @@ pub async fn test_encode_decode(vcrypto: CryptoSystemVersion) {
assert!(f2.is_err()); assert!(f2.is_err());
} }
pub async fn test_typed_convert(vcrypto: CryptoSystemVersion) { pub async fn test_typed_convert(vcrypto: &CryptoSystemGuard<'_>) {
let tks1 = format!( let tks1 = format!(
"{}:7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ", "{}:7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ",
vcrypto.kind() vcrypto.kind()
@ -261,7 +261,7 @@ pub async fn test_typed_convert(vcrypto: CryptoSystemVersion) {
assert!(tks6x.ends_with(&tks6)); assert!(tks6x.ends_with(&tks6));
} }
async fn test_hash(vcrypto: CryptoSystemVersion) { async fn test_hash(vcrypto: &CryptoSystemGuard<'_>) {
let mut s = BTreeSet::<PublicKey>::new(); let mut s = BTreeSet::<PublicKey>::new();
let k1 = vcrypto.generate_hash("abc".as_bytes()); let k1 = vcrypto.generate_hash("abc".as_bytes());
@ -301,7 +301,7 @@ async fn test_hash(vcrypto: CryptoSystemVersion) {
vcrypto.validate_hash(CHEEZBURGER.as_bytes(), &v6); vcrypto.validate_hash(CHEEZBURGER.as_bytes(), &v6);
} }
async fn test_operations(vcrypto: CryptoSystemVersion) { async fn test_operations(vcrypto: &CryptoSystemGuard<'_>) {
let k1 = vcrypto.generate_hash(LOREM_IPSUM.as_bytes()); let k1 = vcrypto.generate_hash(LOREM_IPSUM.as_bytes());
let k2 = vcrypto.generate_hash(CHEEZBURGER.as_bytes()); let k2 = vcrypto.generate_hash(CHEEZBURGER.as_bytes());
let k3 = vcrypto.generate_hash("abc".as_bytes()); let k3 = vcrypto.generate_hash("abc".as_bytes());
@ -395,13 +395,13 @@ pub async fn test_all() {
for v in VALID_CRYPTO_KINDS { for v in VALID_CRYPTO_KINDS {
let vcrypto = crypto.get(v).unwrap(); let vcrypto = crypto.get(v).unwrap();
test_generate_secret(vcrypto.clone()).await; test_generate_secret(&vcrypto).await;
test_sign_and_verify(vcrypto.clone()).await; test_sign_and_verify(&vcrypto).await;
test_key_conversions(vcrypto.clone()).await; test_key_conversions(&vcrypto).await;
test_encode_decode(vcrypto.clone()).await; test_encode_decode(&vcrypto).await;
test_typed_convert(vcrypto.clone()).await; test_typed_convert(&vcrypto).await;
test_hash(vcrypto.clone()).await; test_hash(&vcrypto).await;
test_operations(vcrypto).await; test_operations(&vcrypto).await;
} }
crypto_tests_shutdown(api.clone()).await; crypto_tests_shutdown(api.clone()).await;

View File

@ -47,14 +47,13 @@ pub fn vld0_generate_keypair() -> KeyPair {
} }
/// V0 CryptoSystem /// V0 CryptoSystem
#[derive(Clone)]
pub struct CryptoSystemVLD0 { pub struct CryptoSystemVLD0 {
crypto: Crypto, registry: VeilidComponentRegistry,
} }
impl CryptoSystemVLD0 { impl CryptoSystemVLD0 {
pub fn new(crypto: Crypto) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
Self { crypto } Self { registry }
} }
} }
@ -64,14 +63,14 @@ impl CryptoSystem for CryptoSystemVLD0 {
CRYPTO_KIND_VLD0 CRYPTO_KIND_VLD0
} }
fn crypto(&self) -> Crypto { fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
self.crypto.clone() self.registry.lookup::<Crypto>().unwrap()
} }
// Cached Operations // Cached Operations
#[instrument(level = "trace", skip_all)] #[instrument(level = "trace", skip_all)]
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> { fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> {
self.crypto self.crypto()
.cached_dh_internal::<CryptoSystemVLD0>(self, key, secret) .cached_dh_internal::<CryptoSystemVLD0>(self, key, secret)
} }

View File

@ -4,31 +4,37 @@ struct BlockStoreInner {
// //
} }
#[derive(Clone)] impl fmt::Debug for BlockStoreInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlockStoreInner").finish()
}
}
#[derive(Clone, Debug)]
pub struct BlockStore { pub struct BlockStore {
event_bus: EventBus, registry: VeilidComponentRegistry,
config: VeilidConfig,
inner: Arc<Mutex<BlockStoreInner>>, inner: Arc<Mutex<BlockStoreInner>>,
} }
impl_veilid_component!(BlockStore);
impl BlockStore { impl BlockStore {
fn new_inner() -> BlockStoreInner { fn new_inner() -> BlockStoreInner {
BlockStoreInner {} BlockStoreInner {}
} }
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
Self { Self {
event_bus, registry,
config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }
} }
pub async fn init(&self) -> EyreResult<()> { async fn init_async(&self) -> EyreResult<()> {
// Ensure permissions are correct // Ensure permissions are correct
// ensure_file_private_owner(&dbpath)?; // ensure_file_private_owner(&dbpath)?;
Ok(()) Ok(())
} }
pub async fn terminate(&self) {} async fn terminate_async(&self) {}
} }

View File

@ -6,14 +6,20 @@ use std::path::Path;
pub struct ProtectedStoreInner { pub struct ProtectedStoreInner {
keyring_manager: Option<KeyringManager>, keyring_manager: Option<KeyringManager>,
} }
impl fmt::Debug for ProtectedStoreInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ProtectedStoreInner").finish()
}
}
#[derive(Clone)] #[derive(Debug)]
pub struct ProtectedStore { pub struct ProtectedStore {
_event_bus: EventBus, registry: VeilidComponentRegistry,
config: VeilidConfig,
inner: Arc<Mutex<ProtectedStoreInner>>, inner: Arc<Mutex<ProtectedStoreInner>>,
} }
impl_veilid_component!(ProtectedStore);
impl ProtectedStore { impl ProtectedStore {
fn new_inner() -> ProtectedStoreInner { fn new_inner() -> ProtectedStoreInner {
ProtectedStoreInner { ProtectedStoreInner {
@ -21,10 +27,9 @@ impl ProtectedStore {
} }
} }
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
Self { Self {
_event_bus: event_bus, registry,
config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }
} }
@ -42,9 +47,10 @@ impl ProtectedStore {
} }
#[instrument(level = "debug", skip(self), err)] #[instrument(level = "debug", skip(self), err)]
pub async fn init(&self) -> EyreResult<()> { async fn init_async(&self) -> EyreResult<()> {
let delete = { let delete = {
let c = self.config.get(); let config = self.config();
let c = config.get();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if !c.protected_store.always_use_insecure_storage { if !c.protected_store.always_use_insecure_storage {
// Attempt to open the secure keyring // Attempt to open the secure keyring
@ -102,12 +108,13 @@ impl ProtectedStore {
} }
#[instrument(level = "debug", skip(self))] #[instrument(level = "debug", skip(self))]
pub async fn terminate(&self) { async fn terminate_async(&self) {
*self.inner.lock() = Self::new_inner(); *self.inner.lock() = Self::new_inner();
} }
fn service_name(&self) -> String { fn service_name(&self) -> String {
let c = self.config.get(); let config = self.config();
let c = config.get();
if c.namespace.is_empty() { if c.namespace.is_empty() {
"veilid_protected_store".to_owned() "veilid_protected_store".to_owned()
} else { } else {

View File

@ -4,28 +4,34 @@ struct BlockStoreInner {
// //
} }
#[derive(Clone)] impl fmt::Debug for BlockStoreInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BlockStoreInner").finish()
}
}
#[derive(Debug)]
pub struct BlockStore { pub struct BlockStore {
event_bus: EventBus, registry: VeilidComponentRegistry,
config: VeilidConfig,
inner: Arc<Mutex<BlockStoreInner>>, inner: Arc<Mutex<BlockStoreInner>>,
} }
impl_veilid_component!(BlockStore);
impl BlockStore { impl BlockStore {
fn new_inner() -> BlockStoreInner { fn new_inner() -> BlockStoreInner {
BlockStoreInner {} BlockStoreInner {}
} }
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
Self { Self {
event_bus, registry,
config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }
} }
pub async fn init(&self) -> EyreResult<()> { async fn init_async(&self) -> EyreResult<()> {
Ok(()) Ok(())
} }
pub async fn terminate(&self) {} async fn terminate_async(&self) {}
} }

View File

@ -3,18 +3,16 @@ use data_encoding::BASE64URL_NOPAD;
use web_sys::*; use web_sys::*;
#[derive(Clone)] #[derive(Debug)]
pub struct ProtectedStore { pub struct ProtectedStore {
_event_bus: EventBus, registry: VeilidComponentRegistry,
config: VeilidConfig,
} }
impl_veilid_component!(ProtectedStore);
impl ProtectedStore { impl ProtectedStore {
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self { pub fn new(registry: VeilidComponentRegistry) -> Self {
Self { Self { registry }
_event_bus: event_bus,
config,
}
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
@ -38,7 +36,7 @@ impl ProtectedStore {
pub async fn terminate(&self) {} pub async fn terminate(&self) {}
fn browser_key_name(&self, key: &str) -> String { fn browser_key_name(&self, key: &str) -> String {
let c = self.config.get(); let c = self.config();
if c.namespace.is_empty() { if c.namespace.is_empty() {
format!("__veilid_protected_store_{}", key) format!("__veilid_protected_store_{}", key)
} else { } else {

View File

@ -45,6 +45,7 @@ cfg_if::cfg_if! {
extern crate alloc; extern crate alloc;
mod attachment_manager; mod attachment_manager;
mod component;
mod core_context; mod core_context;
mod crypto; mod crypto;
mod intf; mod intf;
@ -58,6 +59,8 @@ mod veilid_api;
mod veilid_config; mod veilid_config;
mod wasm_helpers; mod wasm_helpers;
pub(crate) use self::component::*;
pub(crate) use self::core_context::RegisteredComponents;
pub use self::core_context::{api_startup, api_startup_config, api_startup_json, UpdateCallback}; pub use self::core_context::{api_startup, api_startup_config, api_startup_json, UpdateCallback};
pub use self::logging::{ pub use self::logging::{
ApiTracingLayer, VeilidLayerFilter, DEFAULT_LOG_FACILITIES_ENABLED_LIST, ApiTracingLayer, VeilidLayerFilter, DEFAULT_LOG_FACILITIES_ENABLED_LIST,

View File

@ -3,21 +3,12 @@ use super::*;
pub mod test_serialize_routing_table; pub mod test_serialize_routing_table;
pub(crate) fn mock_routing_table() -> routing_table::RoutingTable { pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
let event_bus = EventBus::new(); let veilid_config =
let veilid_config = VeilidConfig::new(); VeilidConfig::new_from_config(VeilidConfigInner::default(), Arc::new(|_| {}));
#[cfg(feature = "unstable-blockstore")] let registry = VeilidComponentRegistry::new(veilid_config);
let block_store = BlockStore::new(event_bus.clone(), veilid_config.clone()); registry.register(ProtectedStore::new);
let protected_store = ProtectedStore::new(event_bus.clone(), veilid_config.clone()); registry.register(TableStore::new);
let table_store = TableStore::new( registry.register(Crypto::new);
event_bus.clone(),
veilid_config.clone(),
protected_store.clone(),
);
let crypto = Crypto::new(
event_bus.clone(),
veilid_config.clone(),
table_store.clone(),
);
let storage_manager = storage_manager::StorageManager::new( let storage_manager = storage_manager::StorageManager::new(
event_bus.clone(), event_bus.clone(),
veilid_config.clone(), veilid_config.clone(),

View File

@ -38,15 +38,14 @@ impl StorageManager {
let routing_domain = RoutingDomain::PublicInternet; let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'GetValue' // Get the DHT parameters for 'GetValue'
let (key_count, consensus_count, fanout, timeout_us) = { let (key_count, consensus_count, fanout, timeout_us) = self.config().with(|c| {
let c = self.unlocked_inner.config.get();
( (
c.network.dht.max_find_node_count as usize, c.network.dht.max_find_node_count as usize,
c.network.dht.get_value_count as usize, c.network.dht.get_value_count as usize,
c.network.dht.get_value_fanout as usize, c.network.dht.get_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)), TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
) )
}; });
// Get the nodes we know are caching this value to seed the fanout // Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = { let init_fanout_queue = {

View File

@ -64,9 +64,7 @@ impl StorageManager {
// Get the DHT parameters for 'InspectValue' // Get the DHT parameters for 'InspectValue'
// Can use either 'get scope' or 'set scope' depending on the purpose of the inspection // Can use either 'get scope' or 'set scope' depending on the purpose of the inspection
let (key_count, consensus_count, fanout, timeout_us) = { let (key_count, consensus_count, fanout, timeout_us) = self.config().with(|c| {
let c = self.unlocked_inner.config.get();
if use_set_scope { if use_set_scope {
( (
c.network.dht.max_find_node_count as usize, c.network.dht.max_find_node_count as usize,
@ -82,7 +80,7 @@ impl StorageManager {
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)), TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
) )
} }
}; });
// Get the nodes we know are caching this value to seed the fanout // Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = { let init_fanout_queue = {

View File

@ -44,13 +44,6 @@ struct ValueChangedInfo {
} }
struct StorageManagerUnlockedInner { struct StorageManagerUnlockedInner {
_event_bus: EventBus,
config: VeilidConfig,
crypto: Crypto,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")]
block_store: BlockStore,
// Background processes // Background processes
flush_record_stores_task: TickTask<EyreReport>, flush_record_stores_task: TickTask<EyreReport>,
offline_subkey_writes_task: TickTask<EyreReport>, offline_subkey_writes_task: TickTask<EyreReport>,
@ -62,20 +55,36 @@ struct StorageManagerUnlockedInner {
anonymous_watch_keys: TypedKeyPairGroup, anonymous_watch_keys: TypedKeyPairGroup,
} }
#[derive(Clone)] impl fmt::Debug for StorageManagerUnlockedInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StorageManagerUnlockedInner")
// .field("flush_record_stores_task", &self.flush_record_stores_task)
// .field(
// "offline_subkey_writes_task",
// &self.offline_subkey_writes_task,
// )
// .field("send_value_changes_task", &self.send_value_changes_task)
// .field("check_active_watches_task", &self.check_active_watches_task)
// .field(
// "check_watched_records_task",
// &self.check_watched_records_task,
// )
.field("anonymous_watch_keys", &self.anonymous_watch_keys)
.finish()
}
}
#[derive(Clone, Debug)]
pub(crate) struct StorageManager { pub(crate) struct StorageManager {
registry: VeilidComponentRegistry,
unlocked_inner: Arc<StorageManagerUnlockedInner>, unlocked_inner: Arc<StorageManagerUnlockedInner>,
inner: Arc<AsyncMutex<StorageManagerInner>>, inner: Arc<AsyncMutex<StorageManagerInner>>,
} }
impl_veilid_component!(StorageManager);
impl StorageManager { impl StorageManager {
fn new_unlocked_inner( fn new_unlocked_inner(crypto: Crypto) -> StorageManagerUnlockedInner {
event_bus: EventBus,
config: VeilidConfig,
crypto: Crypto,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
) -> StorageManagerUnlockedInner {
// Generate keys to use for anonymous watches // Generate keys to use for anonymous watches
let mut anonymous_watch_keys = TypedKeyPairGroup::new(); let mut anonymous_watch_keys = TypedKeyPairGroup::new();
for ck in VALID_CRYPTO_KINDS { for ck in VALID_CRYPTO_KINDS {
@ -85,12 +94,6 @@ impl StorageManager {
} }
StorageManagerUnlockedInner { StorageManagerUnlockedInner {
_event_bus: event_bus,
config,
crypto,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
flush_record_stores_task: TickTask::new( flush_record_stores_task: TickTask::new(
"flush_record_stores_task", "flush_record_stores_task",
FLUSH_RECORD_STORES_INTERVAL_SECS, FLUSH_RECORD_STORES_INTERVAL_SECS,
@ -119,22 +122,11 @@ impl StorageManager {
StorageManagerInner::new(unlocked_inner) StorageManagerInner::new(unlocked_inner)
} }
pub fn new( pub fn new(registry: VeilidComponentRegistry) -> StorageManager {
event_bus: EventBus, let crypto = registry.lookup::<Crypto>().unwrap();
config: VeilidConfig, let unlocked_inner = Arc::new(Self::new_unlocked_inner(crypto));
crypto: Crypto,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
) -> StorageManager {
let unlocked_inner = Arc::new(Self::new_unlocked_inner(
event_bus,
config,
crypto,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
));
let this = StorageManager { let this = StorageManager {
registry,
unlocked_inner: unlocked_inner.clone(), unlocked_inner: unlocked_inner.clone(),
inner: Arc::new(AsyncMutex::new(Self::new_inner(unlocked_inner))), inner: Arc::new(AsyncMutex::new(Self::new_inner(unlocked_inner))),
}; };
@ -145,17 +137,17 @@ impl StorageManager {
} }
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { async fn init_async(&self) -> EyreResult<()> {
log_stor!(debug "startup storage manager"); log_stor!(debug "startup storage manager");
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
inner.init(self.clone(), update_callback).await?; inner.init(self.clone()).await?;
Ok(()) Ok(())
} }
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
pub async fn terminate(&self) { async fn terminate_async(&self) {
log_stor!(debug "starting storage manager shutdown"); log_stor!(debug "starting storage manager shutdown");
// Stop the background ticker process // Stop the background ticker process
@ -177,16 +169,6 @@ impl StorageManager {
log_stor!(debug "finished storage manager shutdown"); log_stor!(debug "finished storage manager shutdown");
} }
pub async fn set_rpc_processor(&self, opt_rpc_processor: Option<RPCProcessor>) {
let mut inner = self.inner.lock().await;
inner.opt_rpc_processor = opt_rpc_processor
}
pub async fn set_routing_table(&self, opt_routing_table: Option<RoutingTable>) {
let mut inner = self.inner.lock().await;
inner.opt_routing_table = opt_routing_table
}
async fn lock(&self) -> VeilidAPIResult<AsyncMutexGuardArc<StorageManagerInner>> { async fn lock(&self) -> VeilidAPIResult<AsyncMutexGuardArc<StorageManagerInner>> {
let inner = asyncmutex_lock_arc!(&self.inner); let inner = asyncmutex_lock_arc!(&self.inner);
if !inner.initialized { if !inner.initialized {
@ -504,7 +486,7 @@ impl StorageManager {
let mut inner = self.lock().await?; let mut inner = self.lock().await?;
// Get cryptosystem // Get cryptosystem
let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else { let Some(vcrypto) = self.crypto().get(key.kind) else {
apibail_generic!("unsupported cryptosystem"); apibail_generic!("unsupported cryptosystem");
}; };
@ -724,13 +706,12 @@ impl StorageManager {
opened_record.clear_active_watch(); opened_record.clear_active_watch();
// Get the minimum expiration timestamp we will accept // Get the minimum expiration timestamp we will accept
let (rpc_timeout_us, max_watch_expiration_us) = { let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
let c = self.unlocked_inner.config.get();
( (
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)), TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)), TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
) )
}; });
let cur_ts = get_timestamp(); let cur_ts = get_timestamp();
let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64(); let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
let max_expiration_ts = if expiration.as_u64() == 0 { let max_expiration_ts = if expiration.as_u64() == 0 {
@ -1013,8 +994,9 @@ impl StorageManager {
match fanout_result.kind { match fanout_result.kind {
FanoutResultKind::Partial => false, FanoutResultKind::Partial => false,
FanoutResultKind::Timeout => { FanoutResultKind::Timeout => {
let get_consensus = let get_consensus = self
self.unlocked_inner.config.get().network.dht.get_value_count as usize; .config()
.with(|c| c.network.dht.get_value_count as usize);
let value_node_count = fanout_result.value_nodes.len(); let value_node_count = fanout_result.value_nodes.len();
if value_node_count < get_consensus { if value_node_count < get_consensus {
log_stor!(debug "timeout with insufficient consensus ({}<{}), adding offline subkey: {}:{}", log_stor!(debug "timeout with insufficient consensus ({}<{}), adding offline subkey: {}:{}",
@ -1029,8 +1011,9 @@ impl StorageManager {
} }
} }
FanoutResultKind::Exhausted => { FanoutResultKind::Exhausted => {
let get_consensus = let get_consensus = self
self.unlocked_inner.config.get().network.dht.get_value_count as usize; .config()
.with(|c| c.network.dht.get_value_count as usize);
let value_node_count = fanout_result.value_nodes.len(); let value_node_count = fanout_result.value_nodes.len();
if value_node_count < get_consensus { if value_node_count < get_consensus {
log_stor!(debug "exhausted with insufficient consensus ({}<{}), adding offline subkey: {}:{}", log_stor!(debug "exhausted with insufficient consensus ({}<{}), adding offline subkey: {}:{}",

View File

@ -20,6 +20,7 @@ impl InspectCacheL2 {
} }
} }
#[derive(Debug)]
pub struct InspectCache { pub struct InspectCache {
cache: LruCache<TypedKey, InspectCacheL2>, cache: LruCache<TypedKey, InspectCacheL2>,
} }

View File

@ -80,6 +80,31 @@ where
purge_dead_records_mutex: Arc<AsyncMutex<()>>, purge_dead_records_mutex: Arc<AsyncMutex<()>>,
} }
impl<D> fmt::Debug for RecordStore<D>
where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RecordStore")
//.field("table_store", &self.table_store)
.field("name", &self.name)
.field("limits", &self.limits)
.field("record_table", &self.record_table)
.field("subkey_table", &self.subkey_table)
.field("record_index", &self.record_index)
.field("subkey_cache", &self.subkey_cache)
.field("inspect_cache", &self.inspect_cache)
.field("subkey_cache_total_size", &self.subkey_cache_total_size)
.field("total_storage_space", &self.total_storage_space)
.field("dead_records", &self.dead_records)
.field("changed_records", &self.changed_records)
.field("watched_records", &self.watched_records)
.field("changed_watched_values", &self.changed_watched_values)
.field("purge_dead_records_mutex", &self.purge_dead_records_mutex)
.finish()
}
}
/// The result of the do_get_value_operation /// The result of the do_get_value_operation
#[derive(Default, Clone, Debug)] #[derive(Default, Clone, Debug)]
pub struct GetResult { pub struct GetResult {

View File

@ -39,16 +39,16 @@ impl StorageManager {
let routing_domain = RoutingDomain::PublicInternet; let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'SetValue' // Get the DHT parameters for 'SetValue'
let (key_count, get_consensus_count, set_consensus_count, fanout, timeout_us) = { let (key_count, get_consensus_count, set_consensus_count, fanout, timeout_us) =
let c = self.unlocked_inner.config.get(); self.config().with(|c| {
( (
c.network.dht.max_find_node_count as usize, c.network.dht.max_find_node_count as usize,
c.network.dht.get_value_count as usize, c.network.dht.get_value_count as usize,
c.network.dht.set_value_count as usize, c.network.dht.set_value_count as usize,
c.network.dht.set_value_fanout as usize, c.network.dht.set_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)), TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
) )
}; });
// Get the nodes we know are caching this value to seed the fanout // Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = { let init_fanout_queue = {

View File

@ -26,10 +26,6 @@ pub(super) struct StorageManagerInner {
pub offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite>, pub offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite>,
/// Storage manager metadata that is persistent, including copy of offline subkey writes /// Storage manager metadata that is persistent, including copy of offline subkey writes
pub metadata_db: Option<TableDB>, pub metadata_db: Option<TableDB>,
/// RPC processor if it is available
pub opt_rpc_processor: Option<RPCProcessor>,
/// Routing table if it is available
pub opt_routing_table: Option<RoutingTable>,
/// Background processing task (not part of attachment manager tick tree so it happens when detached too) /// Background processing task (not part of attachment manager tick tree so it happens when detached too)
pub tick_future: Option<SendPinBoxFuture<()>>, pub tick_future: Option<SendPinBoxFuture<()>>,
/// Update callback to send ValueChanged notification to /// Update callback to send ValueChanged notification to
@ -41,6 +37,24 @@ pub(super) struct StorageManagerInner {
set_consensus_count: usize, set_consensus_count: usize,
} }
impl fmt::Debug for StorageManagerInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StorageManagerInner")
// .field("unlocked_inner", &self.unlocked_inner)
.field("initialized", &self.initialized)
.field("opened_records", &self.opened_records)
.field("local_record_store", &self.local_record_store)
.field("remote_record_store", &self.remote_record_store)
.field("offline_subkey_writes", &self.offline_subkey_writes)
//.field("metadata_db", &self.metadata_db)
//.field("tick_future", &self.tick_future)
//.field("update_callback", &self.update_callback)
.field("deferred_result_processor", &self.deferred_result_processor)
.field("set_consensus_count", &self.set_consensus_count)
.finish()
}
}
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits { fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
let c = config.get(); let c = config.get();
RecordStoreLimits { RecordStoreLimits {
@ -89,7 +103,7 @@ impl StorageManagerInner {
offline_subkey_writes: Default::default(), offline_subkey_writes: Default::default(),
metadata_db: Default::default(), metadata_db: Default::default(),
opt_rpc_processor: Default::default(), opt_rpc_processor: Default::default(),
opt_routing_table: Default::default(), //opt_routing_table: Default::default(),
tick_future: Default::default(), tick_future: Default::default(),
update_callback: None, update_callback: None,
deferred_result_processor: DeferredStreamProcessor::default(), deferred_result_processor: DeferredStreamProcessor::default(),

View File

@ -213,14 +213,13 @@ impl StorageManager {
let routing_domain = RoutingDomain::PublicInternet; let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations // Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
let (key_count, timeout_us, set_value_count) = { let (key_count, timeout_us, set_value_count) = self.config().with(|c| {
let c = self.unlocked_inner.config.get();
( (
c.network.dht.max_find_node_count as usize, c.network.dht.max_find_node_count as usize,
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)), TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
c.network.dht.set_value_count as usize, c.network.dht.set_value_count as usize,
) )
}; });
// Get the appropriate watcher key, if anonymous use a static anonymous watch key // Get the appropriate watcher key, if anonymous use a static anonymous watch key
// which lives for the duration of the app's runtime // which lives for the duration of the app's runtime

View File

@ -70,21 +70,41 @@ struct TableStoreInner {
encryption_key: Option<TypedSharedSecret>, encryption_key: Option<TypedSharedSecret>,
all_table_names: HashMap<String, String>, all_table_names: HashMap<String, String>,
all_tables_db: Option<Database>, all_tables_db: Option<Database>,
crypto: Option<Crypto>, }
impl fmt::Debug for TableStoreInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TableStoreInner")
.field("opened", &self.opened)
.field("encryption_key", &self.encryption_key)
.field("all_table_names", &self.all_table_names)
//.field("all_tables_db", &self.all_tables_db)
.finish()
}
} }
/// Veilid Table Storage. /// Veilid Table Storage.
/// Database for storing key value pairs persistently and securely across runs. /// Database for storing key value pairs persistently and securely across runs.
#[derive(Clone)]
pub struct TableStore { pub struct TableStore {
_event_bus: EventBus, registry: VeilidComponentRegistry,
config: VeilidConfig,
protected_store: ProtectedStore,
table_store_driver: TableStoreDriver,
inner: Arc<Mutex<TableStoreInner>>, // Sync mutex here because TableDB drops can happen at any time inner: Arc<Mutex<TableStoreInner>>, // Sync mutex here because TableDB drops can happen at any time
async_lock: Arc<AsyncMutex<()>>, // Async mutex for operations table_store_driver: TableStoreDriver,
async_lock: Arc<AsyncMutex<()>>, // Async mutex for operations
} }
impl fmt::Debug for TableStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TableStore")
.field("registry", &self.registry)
.field("inner", &self.inner)
//.field("table_store_driver", &self.table_store_driver)
.field("async_lock", &self.async_lock)
.finish()
}
}
impl_veilid_component!(TableStore);
impl TableStore { impl TableStore {
fn new_inner() -> TableStoreInner { fn new_inner() -> TableStoreInner {
TableStoreInner { TableStoreInner {
@ -92,32 +112,20 @@ impl TableStore {
encryption_key: None, encryption_key: None,
all_table_names: HashMap::new(), all_table_names: HashMap::new(),
all_tables_db: None, all_tables_db: None,
crypto: None,
} }
} }
pub(crate) fn new( pub(crate) fn new(registry: VeilidComponentRegistry) -> Self {
event_bus: EventBus,
config: VeilidConfig,
protected_store: ProtectedStore,
) -> Self {
let inner = Self::new_inner(); let inner = Self::new_inner();
let table_store_driver = TableStoreDriver::new(config.clone()); let table_store_driver = TableStoreDriver::new(registry.config());
Self { Self {
_event_bus: event_bus, registry,
config,
protected_store,
inner: Arc::new(Mutex::new(inner)), inner: Arc::new(Mutex::new(inner)),
table_store_driver, table_store_driver,
async_lock: Arc::new(AsyncMutex::new(())), async_lock: Arc::new(AsyncMutex::new(())),
} }
} }
pub(crate) fn set_crypto(&self, crypto: Crypto) {
let mut inner = self.inner.lock();
inner.crypto = Some(crypto);
}
// Flush internal control state (must not use crypto) // Flush internal control state (must not use crypto)
async fn flush(&self) { async fn flush(&self) {
let (all_table_names_value, all_tables_db) = { let (all_table_names_value, all_tables_db) = {
@ -142,8 +150,7 @@ impl TableStore {
{ {
apibail_invalid_argument!("table name is invalid", "table", table); apibail_invalid_argument!("table name is invalid", "table", table);
} }
let c = self.config.get(); let namespace = self.config().with(|c| c.namespace.clone());
let namespace = c.namespace.clone();
Ok(if namespace.is_empty() { Ok(if namespace.is_empty() {
table.to_string() table.to_string()
} else { } else {
@ -260,8 +267,7 @@ impl TableStore {
// Get cryptosystem // Get cryptosystem
let kind = FourCC::try_from(&dek_bytes[0..4]).unwrap(); let kind = FourCC::try_from(&dek_bytes[0..4]).unwrap();
let crypto = self.inner.lock().crypto.as_ref().unwrap().clone(); let Some(vcrypto) = self.crypto().get(kind) else {
let Some(vcrypto) = crypto.get(kind) else {
bail!("unsupported cryptosystem '{kind}'"); bail!("unsupported cryptosystem '{kind}'");
}; };
@ -316,8 +322,7 @@ impl TableStore {
} }
// Get cryptosystem // Get cryptosystem
let crypto = self.inner.lock().crypto.as_ref().unwrap().clone(); let Some(vcrypto) = self.crypto().get(dek.kind) else {
let Some(vcrypto) = crypto.get(dek.kind) else {
bail!("unsupported cryptosystem '{}'", dek.kind); bail!("unsupported cryptosystem '{}'", dek.kind);
}; };
@ -340,7 +345,7 @@ impl TableStore {
#[instrument(level = "trace", target = "tstore", skip_all)] #[instrument(level = "trace", target = "tstore", skip_all)]
async fn load_device_encryption_key(&self) -> EyreResult<Option<TypedSharedSecret>> { async fn load_device_encryption_key(&self) -> EyreResult<Option<TypedSharedSecret>> {
let dek_bytes: Option<Vec<u8>> = self let dek_bytes: Option<Vec<u8>> = self
.protected_store .protected_store()
.load_user_secret("device_encryption_key") .load_user_secret("device_encryption_key")
.await?; .await?;
let Some(dek_bytes) = dek_bytes else { let Some(dek_bytes) = dek_bytes else {
@ -349,10 +354,9 @@ impl TableStore {
}; };
// Get device encryption key protection password if we have it // Get device encryption key protection password if we have it
let device_encryption_key_password = { let device_encryption_key_password = self
let c = self.config.get(); .config()
c.protected_store.device_encryption_key_password.clone() .with(|c| c.protected_store.device_encryption_key_password.clone());
};
Ok(Some(self.maybe_unprotect_device_encryption_key( Ok(Some(self.maybe_unprotect_device_encryption_key(
&dek_bytes, &dek_bytes,
@ -368,7 +372,7 @@ impl TableStore {
let Some(device_encryption_key) = device_encryption_key else { let Some(device_encryption_key) = device_encryption_key else {
// Remove the device encryption key // Remove the device encryption key
let existed = self let existed = self
.protected_store .protected_store()
.remove_user_secret("device_encryption_key") .remove_user_secret("device_encryption_key")
.await?; .await?;
log_tstore!(debug "removed device encryption key. existed: {}", existed); log_tstore!(debug "removed device encryption key. existed: {}", existed);
@ -377,15 +381,15 @@ impl TableStore {
// Get new device encryption key protection password if we are changing it // Get new device encryption key protection password if we are changing it
let new_device_encryption_key_password = { let new_device_encryption_key_password = {
let c = self.config.get(); self.config()
c.protected_store.new_device_encryption_key_password.clone() .with(|c| c.protected_store.new_device_encryption_key_password.clone())
}; };
let device_encryption_key_password = let device_encryption_key_password =
if let Some(new_device_encryption_key_password) = new_device_encryption_key_password { if let Some(new_device_encryption_key_password) = new_device_encryption_key_password {
// Change password // Change password
log_tstore!(debug "changing dek password"); log_tstore!(debug "changing dek password");
self.config self.config()
.with_mut(|c| { .try_with_mut(|c| {
c.protected_store c.protected_store
.device_encryption_key_password .device_encryption_key_password
.clone_from(&new_device_encryption_key_password); .clone_from(&new_device_encryption_key_password);
@ -395,8 +399,8 @@ impl TableStore {
} else { } else {
// Get device encryption key protection password if we have it // Get device encryption key protection password if we have it
log_tstore!(debug "saving with existing dek password"); log_tstore!(debug "saving with existing dek password");
let c = self.config.get(); self.config()
c.protected_store.device_encryption_key_password.clone() .with(|c| c.protected_store.device_encryption_key_password.clone())
}; };
let dek_bytes = self.maybe_protect_device_encryption_key( let dek_bytes = self.maybe_protect_device_encryption_key(
@ -406,7 +410,7 @@ impl TableStore {
// Save the new device encryption key // Save the new device encryption key
let existed = self let existed = self
.protected_store .protected_store()
.save_user_secret("device_encryption_key", &dek_bytes) .save_user_secret("device_encryption_key", &dek_bytes)
.await?; .await?;
log_tstore!(debug "saving device encryption key. existed: {}", existed); log_tstore!(debug "saving device encryption key. existed: {}", existed);
@ -414,7 +418,7 @@ impl TableStore {
} }
#[instrument(level = "trace", target = "tstore", skip_all)] #[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) async fn init(&self) -> EyreResult<()> { async fn init_async(&self) -> EyreResult<()> {
let _async_guard = self.async_lock.lock().await; let _async_guard = self.async_lock.lock().await;
// Get device encryption key from protected store // Get device encryption key from protected store
@ -437,12 +441,11 @@ impl TableStore {
} }
// Check for password change // Check for password change
let changing_password = self let changing_password = self.config().with(|c| {
.config c.protected_store
.get() .new_device_encryption_key_password
.protected_store .is_some()
.new_device_encryption_key_password });
.is_some();
// Save encryption key if it has changed or if the protecting password wants to change // Save encryption key if it has changed or if the protecting password wants to change
if device_encryption_key_changed || changing_password { if device_encryption_key_changed || changing_password {
@ -481,10 +484,7 @@ impl TableStore {
inner.all_tables_db = Some(all_tables_db); inner.all_tables_db = Some(all_tables_db);
} }
let do_delete = { let do_delete = self.config().with(|c| c.table_store.delete);
let c = self.config.get();
c.table_store.delete
};
if do_delete { if do_delete {
self.delete_all().await; self.delete_all().await;
@ -494,7 +494,7 @@ impl TableStore {
} }
#[instrument(level = "trace", target = "tstore", skip_all)] #[instrument(level = "trace", target = "tstore", skip_all)]
pub(crate) async fn terminate(&self) { async fn terminate_async(&self) {
let _async_guard = self.async_lock.lock().await; let _async_guard = self.async_lock.lock().await;
self.flush().await; self.flush().await;
@ -599,8 +599,7 @@ impl TableStore {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let table_db = TableDB::new( let table_db = TableDB::new(
table_name.clone(), table_name.clone(),
self.clone(), self.registry(),
inner.crypto.as_ref().unwrap().clone(),
db, db,
inner.encryption_key, inner.encryption_key,
inner.encryption_key, inner.encryption_key,

View File

@ -11,20 +11,17 @@ cfg_if! {
} }
struct CryptInfo { struct CryptInfo {
vcrypto: CryptoSystemVersion, typed_key: TypedSharedSecret,
key: SharedSecret,
} }
impl CryptInfo { impl CryptInfo {
pub fn new(crypto: Crypto, typed_key: TypedSharedSecret) -> Self { pub fn new(typed_key: TypedSharedSecret) -> Self {
let vcrypto = crypto.get(typed_key.kind).unwrap(); Self { typed_key }
let key = typed_key.value;
Self { vcrypto, key }
} }
} }
pub struct TableDBUnlockedInner { pub struct TableDBUnlockedInner {
table: String, table: String,
table_store: TableStore, registry: VeilidComponentRegistry,
database: Database, database: Database,
// Encryption and decryption key will be the same unless configured for an in-place migration // Encryption and decryption key will be the same unless configured for an in-place migration
encrypt_info: Option<CryptInfo>, encrypt_info: Option<CryptInfo>,
@ -39,7 +36,8 @@ impl fmt::Debug for TableDBUnlockedInner {
impl Drop for TableDBUnlockedInner { impl Drop for TableDBUnlockedInner {
fn drop(&mut self) { fn drop(&mut self) {
self.table_store.on_table_db_drop(self.table.clone()); let table_store = self.registry.lookup::<TableStore>().unwrap();
table_store.on_table_db_drop(self.table.clone());
} }
} }
@ -52,15 +50,14 @@ pub struct TableDB {
impl TableDB { impl TableDB {
pub(super) fn new( pub(super) fn new(
table: String, table: String,
table_store: TableStore, registry: VeilidComponentRegistry,
crypto: Crypto,
database: Database, database: Database,
encryption_key: Option<TypedSharedSecret>, encryption_key: Option<TypedSharedSecret>,
decryption_key: Option<TypedSharedSecret>, decryption_key: Option<TypedSharedSecret>,
opened_column_count: u32, opened_column_count: u32,
) -> Self { ) -> Self {
let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek)); let encrypt_info = encryption_key.map(CryptInfo::new);
let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk)); let decrypt_info = decryption_key.map(CryptInfo::new);
let total_columns = database.num_columns().unwrap(); let total_columns = database.num_columns().unwrap();
@ -72,7 +69,7 @@ impl TableDB {
}, },
unlocked_inner: Arc::new(TableDBUnlockedInner { unlocked_inner: Arc::new(TableDBUnlockedInner {
table, table,
table_store, registry,
database, database,
encrypt_info, encrypt_info,
decrypt_info, decrypt_info,
@ -102,6 +99,9 @@ impl TableDB {
Arc::downgrade(&self.unlocked_inner) Arc::downgrade(&self.unlocked_inner)
} }
pub(super) fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
self.unlocked_inner.registry.lookup::<Crypto>().unwrap()
}
/// Get the internal name of the table /// Get the internal name of the table
pub fn table_name(&self) -> String { pub fn table_name(&self) -> String {
self.unlocked_inner.table.clone() self.unlocked_inner.table.clone()
@ -131,14 +131,16 @@ impl TableDB {
fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec<u8> { fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec<u8> {
let data = compress_prepend_size(data); let data = compress_prepend_size(data);
if let Some(ei) = &self.unlocked_inner.encrypt_info { if let Some(ei) = &self.unlocked_inner.encrypt_info {
let crypto = self.crypto();
let vcrypto = crypto.get(ei.typed_key.kind).unwrap();
let mut out = unsafe { unaligned_u8_vec_uninit(NONCE_LENGTH + data.len()) }; let mut out = unsafe { unaligned_u8_vec_uninit(NONCE_LENGTH + data.len()) };
if keyed_nonce { if keyed_nonce {
// Key content nonce // Key content nonce
let mut noncedata = Vec::with_capacity(data.len() + PUBLIC_KEY_LENGTH); let mut noncedata = Vec::with_capacity(data.len() + PUBLIC_KEY_LENGTH);
noncedata.extend_from_slice(&data); noncedata.extend_from_slice(&data);
noncedata.extend_from_slice(&ei.key.bytes); noncedata.extend_from_slice(&ei.typed_key.value.bytes);
let noncehash = ei.vcrypto.generate_hash(&noncedata); let noncehash = vcrypto.generate_hash(&noncedata);
out[0..NONCE_LENGTH].copy_from_slice(&noncehash[0..NONCE_LENGTH]) out[0..NONCE_LENGTH].copy_from_slice(&noncehash[0..NONCE_LENGTH])
} else { } else {
// Random nonce // Random nonce
@ -146,11 +148,11 @@ impl TableDB {
} }
let (nonce, encout) = out.split_at_mut(NONCE_LENGTH); let (nonce, encout) = out.split_at_mut(NONCE_LENGTH);
ei.vcrypto.crypt_b2b_no_auth( vcrypto.crypt_b2b_no_auth(
&data, &data,
encout, encout,
(nonce as &[u8]).try_into().unwrap(), (nonce as &[u8]).try_into().unwrap(),
&ei.key, &ei.typed_key.value,
); );
out out
} else { } else {
@ -162,6 +164,8 @@ impl TableDB {
#[instrument(level = "trace", target = "tstore", skip_all)] #[instrument(level = "trace", target = "tstore", skip_all)]
fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result<Vec<u8>> { fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result<Vec<u8>> {
if let Some(di) = &self.unlocked_inner.decrypt_info { if let Some(di) = &self.unlocked_inner.decrypt_info {
let crypto = self.crypto();
let vcrypto = crypto.get(di.typed_key.kind).unwrap();
assert!(data.len() >= NONCE_LENGTH); assert!(data.len() >= NONCE_LENGTH);
if data.len() == NONCE_LENGTH { if data.len() == NONCE_LENGTH {
return Ok(Vec::new()); return Ok(Vec::new());
@ -169,11 +173,11 @@ impl TableDB {
let mut out = unsafe { unaligned_u8_vec_uninit(data.len() - NONCE_LENGTH) }; let mut out = unsafe { unaligned_u8_vec_uninit(data.len() - NONCE_LENGTH) };
di.vcrypto.crypt_b2b_no_auth( vcrypto.crypt_b2b_no_auth(
&data[NONCE_LENGTH..], &data[NONCE_LENGTH..],
&mut out, &mut out,
(&data[0..NONCE_LENGTH]).try_into().unwrap(), (&data[0..NONCE_LENGTH]).try_into().unwrap(),
&di.key, &di.typed_key.value,
); );
decompress_size_prepended(&out, None) decompress_size_prepended(&out, None)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string())) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))

View File

@ -15,7 +15,7 @@ async fn shutdown(api: VeilidAPI) {
trace!("test_table_store: finished"); trace!("test_table_store: finished");
} }
pub async fn test_delete_open_delete(ts: TableStore) { pub async fn test_delete_open_delete(ts: &TableStore) {
trace!("test_delete_open_delete"); trace!("test_delete_open_delete");
let _ = ts.delete("test").await; let _ = ts.delete("test").await;
@ -47,7 +47,7 @@ pub async fn test_delete_open_delete(ts: TableStore) {
); );
} }
pub async fn test_store_delete_load(ts: TableStore) { pub async fn test_store_delete_load(ts: &TableStore) {
trace!("test_store_delete_load"); trace!("test_store_delete_load");
let _ = ts.delete("test").await; let _ = ts.delete("test").await;
@ -132,7 +132,7 @@ pub async fn test_store_delete_load(ts: TableStore) {
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec())); assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
} }
pub async fn test_transaction(ts: TableStore) { pub async fn test_transaction(ts: &TableStore) {
trace!("test_transaction"); trace!("test_transaction");
let _ = ts.delete("test").await; let _ = ts.delete("test").await;
@ -162,7 +162,7 @@ pub async fn test_transaction(ts: TableStore) {
assert_eq!(db.load(0, b"ddd").await, Ok(None)); assert_eq!(db.load(0, b"ddd").await, Ok(None));
} }
pub async fn test_json(vcrypto: CryptoSystemVersion, ts: TableStore) { pub async fn test_json(vcrypto: &CryptoSystemGuard<'_>, ts: &TableStore) {
trace!("test_json"); trace!("test_json");
let _ = ts.delete("test").await; let _ = ts.delete("test").await;
@ -200,7 +200,7 @@ pub async fn test_json(vcrypto: CryptoSystemVersion, ts: TableStore) {
); );
} }
pub async fn test_protect_unprotect(vcrypto: CryptoSystemVersion, ts: TableStore) { pub async fn test_protect_unprotect(vcrypto: &CryptoSystemGuard<'_>, ts: &TableStore) {
trace!("test_protect_unprotect"); trace!("test_protect_unprotect");
let dek1 = TypedSharedSecret::new( let dek1 = TypedSharedSecret::new(
@ -267,11 +267,11 @@ pub async fn test_all() {
for ck in VALID_CRYPTO_KINDS { for ck in VALID_CRYPTO_KINDS {
let vcrypto = crypto.get(ck).unwrap(); let vcrypto = crypto.get(ck).unwrap();
test_protect_unprotect(vcrypto.clone(), ts.clone()).await; test_protect_unprotect(&vcrypto, &ts).await;
test_delete_open_delete(ts.clone()).await; test_delete_open_delete(&ts).await;
test_store_delete_load(ts.clone()).await; test_store_delete_load(&ts).await;
test_transaction(ts.clone()).await; test_transaction(&ts).await;
test_json(vcrypto, ts.clone()).await; test_json(&vcrypto, &ts).await;
let _ = ts.delete("test").await; let _ = ts.delete("test").await;
} }

View File

@ -284,6 +284,11 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
"network.protocol.wss.url" => Ok(Box::new(Option::<String>::None)), "network.protocol.wss.url" => Ok(Box::new(Option::<String>::None)),
#[cfg(feature = "geolocation")] #[cfg(feature = "geolocation")]
"network.privacy.country_code_denylist" => Ok(Box::new(Vec::<CountryCode>::new())), "network.privacy.country_code_denylist" => Ok(Box::new(Vec::<CountryCode>::new())),
#[cfg(feature = "virtual-network")]
"network.virtual_network.enabled" => Ok(Box::new(false)),
#[cfg(feature = "virtual-network")]
"network.virtual_network.server_address" => Ok(Box::new("".to_owned())),
_ => { _ => {
let err = format!("config key '{}' doesn't exist", key); let err = format!("config key '{}' doesn't exist", key);
debug!("{}", err); debug!("{}", err);
@ -293,26 +298,20 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
} }
pub fn get_config() -> VeilidConfig { pub fn get_config() -> VeilidConfig {
let mut vc = VeilidConfig::new(); let vc =
match vc.setup(Arc::new(config_callback), Arc::new(update_callback)) { match VeilidConfig::new_from_callback(Arc::new(config_callback), Arc::new(update_callback))
Ok(()) => (), {
Err(e) => { Ok(vc) => vc,
error!("Error: {}", e); Err(e) => {
unreachable!(); error!("Error: {}", e);
} unreachable!();
}; }
};
vc vc
} }
pub async fn test_config() { pub async fn test_config() {
let mut vc = VeilidConfig::new(); let vc = get_config();
match vc.setup(Arc::new(config_callback), Arc::new(update_callback)) {
Ok(()) => (),
Err(e) => {
error!("Error: {}", e);
unreachable!();
}
}
let inner = vc.get(); let inner = vc.get();
assert_eq!(inner.program_name, String::from("VeilidCoreTests")); assert_eq!(inner.program_name, String::from("VeilidCoreTests"));
@ -424,6 +423,10 @@ pub async fn test_config() {
#[cfg(feature = "geolocation")] #[cfg(feature = "geolocation")]
assert_eq!(inner.network.privacy.country_code_denylist, Vec::new()); assert_eq!(inner.network.privacy.country_code_denylist, Vec::new());
#[cfg(feature = "virtual-network")]
assert!(!inner.network.virtual_network.enabled);
#[cfg(feature = "virtual-network")]
assert_eq!(inner.network.virtual_network.server_address, "");
} }
pub async fn test_all() { pub async fn test_all() {

View File

@ -23,7 +23,7 @@ impl Drop for VeilidAPIInner {
/// The primary developer entrypoint into `veilid-core` functionality. /// The primary developer entrypoint into `veilid-core` functionality.
/// ///
/// From [VeilidAPI] one can access: /// From [VeilidAPI] one can access various components:
/// ///
/// * [VeilidConfig] - The Veilid configuration specified at startup time. /// * [VeilidConfig] - The Veilid configuration specified at startup time.
/// * [Crypto] - The available set of cryptosystems provided by Veilid. /// * [Crypto] - The available set of cryptosystems provided by Veilid.
@ -42,7 +42,7 @@ pub struct VeilidAPI {
impl VeilidAPI { impl VeilidAPI {
#[instrument(target = "veilid_api", level = "debug", skip_all)] #[instrument(target = "veilid_api", level = "debug", skip_all)]
pub(crate) fn new(context: VeilidCoreContext) -> Self { pub(crate) fn new(context: VeilidCoreContext) -> Self {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::new()"); "VeilidAPI::new()");
Self { Self {
inner: Arc::new(Mutex::new(VeilidAPIInner { inner: Arc::new(Mutex::new(VeilidAPIInner {
@ -60,7 +60,7 @@ impl VeilidAPI {
/// Shut down Veilid and terminate the API. /// Shut down Veilid and terminate the API.
#[instrument(target = "veilid_api", level = "debug", skip_all)] #[instrument(target = "veilid_api", level = "debug", skip_all)]
pub async fn shutdown(self) { pub async fn shutdown(self) {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::shutdown()"); "VeilidAPI::shutdown()");
let context = { self.inner.lock().context.take() }; let context = { self.inner.lock().context.take() };
if let Some(context) = context { if let Some(context) = context {
@ -79,83 +79,90 @@ impl VeilidAPI {
/// Access the configuration that Veilid was initialized with. /// Access the configuration that Veilid was initialized with.
pub fn config(&self) -> VeilidAPIResult<VeilidConfig> { pub fn config(&self) -> VeilidAPIResult<VeilidConfig> {
let inner = self.inner.lock(); let inner = self.inner.lock();
if let Some(context) = &inner.context { let Some(context) = &inner.context else {
return Ok(context.config.clone()); return Err(VeilidAPIError::NotInitialized);
} };
Err(VeilidAPIError::NotInitialized) Ok(context.registry().config())
} }
/// Get the cryptosystem manager. /// Get the cryptosystem component.
pub fn crypto(&self) -> VeilidAPIResult<Crypto> { pub fn crypto(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, Crypto>> {
let inner = self.inner.lock(); let inner = self.inner.lock();
if let Some(context) = &inner.context { let Some(context) = &inner.context else {
return Ok(context.crypto.clone()); return Err(VeilidAPIError::NotInitialized);
} };
Err(VeilidAPIError::NotInitialized) context
.registry()
.lookup::<Crypto>()
.ok_or(VeilidAPIError::NotInitialized)
} }
/// Get the TableStore manager. /// Get the TableStore component.
pub fn table_store(&self) -> VeilidAPIResult<TableStore> { pub fn table_store(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, TableStore>> {
let inner = self.inner.lock(); let inner = self.inner.lock();
if let Some(context) = &inner.context { let Some(context) = &inner.context else {
return Ok(context.table_store.clone()); return Err(VeilidAPIError::NotInitialized);
} };
Err(VeilidAPIError::NotInitialized) context
.registry()
.lookup::<TableStore>()
.ok_or(VeilidAPIError::NotInitialized)
} }
/// Get the ProtectedStore manager. /// Get the ProtectedStore component.
pub fn protected_store(&self) -> VeilidAPIResult<ProtectedStore> { pub fn protected_store(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, ProtectedStore>> {
let inner = self.inner.lock(); let inner = self.inner.lock();
if let Some(context) = &inner.context { let Some(context) = &inner.context else {
return Ok(context.protected_store.clone()); return Err(VeilidAPIError::NotInitialized);
} };
Err(VeilidAPIError::NotInitialized) context
.registry()
.lookup::<ProtectedStore>()
.ok_or(VeilidAPIError::NotInitialized)
}
/// Get the BlockStore component.
#[cfg(feature = "unstable-blockstore")]
pub fn block_store(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, BlockStore>> {
let inner = self.inner.lock();
let Some(context) = &inner.context else {
return Err(VeilidAPIError::NotInitialized);
};
context
.registry()
.lookup::<BlockStore>()
.ok_or(VeilidAPIError::NotInitialized)
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
// Internal Accessors // Internal Accessors
pub(crate) fn attachment_manager(&self) -> VeilidAPIResult<AttachmentManager> { pub(crate) fn attachment_manager(
&self,
) -> VeilidAPIResult<VeilidComponentGuard<'_, AttachmentManager>> {
let inner = self.inner.lock(); let inner = self.inner.lock();
if let Some(context) = &inner.context { let Some(context) = &inner.context else {
return Ok(context.attachment_manager.clone()); return Err(VeilidAPIError::NotInitialized);
} };
Err(VeilidAPIError::NotInitialized) context
.registry()
.lookup::<AttachmentManager>()
.ok_or(VeilidAPIError::NotInitialized)
} }
pub(crate) fn network_manager(&self) -> VeilidAPIResult<NetworkManager> { pub(crate) fn network_manager(&self) -> VeilidAPIResult<NetworkManager> {
let inner = self.inner.lock(); self.attachment_manager().map(|a| a.network_manager())
if let Some(context) = &inner.context {
return Ok(context.attachment_manager.network_manager());
}
Err(VeilidAPIError::NotInitialized)
} }
pub(crate) fn rpc_processor(&self) -> VeilidAPIResult<RPCProcessor> { pub(crate) fn rpc_processor(&self) -> VeilidAPIResult<RPCProcessor> {
let inner = self.inner.lock(); self.network_manager()
if let Some(context) = &inner.context { .map(|a| a.opt_rpc_processor())?
return Ok(context.attachment_manager.network_manager().rpc_processor()); .ok_or(VeilidAPIError::NotInitialized)
}
Err(VeilidAPIError::NotInitialized)
} }
pub(crate) fn routing_table(&self) -> VeilidAPIResult<RoutingTable> { pub(crate) fn routing_table(&self) -> VeilidAPIResult<RoutingTable> {
let inner = self.inner.lock(); self.attachment_manager()
if let Some(context) = &inner.context { .map(|a| a.network_manager().routing_table())
return Ok(context.attachment_manager.network_manager().routing_table());
}
Err(VeilidAPIError::NotInitialized)
} }
pub(crate) fn storage_manager(&self) -> VeilidAPIResult<StorageManager> { pub(crate) fn storage_manager(&self) -> VeilidAPIResult<StorageManager> {
let inner = self.inner.lock(); self.attachment_manager()
if let Some(context) = &inner.context { .map(|a| a.network_manager().storage_manager())
return Ok(context.storage_manager.clone());
}
Err(VeilidAPIError::NotInitialized)
}
#[cfg(feature = "unstable-blockstore")]
pub(crate) fn block_store(&self) -> VeilidAPIResult<BlockStore> {
let inner = self.inner.lock();
if let Some(context) = &inner.context {
return Ok(context.block_store.clone());
}
Err(VeilidAPIError::NotInitialized)
} }
pub(crate) fn with_debug_cache<R, F: FnOnce(&mut DebugCache) -> R>(&self, callback: F) -> R { pub(crate) fn with_debug_cache<R, F: FnOnce(&mut DebugCache) -> R>(&self, callback: F) -> R {
@ -186,7 +193,7 @@ impl VeilidAPI {
/// Connect to the network. /// Connect to the network.
#[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)] #[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)]
pub async fn attach(&self) -> VeilidAPIResult<()> { pub async fn attach(&self) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::attach()"); "VeilidAPI::attach()");
let attachment_manager = self.attachment_manager()?; let attachment_manager = self.attachment_manager()?;
@ -199,7 +206,7 @@ impl VeilidAPI {
/// Disconnect from the network. /// Disconnect from the network.
#[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)] #[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)]
pub async fn detach(&self) -> VeilidAPIResult<()> { pub async fn detach(&self) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::detach()"); "VeilidAPI::detach()");
let attachment_manager = self.attachment_manager()?; let attachment_manager = self.attachment_manager()?;
@ -215,7 +222,7 @@ impl VeilidAPI {
/// Get a new `RoutingContext` object to use to send messages over the Veilid network with default safety, sequencing, and stability parameters. /// Get a new `RoutingContext` object to use to send messages over the Veilid network with default safety, sequencing, and stability parameters.
#[instrument(target = "veilid_api", level = "debug", skip_all, err, ret)] #[instrument(target = "veilid_api", level = "debug", skip_all, err, ret)]
pub fn routing_context(&self) -> VeilidAPIResult<RoutingContext> { pub fn routing_context(&self) -> VeilidAPIResult<RoutingContext> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::routing_context()"); "VeilidAPI::routing_context()");
RoutingContext::try_new(self.clone()) RoutingContext::try_new(self.clone())
@ -232,7 +239,7 @@ impl VeilidAPI {
pub fn parse_as_target<S: ToString>(&self, s: S) -> VeilidAPIResult<Target> { pub fn parse_as_target<S: ToString>(&self, s: S) -> VeilidAPIResult<Target> {
let s = s.to_string(); let s = s.to_string();
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::parse_as_target(s: {:?})", s); "VeilidAPI::parse_as_target(s: {:?})", s);
// Is this a route id? // Is this a route id?
@ -289,7 +296,7 @@ impl VeilidAPI {
stability: Stability, stability: Stability,
sequencing: Sequencing, sequencing: Sequencing,
) -> VeilidAPIResult<(RouteId, Vec<u8>)> { ) -> VeilidAPIResult<(RouteId, Vec<u8>)> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::new_custom_private_route(crypto_kinds: {:?}, stability: {:?}, sequencing: {:?})", "VeilidAPI::new_custom_private_route(crypto_kinds: {:?}, stability: {:?}, sequencing: {:?})",
crypto_kinds, crypto_kinds,
stability, stability,
@ -347,7 +354,7 @@ impl VeilidAPI {
/// Returns a route id that can be used to send private messages to the node creating this route. /// Returns a route id that can be used to send private messages to the node creating this route.
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> { pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::import_remote_private_route(blob: {:?})", blob); "VeilidAPI::import_remote_private_route(blob: {:?})", blob);
let rss = self.routing_table()?.route_spec_store(); let rss = self.routing_table()?.route_spec_store();
rss.import_remote_private_route_blob(blob) rss.import_remote_private_route_blob(blob)
@ -359,7 +366,7 @@ impl VeilidAPI {
/// or received from. /// or received from.
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)] #[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> { pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::release_private_route(route_id: {:?})", route_id); "VeilidAPI::release_private_route(route_id: {:?})", route_id);
let rss = self.routing_table()?.route_spec_store(); let rss = self.routing_table()?.route_spec_store();
if !rss.release_route(route_id) { if !rss.release_route(route_id) {
@ -381,7 +388,7 @@ impl VeilidAPI {
call_id: OperationId, call_id: OperationId,
message: Vec<u8>, message: Vec<u8>,
) -> VeilidAPIResult<()> { ) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message); "VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message);
let rpc_processor = self.rpc_processor()?; let rpc_processor = self.rpc_processor()?;

View File

@ -229,7 +229,9 @@ fn get_keypair(text: &str) -> Option<KeyPair> {
KeyPair::from_str(text).ok() KeyPair::from_str(text).ok()
} }
fn get_crypto_system_version(crypto: Crypto) -> impl FnOnce(&str) -> Option<CryptoSystemVersion> { fn get_crypto_system_version(
crypto: &Crypto,
) -> impl FnOnce(&str) -> Option<CryptoSystemGuard<'_>> {
move |text| { move |text| {
let kindstr = get_string(text)?; let kindstr = get_string(text)?;
let kind = CryptoKind::from_str(&kindstr).ok()?; let kind = CryptoKind::from_str(&kindstr).ok()?;

View File

@ -858,7 +858,7 @@ impl VeilidConfigInner {
/// The configuration built for each Veilid node during API startup /// The configuration built for each Veilid node during API startup
#[derive(Clone)] #[derive(Clone)]
pub struct VeilidConfig { pub struct VeilidConfig {
update_cb: Option<UpdateCallback>, update_cb: UpdateCallback,
inner: Arc<RwLock<VeilidConfigInner>>, inner: Arc<RwLock<VeilidConfigInner>>,
} }
@ -871,164 +871,144 @@ impl fmt::Debug for VeilidConfig {
} }
} }
impl Default for VeilidConfig {
fn default() -> Self {
Self::new()
}
}
impl VeilidConfig { impl VeilidConfig {
fn new_inner() -> VeilidConfigInner { pub(crate) fn new_from_config(config: VeilidConfigInner, update_cb: UpdateCallback) -> Self {
VeilidConfigInner::default()
}
pub(crate) fn new() -> Self {
Self { Self {
update_cb: None, update_cb,
inner: Arc::new(RwLock::new(Self::new_inner())), inner: Arc::new(RwLock::new(config)),
} }
} }
pub(crate) fn setup_from_config( pub(crate) fn new_from_callback(
&mut self,
config: VeilidConfigInner,
update_cb: UpdateCallback,
) -> VeilidAPIResult<()> {
self.update_cb = Some(update_cb);
self.with_mut(|inner| {
*inner = config;
Ok(())
})
}
pub(crate) fn setup(
&mut self,
cb: ConfigCallback, cb: ConfigCallback,
update_cb: UpdateCallback, update_cb: UpdateCallback,
) -> VeilidAPIResult<()> { ) -> VeilidAPIResult<Self> {
self.update_cb = Some(update_cb); let mut inner = VeilidConfigInner::default();
self.with_mut(|inner| {
// Simple config transformation
macro_rules! get_config {
($key:expr) => {
let keyname = &stringify!($key)[6..];
let v = cb(keyname.to_owned())?;
$key = match v.downcast() {
Ok(v) => *v,
Err(e) => {
apibail_generic!(format!(
"incorrect type for key {}: {:?}",
keyname,
type_name_of_val(&*e)
))
}
};
};
}
get_config!(inner.program_name); // Simple config transformation
get_config!(inner.namespace); macro_rules! get_config {
get_config!(inner.capabilities.disable); ($key:expr) => {
get_config!(inner.table_store.directory); let keyname = &stringify!($key)[6..];
get_config!(inner.table_store.delete); let v = cb(keyname.to_owned())?;
get_config!(inner.block_store.directory); $key = match v.downcast() {
get_config!(inner.block_store.delete); Ok(v) => *v,
get_config!(inner.protected_store.allow_insecure_fallback); Err(e) => {
get_config!(inner.protected_store.always_use_insecure_storage); apibail_generic!(format!(
get_config!(inner.protected_store.directory); "incorrect type for key {}: {:?}",
get_config!(inner.protected_store.delete); keyname,
get_config!(inner.protected_store.device_encryption_key_password); type_name_of_val(&*e)
get_config!(inner.protected_store.new_device_encryption_key_password); ))
get_config!(inner.network.connection_initial_timeout_ms); }
get_config!(inner.network.connection_inactivity_timeout_ms); };
get_config!(inner.network.max_connections_per_ip4); };
get_config!(inner.network.max_connections_per_ip6_prefix); }
get_config!(inner.network.max_connections_per_ip6_prefix_size);
get_config!(inner.network.max_connection_frequency_per_min); get_config!(inner.program_name);
get_config!(inner.network.client_allowlist_timeout_ms); get_config!(inner.namespace);
get_config!(inner.network.reverse_connection_receipt_time_ms); get_config!(inner.capabilities.disable);
get_config!(inner.network.hole_punch_receipt_time_ms); get_config!(inner.table_store.directory);
get_config!(inner.network.network_key_password); get_config!(inner.table_store.delete);
get_config!(inner.network.routing_table.node_id); get_config!(inner.block_store.directory);
get_config!(inner.network.routing_table.node_id_secret); get_config!(inner.block_store.delete);
get_config!(inner.network.routing_table.bootstrap); get_config!(inner.protected_store.allow_insecure_fallback);
get_config!(inner.network.routing_table.limit_over_attached); get_config!(inner.protected_store.always_use_insecure_storage);
get_config!(inner.network.routing_table.limit_fully_attached); get_config!(inner.protected_store.directory);
get_config!(inner.network.routing_table.limit_attached_strong); get_config!(inner.protected_store.delete);
get_config!(inner.network.routing_table.limit_attached_good); get_config!(inner.protected_store.device_encryption_key_password);
get_config!(inner.network.routing_table.limit_attached_weak); get_config!(inner.protected_store.new_device_encryption_key_password);
get_config!(inner.network.dht.max_find_node_count); get_config!(inner.network.connection_initial_timeout_ms);
get_config!(inner.network.dht.resolve_node_timeout_ms); get_config!(inner.network.connection_inactivity_timeout_ms);
get_config!(inner.network.dht.resolve_node_count); get_config!(inner.network.max_connections_per_ip4);
get_config!(inner.network.dht.resolve_node_fanout); get_config!(inner.network.max_connections_per_ip6_prefix);
get_config!(inner.network.dht.get_value_timeout_ms); get_config!(inner.network.max_connections_per_ip6_prefix_size);
get_config!(inner.network.dht.get_value_count); get_config!(inner.network.max_connection_frequency_per_min);
get_config!(inner.network.dht.get_value_fanout); get_config!(inner.network.client_allowlist_timeout_ms);
get_config!(inner.network.dht.set_value_timeout_ms); get_config!(inner.network.reverse_connection_receipt_time_ms);
get_config!(inner.network.dht.set_value_count); get_config!(inner.network.hole_punch_receipt_time_ms);
get_config!(inner.network.dht.set_value_fanout); get_config!(inner.network.network_key_password);
get_config!(inner.network.dht.min_peer_count); get_config!(inner.network.routing_table.node_id);
get_config!(inner.network.dht.min_peer_refresh_time_ms); get_config!(inner.network.routing_table.node_id_secret);
get_config!(inner.network.dht.validate_dial_info_receipt_time_ms); get_config!(inner.network.routing_table.bootstrap);
get_config!(inner.network.dht.local_subkey_cache_size); get_config!(inner.network.routing_table.limit_over_attached);
get_config!(inner.network.dht.local_max_subkey_cache_memory_mb); get_config!(inner.network.routing_table.limit_fully_attached);
get_config!(inner.network.dht.remote_subkey_cache_size); get_config!(inner.network.routing_table.limit_attached_strong);
get_config!(inner.network.dht.remote_max_records); get_config!(inner.network.routing_table.limit_attached_good);
get_config!(inner.network.dht.remote_max_subkey_cache_memory_mb); get_config!(inner.network.routing_table.limit_attached_weak);
get_config!(inner.network.dht.remote_max_storage_space_mb); get_config!(inner.network.dht.max_find_node_count);
get_config!(inner.network.dht.public_watch_limit); get_config!(inner.network.dht.resolve_node_timeout_ms);
get_config!(inner.network.dht.member_watch_limit); get_config!(inner.network.dht.resolve_node_count);
get_config!(inner.network.dht.max_watch_expiration_ms); get_config!(inner.network.dht.resolve_node_fanout);
get_config!(inner.network.rpc.concurrency); get_config!(inner.network.dht.get_value_timeout_ms);
get_config!(inner.network.rpc.queue_size); get_config!(inner.network.dht.get_value_count);
get_config!(inner.network.rpc.max_timestamp_behind_ms); get_config!(inner.network.dht.get_value_fanout);
get_config!(inner.network.rpc.max_timestamp_ahead_ms); get_config!(inner.network.dht.set_value_timeout_ms);
get_config!(inner.network.rpc.timeout_ms); get_config!(inner.network.dht.set_value_count);
get_config!(inner.network.rpc.max_route_hop_count); get_config!(inner.network.dht.set_value_fanout);
get_config!(inner.network.rpc.default_route_hop_count); get_config!(inner.network.dht.min_peer_count);
get_config!(inner.network.upnp); get_config!(inner.network.dht.min_peer_refresh_time_ms);
get_config!(inner.network.detect_address_changes); get_config!(inner.network.dht.validate_dial_info_receipt_time_ms);
get_config!(inner.network.restricted_nat_retries); get_config!(inner.network.dht.local_subkey_cache_size);
get_config!(inner.network.tls.certificate_path); get_config!(inner.network.dht.local_max_subkey_cache_memory_mb);
get_config!(inner.network.tls.private_key_path); get_config!(inner.network.dht.remote_subkey_cache_size);
get_config!(inner.network.tls.connection_initial_timeout_ms); get_config!(inner.network.dht.remote_max_records);
get_config!(inner.network.application.https.enabled); get_config!(inner.network.dht.remote_max_subkey_cache_memory_mb);
get_config!(inner.network.application.https.listen_address); get_config!(inner.network.dht.remote_max_storage_space_mb);
get_config!(inner.network.application.https.path); get_config!(inner.network.dht.public_watch_limit);
get_config!(inner.network.application.https.url); get_config!(inner.network.dht.member_watch_limit);
get_config!(inner.network.application.http.enabled); get_config!(inner.network.dht.max_watch_expiration_ms);
get_config!(inner.network.application.http.listen_address); get_config!(inner.network.rpc.concurrency);
get_config!(inner.network.application.http.path); get_config!(inner.network.rpc.queue_size);
get_config!(inner.network.application.http.url); get_config!(inner.network.rpc.max_timestamp_behind_ms);
get_config!(inner.network.protocol.udp.enabled); get_config!(inner.network.rpc.max_timestamp_ahead_ms);
get_config!(inner.network.protocol.udp.socket_pool_size); get_config!(inner.network.rpc.timeout_ms);
get_config!(inner.network.protocol.udp.listen_address); get_config!(inner.network.rpc.max_route_hop_count);
get_config!(inner.network.protocol.udp.public_address); get_config!(inner.network.rpc.default_route_hop_count);
get_config!(inner.network.protocol.tcp.connect); get_config!(inner.network.upnp);
get_config!(inner.network.protocol.tcp.listen); get_config!(inner.network.detect_address_changes);
get_config!(inner.network.protocol.tcp.max_connections); get_config!(inner.network.restricted_nat_retries);
get_config!(inner.network.protocol.tcp.listen_address); get_config!(inner.network.tls.certificate_path);
get_config!(inner.network.protocol.tcp.public_address); get_config!(inner.network.tls.private_key_path);
get_config!(inner.network.protocol.ws.connect); get_config!(inner.network.tls.connection_initial_timeout_ms);
get_config!(inner.network.protocol.ws.listen); get_config!(inner.network.application.https.enabled);
get_config!(inner.network.protocol.ws.max_connections); get_config!(inner.network.application.https.listen_address);
get_config!(inner.network.protocol.ws.listen_address); get_config!(inner.network.application.https.path);
get_config!(inner.network.protocol.ws.path); get_config!(inner.network.application.https.url);
get_config!(inner.network.protocol.ws.url); get_config!(inner.network.application.http.enabled);
get_config!(inner.network.protocol.wss.connect); get_config!(inner.network.application.http.listen_address);
get_config!(inner.network.protocol.wss.listen); get_config!(inner.network.application.http.path);
get_config!(inner.network.protocol.wss.max_connections); get_config!(inner.network.application.http.url);
get_config!(inner.network.protocol.wss.listen_address); get_config!(inner.network.protocol.udp.enabled);
get_config!(inner.network.protocol.wss.path); get_config!(inner.network.protocol.udp.socket_pool_size);
get_config!(inner.network.protocol.wss.url); get_config!(inner.network.protocol.udp.listen_address);
#[cfg(feature = "geolocation")] get_config!(inner.network.protocol.udp.public_address);
get_config!(inner.network.privacy.country_code_denylist); get_config!(inner.network.protocol.tcp.connect);
#[cfg(feature = "virtual-network")] get_config!(inner.network.protocol.tcp.listen);
{ get_config!(inner.network.protocol.tcp.max_connections);
get_config!(inner.network.virtual_network.enabled); get_config!(inner.network.protocol.tcp.listen_address);
get_config!(inner.network.virtual_network.server_address); get_config!(inner.network.protocol.tcp.public_address);
} get_config!(inner.network.protocol.ws.connect);
Ok(()) get_config!(inner.network.protocol.ws.listen);
get_config!(inner.network.protocol.ws.max_connections);
get_config!(inner.network.protocol.ws.listen_address);
get_config!(inner.network.protocol.ws.path);
get_config!(inner.network.protocol.ws.url);
get_config!(inner.network.protocol.wss.connect);
get_config!(inner.network.protocol.wss.listen);
get_config!(inner.network.protocol.wss.max_connections);
get_config!(inner.network.protocol.wss.listen_address);
get_config!(inner.network.protocol.wss.path);
get_config!(inner.network.protocol.wss.url);
#[cfg(feature = "geolocation")]
get_config!(inner.network.privacy.country_code_denylist);
#[cfg(feature = "virtual-network")]
{
get_config!(inner.network.virtual_network.enabled);
get_config!(inner.network.virtual_network.server_address);
}
Ok(Self {
update_cb,
inner: Arc::new(RwLock::new(inner)),
}) })
} }
@ -1039,6 +1019,10 @@ impl VeilidConfig {
}) })
} }
pub fn update_callback(&self) -> UpdateCallback {
self.update_cb.clone()
}
pub fn get(&self) -> RwLockReadGuard<VeilidConfigInner> { pub fn get(&self) -> RwLockReadGuard<VeilidConfigInner> {
self.inner.read() self.inner.read()
} }
@ -1068,7 +1052,15 @@ impl VeilidConfig {
} }
} }
pub fn with_mut<F, R>(&self, f: F) -> VeilidAPIResult<R> pub fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(&VeilidConfigInner) -> R,
{
let inner = self.inner.read();
f(&*inner)
}
pub fn try_with_mut<F, R>(&self, f: F) -> VeilidAPIResult<R>
where where
F: FnOnce(&mut VeilidConfigInner) -> VeilidAPIResult<R>, F: FnOnce(&mut VeilidConfigInner) -> VeilidAPIResult<R>,
{ {
@ -1091,12 +1083,10 @@ impl VeilidConfig {
}; };
// Send configuration update to clients // Send configuration update to clients
if let Some(update_cb) = &self.update_cb { let safe_cfg = self.safe_config_inner();
let safe_cfg = self.safe_config_inner(); (self.update_cb)(VeilidUpdate::Config(Box::new(VeilidStateConfig {
update_cb(VeilidUpdate::Config(Box::new(VeilidStateConfig { config: safe_cfg,
config: safe_cfg, })));
})));
}
Ok(out) Ok(out)
} }
@ -1133,7 +1123,7 @@ impl VeilidConfig {
} }
} }
pub fn set_key_json(&self, key: &str, value: &str) -> VeilidAPIResult<()> { pub fn set_key_json(&self, key: &str, value: &str) -> VeilidAPIResult<()> {
self.with_mut(|c| { self.try_with_mut(|c| {
// Split key into path parts // Split key into path parts
let keypath: Vec<&str> = key.split('.').collect(); let keypath: Vec<&str> = key.split('.').collect();
@ -1305,124 +1295,6 @@ impl VeilidConfig {
Ok(()) Ok(())
} }
#[cfg(not(test))]
async fn init_node_id(
&self,
vcrypto: CryptoSystemVersion,
table_store: TableStore,
) -> VeilidAPIResult<(TypedKey, TypedSecret)> {
let ck = vcrypto.kind();
let mut node_id = self.inner.read().network.routing_table.node_id.get(ck);
let mut node_id_secret = self
.inner
.read()
.network
.routing_table
.node_id_secret
.get(ck);
// See if node id was previously stored in the table store
let config_table = table_store.open("__veilid_config", 1).await?;
let table_key_node_id = format!("node_id_{}", ck);
let table_key_node_id_secret = format!("node_id_secret_{}", ck);
if node_id.is_none() {
log_tstore!(debug "pulling {} from storage", table_key_node_id);
if let Ok(Some(stored_node_id)) = config_table
.load_json::<TypedKey>(0, table_key_node_id.as_bytes())
.await
{
log_tstore!(debug "{} found in storage", table_key_node_id);
node_id = Some(stored_node_id);
} else {
log_tstore!(debug "{} not found in storage", table_key_node_id);
}
}
// See if node id secret was previously stored in the protected store
if node_id_secret.is_none() {
log_tstore!(debug "pulling {} from storage", table_key_node_id_secret);
if let Ok(Some(stored_node_id_secret)) = config_table
.load_json::<TypedSecret>(0, table_key_node_id_secret.as_bytes())
.await
{
log_tstore!(debug "{} found in storage", table_key_node_id_secret);
node_id_secret = Some(stored_node_id_secret);
} else {
log_tstore!(debug "{} not found in storage", table_key_node_id_secret);
}
}
// If we have a node id from storage, check it
let (node_id, node_id_secret) =
if let (Some(node_id), Some(node_id_secret)) = (node_id, node_id_secret) {
// Validate node id
if !vcrypto.validate_keypair(&node_id.value, &node_id_secret.value) {
apibail_generic!(format!(
"node_id_secret_{} and node_id_key_{} don't match",
ck, ck
));
}
(node_id, node_id_secret)
} else {
// If we still don't have a valid node id, generate one
log_tstore!(debug "generating new node_id_{}", ck);
let kp = vcrypto.generate_keypair();
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};
info!("Node Id: {}", node_id);
// Save the node id / secret in storage
config_table
.store_json(0, table_key_node_id.as_bytes(), &node_id)
.await?;
config_table
.store_json(0, table_key_node_id_secret.as_bytes(), &node_id_secret)
.await?;
Ok((node_id, node_id_secret))
}
/// Get the node id from config if one is specified.
/// Must be done -after- protected store startup.
#[cfg_attr(test, allow(unused_variables))]
pub async fn init_node_ids(
&self,
crypto: Crypto,
table_store: TableStore,
) -> VeilidAPIResult<()> {
let mut out_node_id = TypedKeyGroup::new();
let mut out_node_id_secret = TypedSecretGroup::new();
for ck in VALID_CRYPTO_KINDS {
let vcrypto = crypto
.get(ck)
.expect("Valid crypto kind is not actually valid.");
#[cfg(test)]
let (node_id, node_id_secret) = {
let kp = vcrypto.generate_keypair();
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};
#[cfg(not(test))]
let (node_id, node_id_secret) = self.init_node_id(vcrypto, table_store.clone()).await?;
// Save for config
out_node_id.add(node_id);
out_node_id_secret.add(node_id_secret);
}
// Commit back to config
self.with_mut(|c| {
c.network.routing_table.node_id = out_node_id;
c.network.routing_table.node_id_secret = out_node_id_secret;
Ok(())
})?;
Ok(())
}
} }
/// Return the default veilid config as a json object. /// Return the default veilid config as a json object.

View File

@ -127,7 +127,11 @@ pub use async_lock::RwLock as AsyncRwLock;
#[doc(no_inline)] #[doc(no_inline)]
pub use async_lock::RwLockReadGuard as AsyncRwLockReadGuard; pub use async_lock::RwLockReadGuard as AsyncRwLockReadGuard;
#[doc(no_inline)] #[doc(no_inline)]
pub use async_lock::RwLockReadGuardArc as AsyncRwLockReadGuardArc;
#[doc(no_inline)]
pub use async_lock::RwLockWriteGuard as AsyncRwLockWriteGuard; pub use async_lock::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)]
pub use async_lock::RwLockWriteGuardArc as AsyncRwLockWriteGuardArc;
cfg_if! { cfg_if! {
if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] { if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
@ -151,13 +155,6 @@ cfg_if! {
#[doc(no_inline)] #[doc(no_inline)]
pub use async_std::sync::MutexGuardArc as AsyncMutexGuardArc; pub use async_std::sync::MutexGuardArc as AsyncMutexGuardArc;
// #[doc(no_inline)]
// pub use async_std::sync::RwLock as AsyncRwLock;
// #[doc(no_inline)]
// pub use async_std::sync::RwLockReadGuard as AsyncRwLockReadGuard;
// #[doc(no_inline)]
// pub use async_std::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)] #[doc(no_inline)]
pub use async_std::task::JoinHandle as LowLevelJoinHandle; pub use async_std::task::JoinHandle as LowLevelJoinHandle;
@ -169,14 +166,6 @@ cfg_if! {
#[doc(no_inline)] #[doc(no_inline)]
pub use tokio::sync::OwnedMutexGuard as AsyncMutexGuardArc; pub use tokio::sync::OwnedMutexGuard as AsyncMutexGuardArc;
// #[doc(no_inline)]
// pub use tokio::sync::RwLock as AsyncRwLock;
// #[doc(no_inline)]
// pub use tokio::sync::RwLockReadGuard as AsyncRwLockReadGuard;
// #[doc(no_inline)]
// pub use tokio::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
#[doc(no_inline)] #[doc(no_inline)]
pub use tokio::task::JoinHandle as LowLevelJoinHandle; pub use tokio::task::JoinHandle as LowLevelJoinHandle;
} else { } else {

View File

@ -47,6 +47,23 @@ impl<'a> Drop for StartupLockEnterGuard<'a> {
} }
} }
/// RAII-style lock for entry operations on a started-up region of code.
#[derive(Debug)]
pub struct StartupLockEnterGuardArc {
_guard: AsyncRwLockReadGuardArc<bool>,
#[cfg(feature = "debug-locks")]
id: usize,
#[cfg(feature = "debug-locks")]
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
}
#[cfg(feature = "debug-locks")]
impl Drop for StartupLockEnterGuardArc {
fn drop(&mut self) {
self.active_guards.lock().remove(&self.id);
}
}
#[cfg(feature = "debug-locks")] #[cfg(feature = "debug-locks")]
static GUARD_ID: AtomicUsize = AtomicUsize::new(0); static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
@ -59,7 +76,7 @@ static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
/// asynchronous shutdown to wait for operations to finish before proceeding. /// asynchronous shutdown to wait for operations to finish before proceeding.
#[derive(Debug)] #[derive(Debug)]
pub struct StartupLock { pub struct StartupLock {
startup_state: AsyncRwLock<bool>, startup_state: Arc<AsyncRwLock<bool>>,
stop_source: Mutex<Option<StopSource>>, stop_source: Mutex<Option<StopSource>>,
#[cfg(feature = "debug-locks")] #[cfg(feature = "debug-locks")]
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>, active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
@ -68,7 +85,7 @@ pub struct StartupLock {
impl StartupLock { impl StartupLock {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
startup_state: AsyncRwLock::new(false), startup_state: Arc::new(AsyncRwLock::new(false)),
stop_source: Mutex::new(None), stop_source: Mutex::new(None),
#[cfg(feature = "debug-locks")] #[cfg(feature = "debug-locks")]
active_guards: Arc::new(Mutex::new(HashMap::new())), active_guards: Arc::new(Mutex::new(HashMap::new())),
@ -168,6 +185,31 @@ impl StartupLock {
Ok(out) Ok(out)
} }
/// Enter an operation in a started-up module, using an owned lock.
/// If this module has not yet started up or is in the process of startup or shutdown
/// this will fail.
pub fn enter_arc(&self) -> Result<StartupLockEnterGuardArc, StartupLockNotStartedError> {
let guard =
asyncrwlock_try_read_arc!(self.startup_state).ok_or(StartupLockNotStartedError)?;
if !*guard {
return Err(StartupLockNotStartedError);
}
let out = StartupLockEnterGuardArc {
_guard: guard,
#[cfg(feature = "debug-locks")]
id: GUARD_ID.fetch_add(1, Ordering::AcqRel),
#[cfg(feature = "debug-locks")]
active_guards: self.active_guards.clone(),
};
#[cfg(feature = "debug-locks")]
self.active_guards
.lock()
.insert(out.id, backtrace::Backtrace::new());
Ok(out)
}
} }
impl Default for StartupLock { impl Default for StartupLock {

View File

@ -3,7 +3,7 @@ use super::*;
static STRING_TABLE: std::sync::LazyLock<Mutex<BTreeSet<&'static str>>> = static STRING_TABLE: std::sync::LazyLock<Mutex<BTreeSet<&'static str>>> =
std::sync::LazyLock::new(|| Mutex::new(BTreeSet::new())); std::sync::LazyLock::new(|| Mutex::new(BTreeSet::new()));
static STRING_TRANSFORM_TABLE: std::sync::LazyLock<Mutex<HashMap<usize, &'static str>>> = static STRING_TRANSFORM_TABLE: std::sync::LazyLock<Mutex<HashMap<(usize, usize), &'static str>>> =
std::sync::LazyLock::new(|| Mutex::new(HashMap::new())); std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
pub trait ToStaticStr { pub trait ToStaticStr {
@ -35,7 +35,15 @@ impl StaticStrTransform for &'static str {
self, self,
transform: F, transform: F,
) -> &'static str { ) -> &'static str {
let key = self.as_ptr() as usize; // multiple keys can point to the same data, but it must be bounded due to static lifetime
// a pointer to static memory plus its length must always be the same immutable slice
// this is maybe slightly faster for use in log string transformation where speed is essential at scale
// otherwise we would have used a hash here.
// TODO: if performance does not suffer, consider switching to a hash at a later point, as this could cause
// the STRING_TRANSFORM_TABLE to be bigger than necessary, depending on unknowns in rustc about 'static str deduplication.
let key = (self.as_ptr() as usize, self.len());
let mut transform_table = STRING_TRANSFORM_TABLE.lock(); let mut transform_table = STRING_TRANSFORM_TABLE.lock();
if let Some(v) = transform_table.get(&key) { if let Some(v) = transform_table.get(&key) {
return v; return v;

View File

@ -104,6 +104,19 @@ macro_rules! asyncrwlock_try_write {
}; };
} }
#[macro_export]
macro_rules! asyncrwlock_try_read_arc {
($x:expr) => {
$x.try_read_arc()
};
}
#[macro_export]
macro_rules! asyncrwlock_try_write_arc {
($x:expr) => {
$x.try_write_arc()
};
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn system_boxed<'a, Out>( pub fn system_boxed<'a, Out>(