mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-03-16 02:46:15 -04:00
[skip ci] checkpoint. refactor.
This commit is contained in:
parent
bf948e017f
commit
e57d56b00c
195
veilid-core/src/component.rs
Normal file
195
veilid-core/src/component.rs
Normal file
@ -0,0 +1,195 @@
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use super::*;
|
||||
|
||||
trait AsAnyArcSendSync {
|
||||
fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync>;
|
||||
}
|
||||
|
||||
impl<T: Send + Sync + 'static> AsAnyArcSendSync for T {
|
||||
fn as_any_arc_send_sync(self: Arc<Self>) -> Arc<dyn core::any::Any + Send + Sync> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
pub trait VeilidComponent: AsAnyArcSendSync + core::fmt::Debug {
|
||||
fn registry(&self) -> VeilidComponentRegistry;
|
||||
|
||||
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>;
|
||||
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>;
|
||||
|
||||
// Registry shortcuts
|
||||
fn config(&self) -> VeilidConfig {
|
||||
self.registry().config()
|
||||
}
|
||||
fn update_callback(&self) -> UpdateCallback {
|
||||
self.registry().update_callback()
|
||||
}
|
||||
fn event_bus(&self) -> EventBus {
|
||||
self.registry().event_bus()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct VeilidComponentGuard<'a, T: VeilidComponent + Send + Sync + 'static> {
|
||||
component: Arc<T>,
|
||||
_phantom: core::marker::PhantomData<&'a T>,
|
||||
}
|
||||
|
||||
impl<'a, T> core::ops::Deref for VeilidComponentGuard<'a, T>
|
||||
where
|
||||
T: VeilidComponent + Send + Sync + 'static,
|
||||
{
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.component
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct VeilidComponentRegistryInner {
|
||||
type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
|
||||
init_order: Vec<core::any::TypeId>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct VeilidComponentRegistry {
|
||||
inner: Arc<Mutex<VeilidComponentRegistryInner>>,
|
||||
config: VeilidConfig,
|
||||
event_bus: EventBus,
|
||||
init_lock: Arc<AsyncMutex<bool>>,
|
||||
}
|
||||
|
||||
impl VeilidComponentRegistry {
|
||||
pub fn new(config: VeilidConfig) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(VeilidComponentRegistryInner {
|
||||
type_map: HashMap::new(),
|
||||
init_order: Vec::new(),
|
||||
})),
|
||||
config,
|
||||
event_bus: EventBus::new(),
|
||||
init_lock: Arc::new(AsyncMutex::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn register<
|
||||
T: VeilidComponent + Send + Sync + 'static,
|
||||
F: FnOnce(VeilidComponentRegistry) -> T,
|
||||
>(
|
||||
&self,
|
||||
component_constructor: F,
|
||||
) {
|
||||
let component = Arc::new(component_constructor(self.clone()));
|
||||
let component_type_id = core::any::TypeId::of::<T>();
|
||||
|
||||
let mut inner = self.inner.lock();
|
||||
assert!(
|
||||
inner
|
||||
.type_map
|
||||
.insert(component_type_id, component)
|
||||
.is_none(),
|
||||
"should not register same component twice"
|
||||
);
|
||||
inner.init_order.push(component_type_id);
|
||||
}
|
||||
|
||||
pub async fn init(&self) -> EyreResult<()> {
|
||||
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
|
||||
bail!("init should only happen one at a time");
|
||||
};
|
||||
if *_init_guard {
|
||||
bail!("already initialized");
|
||||
}
|
||||
|
||||
let init_order = self.get_init_order();
|
||||
let mut initialized = vec![];
|
||||
for component in init_order {
|
||||
if let Err(e) = component.init().await {
|
||||
self.terminate_inner(initialized).await;
|
||||
return Err(e);
|
||||
}
|
||||
initialized.push(component);
|
||||
}
|
||||
|
||||
*_init_guard = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn terminate(&self) {
|
||||
let Some(mut _init_guard) = asyncmutex_try_lock!(self.init_lock) else {
|
||||
panic!("terminate should only happen one at a time");
|
||||
};
|
||||
if !*_init_guard {
|
||||
panic!("not initialized");
|
||||
}
|
||||
|
||||
let init_order = self.get_init_order();
|
||||
self.terminate_inner(init_order).await;
|
||||
|
||||
*_init_guard = false;
|
||||
}
|
||||
|
||||
async fn terminate_inner(&self, initialized: Vec<Arc<dyn VeilidComponent + Send + Sync>>) {
|
||||
for component in initialized.iter().rev() {
|
||||
component.terminate().await;
|
||||
}
|
||||
}
|
||||
|
||||
fn get_init_order(&self) -> Vec<Arc<dyn VeilidComponent + Send + Sync>> {
|
||||
let inner = self.inner.lock();
|
||||
inner
|
||||
.init_order
|
||||
.iter()
|
||||
.map(|id| inner.type_map.get(id).unwrap().clone())
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////
|
||||
|
||||
pub fn config(&self) -> VeilidConfig {
|
||||
self.config.clone()
|
||||
}
|
||||
pub fn update_callback(&self) -> UpdateCallback {
|
||||
self.config.update_callback()
|
||||
}
|
||||
pub fn event_bus(&self) -> EventBus {
|
||||
self.event_bus.clone()
|
||||
}
|
||||
|
||||
pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
|
||||
&self,
|
||||
) -> Option<VeilidComponentGuard<'a, T>> {
|
||||
let inner = self.inner.lock();
|
||||
let component_type_id = core::any::TypeId::of::<T>();
|
||||
let component_dyn = inner.type_map.get(&component_type_id)?.clone();
|
||||
let component = component_dyn
|
||||
.as_any_arc_send_sync()
|
||||
.downcast::<T>()
|
||||
.unwrap();
|
||||
Some(VeilidComponentGuard {
|
||||
component,
|
||||
_phantom: PhantomData {},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
macro_rules! impl_veilid_component {
|
||||
($component_name:ident) => {
|
||||
impl VeilidComponent for $component_name {
|
||||
fn registry(&self) -> VeilidComponentRegistry {
|
||||
self.registry.clone()
|
||||
}
|
||||
|
||||
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>> {
|
||||
Box::pin(async { self.init_async().await })
|
||||
}
|
||||
|
||||
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()> {
|
||||
Box::pin(async { self.terminate_async().await })
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub(crate) use impl_veilid_component;
|
@ -8,234 +8,11 @@ use crate::*;
|
||||
|
||||
pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
|
||||
|
||||
/// Internal services startup mechanism.
|
||||
/// Ensures that everything is started up, and shut down in the right order
|
||||
/// and provides an atomic state for if the system is properly operational.
|
||||
struct StartupShutdownContext {
|
||||
pub config: VeilidConfig,
|
||||
pub update_callback: UpdateCallback,
|
||||
|
||||
pub event_bus: Option<EventBus>,
|
||||
pub protected_store: Option<ProtectedStore>,
|
||||
pub table_store: Option<TableStore>,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
pub block_store: Option<BlockStore>,
|
||||
pub crypto: Option<Crypto>,
|
||||
pub attachment_manager: Option<AttachmentManager>,
|
||||
pub storage_manager: Option<StorageManager>,
|
||||
}
|
||||
|
||||
impl StartupShutdownContext {
|
||||
pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self {
|
||||
Self {
|
||||
config,
|
||||
update_callback,
|
||||
event_bus: None,
|
||||
protected_store: None,
|
||||
table_store: None,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store: None,
|
||||
crypto: None,
|
||||
attachment_manager: None,
|
||||
storage_manager: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new_full(context: VeilidCoreContext) -> Self {
|
||||
Self {
|
||||
config: context.config,
|
||||
update_callback: context.update_callback,
|
||||
event_bus: Some(context.event_bus),
|
||||
protected_store: Some(context.protected_store),
|
||||
table_store: Some(context.table_store),
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store: Some(context.block_store),
|
||||
crypto: Some(context.crypto),
|
||||
attachment_manager: Some(context.attachment_manager),
|
||||
storage_manager: Some(context.storage_manager),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "core_context", err, skip_all)]
|
||||
pub async fn startup(mut self) -> EyreResult<VeilidCoreContext> {
|
||||
info!("Veilid API starting up");
|
||||
|
||||
info!("init api tracing");
|
||||
let (program_name, namespace) = {
|
||||
let config = self.config.get();
|
||||
(config.program_name.clone(), config.namespace.clone())
|
||||
};
|
||||
|
||||
ApiTracingLayer::add_callback(program_name, namespace, self.update_callback.clone())
|
||||
.await?;
|
||||
|
||||
// Add the event bus
|
||||
let event_bus = EventBus::new();
|
||||
if let Err(e) = event_bus.startup().await {
|
||||
error!("failed to start up event bus: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e.into());
|
||||
}
|
||||
self.event_bus = Some(event_bus.clone());
|
||||
|
||||
// Set up protected store
|
||||
let protected_store = ProtectedStore::new(event_bus.clone(), self.config.clone());
|
||||
if let Err(e) = protected_store.init().await {
|
||||
error!("failed to init protected store: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
self.protected_store = Some(protected_store.clone());
|
||||
|
||||
// Set up tablestore and crypto system
|
||||
let table_store = TableStore::new(
|
||||
event_bus.clone(),
|
||||
self.config.clone(),
|
||||
protected_store.clone(),
|
||||
);
|
||||
let crypto = Crypto::new(event_bus.clone(), self.config.clone(), table_store.clone());
|
||||
table_store.set_crypto(crypto.clone());
|
||||
|
||||
// Initialize table store first, so crypto code can load caches
|
||||
// Tablestore can use crypto during init, just not any cached operations or things
|
||||
// that require flushing back to the tablestore
|
||||
if let Err(e) = table_store.init().await {
|
||||
error!("failed to init table store: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
self.table_store = Some(table_store.clone());
|
||||
|
||||
// Set up crypto
|
||||
if let Err(e) = crypto.init().await {
|
||||
error!("failed to init crypto: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
self.crypto = Some(crypto.clone());
|
||||
|
||||
// Set up block store
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
{
|
||||
let block_store = BlockStore::new(event_bus.clone(), self.config.clone());
|
||||
if let Err(e) = block_store.init().await {
|
||||
error!("failed to init block store: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
self.block_store = Some(block_store.clone());
|
||||
}
|
||||
|
||||
// Set up storage manager
|
||||
let update_callback = self.update_callback.clone();
|
||||
|
||||
let storage_manager = StorageManager::new(
|
||||
event_bus.clone(),
|
||||
self.config.clone(),
|
||||
self.crypto.clone().unwrap(),
|
||||
self.table_store.clone().unwrap(),
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
self.block_store.clone().unwrap(),
|
||||
);
|
||||
if let Err(e) = storage_manager.init(update_callback).await {
|
||||
error!("failed to init storage manager: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
self.storage_manager = Some(storage_manager.clone());
|
||||
|
||||
// Set up attachment manager
|
||||
let update_callback = self.update_callback.clone();
|
||||
let attachment_manager = AttachmentManager::new(
|
||||
event_bus.clone(),
|
||||
self.config.clone(),
|
||||
storage_manager,
|
||||
table_store,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store,
|
||||
crypto,
|
||||
);
|
||||
if let Err(e) = attachment_manager.init(update_callback).await {
|
||||
error!("failed to init attachment manager: {}", e);
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
self.attachment_manager = Some(attachment_manager);
|
||||
|
||||
info!("Veilid API startup complete");
|
||||
Ok(VeilidCoreContext {
|
||||
config: self.config,
|
||||
update_callback: self.update_callback,
|
||||
event_bus: self.event_bus.unwrap(),
|
||||
storage_manager: self.storage_manager.unwrap(),
|
||||
protected_store: self.protected_store.unwrap(),
|
||||
table_store: self.table_store.unwrap(),
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store: self.block_store.unwrap(),
|
||||
crypto: self.crypto.unwrap(),
|
||||
attachment_manager: self.attachment_manager.unwrap(),
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "core_context", skip_all)]
|
||||
pub async fn shutdown(mut self) {
|
||||
info!("Veilid API shutting down");
|
||||
|
||||
if let Some(attachment_manager) = self.attachment_manager.take() {
|
||||
attachment_manager.terminate().await;
|
||||
}
|
||||
if let Some(storage_manager) = self.storage_manager.take() {
|
||||
storage_manager.terminate().await;
|
||||
}
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
if let Some(block_store) = self.block_store.take() {
|
||||
block_store.terminate().await;
|
||||
}
|
||||
if let Some(crypto) = self.crypto.take() {
|
||||
crypto.terminate().await;
|
||||
}
|
||||
if let Some(table_store) = self.table_store.take() {
|
||||
table_store.terminate().await;
|
||||
}
|
||||
if let Some(protected_store) = self.protected_store.take() {
|
||||
protected_store.terminate().await;
|
||||
}
|
||||
if let Some(event_bus) = self.event_bus.take() {
|
||||
event_bus.shutdown().await;
|
||||
}
|
||||
|
||||
info!("Veilid API shutdown complete");
|
||||
|
||||
// api logger terminate is idempotent
|
||||
let (program_name, namespace) = {
|
||||
let config = self.config.get();
|
||||
(config.program_name.clone(), config.namespace.clone())
|
||||
};
|
||||
|
||||
if let Err(e) = ApiTracingLayer::remove_callback(program_name, namespace).await {
|
||||
error!("Error removing callback from ApiTracingLayer: {}", e);
|
||||
}
|
||||
|
||||
// send final shutdown update
|
||||
(self.update_callback)(VeilidUpdate::Shutdown);
|
||||
}
|
||||
}
|
||||
type InitKey = (String, String);
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
pub struct VeilidCoreContext {
|
||||
pub config: VeilidConfig,
|
||||
pub update_callback: UpdateCallback,
|
||||
// Event bus
|
||||
pub event_bus: EventBus,
|
||||
// Services
|
||||
pub crypto: Crypto,
|
||||
pub protected_store: ProtectedStore,
|
||||
pub table_store: TableStore,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
pub block_store: BlockStore,
|
||||
pub storage_manager: StorageManager,
|
||||
pub attachment_manager: AttachmentManager,
|
||||
pub(crate) struct VeilidCoreContext {
|
||||
registry: VeilidComponentRegistry,
|
||||
}
|
||||
|
||||
impl VeilidCoreContext {
|
||||
@ -245,10 +22,9 @@ impl VeilidCoreContext {
|
||||
config_callback: ConfigCallback,
|
||||
) -> VeilidAPIResult<VeilidCoreContext> {
|
||||
// Set up config from callback
|
||||
let mut config = VeilidConfig::new();
|
||||
config.setup(config_callback, update_callback.clone())?;
|
||||
let config = VeilidConfig::new_from_callback(config_callback, update_callback)?;
|
||||
|
||||
Self::new_common(update_callback, config).await
|
||||
Self::new_common(config).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "core_context", err, skip_all)]
|
||||
@ -257,16 +33,12 @@ impl VeilidCoreContext {
|
||||
config_inner: VeilidConfigInner,
|
||||
) -> VeilidAPIResult<VeilidCoreContext> {
|
||||
// Set up config from json
|
||||
let mut config = VeilidConfig::new();
|
||||
config.setup_from_config(config_inner, update_callback.clone())?;
|
||||
Self::new_common(update_callback, config).await
|
||||
let config = VeilidConfig::new_from_config(config_inner, update_callback);
|
||||
Self::new_common(config).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "core_context", err, skip_all)]
|
||||
async fn new_common(
|
||||
update_callback: UpdateCallback,
|
||||
config: VeilidConfig,
|
||||
) -> VeilidAPIResult<VeilidCoreContext> {
|
||||
async fn new_common(config: VeilidConfig) -> VeilidAPIResult<VeilidCoreContext> {
|
||||
cfg_if! {
|
||||
if #[cfg(target_os = "android")] {
|
||||
if !crate::intf::android::is_android_ready() {
|
||||
@ -275,19 +47,88 @@ impl VeilidCoreContext {
|
||||
}
|
||||
}
|
||||
|
||||
let sc = StartupShutdownContext::new_empty(config.clone(), update_callback);
|
||||
sc.startup().await.map_err(VeilidAPIError::generic)
|
||||
info!("Veilid API starting up");
|
||||
|
||||
let (program_name, namespace, update_callback) = {
|
||||
let cfginner = config.get();
|
||||
(
|
||||
cfginner.program_name.clone(),
|
||||
cfginner.namespace.clone(),
|
||||
config.update_callback(),
|
||||
)
|
||||
};
|
||||
|
||||
ApiTracingLayer::add_callback(program_name, namespace, update_callback.clone()).await?;
|
||||
|
||||
// Create component registry
|
||||
let registry = VeilidComponentRegistry::new(config);
|
||||
|
||||
// Register all components
|
||||
registry.register(ProtectedStore::new);
|
||||
// Initialize table store first, so crypto code can load caches
|
||||
// Tablestore can use crypto during init, just not any cached operations or things
|
||||
// that require flushing back to the tablestore
|
||||
registry.register(TableStore::new);
|
||||
registry.register(Crypto::new);
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
registry.register(BlockStore::new);
|
||||
registry.register(StorageManager::new);
|
||||
registry.register(AttachmentManager::new);
|
||||
|
||||
// Run initialization
|
||||
registry.init().await.map_err(VeilidAPIError::internal)?;
|
||||
|
||||
info!("Veilid API startup complete");
|
||||
|
||||
Ok(Self { registry })
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "core_context", skip_all)]
|
||||
async fn shutdown(self) {
|
||||
let sc = StartupShutdownContext::new_full(self);
|
||||
sc.shutdown().await;
|
||||
info!("Veilid API shutdown complete");
|
||||
|
||||
let (program_name, namespace, update_callback) = {
|
||||
let config = self.registry.config();
|
||||
let cfginner = config.get();
|
||||
(
|
||||
cfginner.program_name.clone(),
|
||||
cfginner.namespace.clone(),
|
||||
config.update_callback(),
|
||||
)
|
||||
};
|
||||
|
||||
self.registry.terminate().await;
|
||||
|
||||
if let Err(e) = ApiTracingLayer::remove_callback(program_name, namespace).await {
|
||||
error!("Error removing callback from ApiTracingLayer: {}", e);
|
||||
}
|
||||
|
||||
// send final shutdown update
|
||||
update_callback(VeilidUpdate::Shutdown);
|
||||
}
|
||||
|
||||
pub fn registry(&self) -> VeilidComponentRegistry {
|
||||
self.registry.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub trait RegisteredComponents: VeilidComponent {
|
||||
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> {
|
||||
self.registry().lookup::<ProtectedStore>().unwrap()
|
||||
}
|
||||
fn table_store(&self) -> VeilidComponentGuard<'_, TableStore> {
|
||||
self.registry().lookup::<TableStore>().unwrap()
|
||||
}
|
||||
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
|
||||
self.registry().lookup::<Crypto>().unwrap()
|
||||
}
|
||||
}
|
||||
impl<T: VeilidComponent> RegisteredComponents for T {}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref INITIALIZED: Mutex<HashSet<(String,String)>> = Mutex::new(HashSet::new());
|
||||
static ref STARTUP_TABLE: AsyncTagLockTable<(String, String)> = AsyncTagLockTable::new();
|
||||
@ -384,7 +225,6 @@ pub async fn api_startup_config(
|
||||
// Get the program_name and namespace we're starting up in
|
||||
let program_name = config.program_name.clone();
|
||||
let namespace = config.namespace.clone();
|
||||
|
||||
let init_key = (program_name, namespace);
|
||||
|
||||
// Only allow one startup/shutdown per program_name+namespace combination simultaneously
|
||||
@ -410,8 +250,10 @@ pub async fn api_startup_config(
|
||||
#[instrument(level = "trace", target = "core_context", skip_all)]
|
||||
pub(crate) async fn api_shutdown(context: VeilidCoreContext) {
|
||||
let init_key = {
|
||||
let config = context.config.get();
|
||||
(config.program_name.clone(), config.namespace.clone())
|
||||
let registry = context.registry();
|
||||
let config = registry.config();
|
||||
let cfginner = config.get();
|
||||
(cfginner.program_name.clone(), cfginner.namespace.clone())
|
||||
};
|
||||
|
||||
// Only allow one startup/shutdown per program_name+namespace combination simultaneously
|
||||
|
@ -5,7 +5,7 @@ const VEILID_DOMAIN_API: &[u8] = b"VEILID_API";
|
||||
pub trait CryptoSystem {
|
||||
// Accessors
|
||||
fn kind(&self) -> CryptoKind;
|
||||
fn crypto(&self) -> Crypto;
|
||||
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto>;
|
||||
|
||||
// Cached Operations
|
||||
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret>;
|
||||
|
@ -67,7 +67,7 @@ impl Envelope {
|
||||
|
||||
#[instrument(level = "trace", target = "envelope", skip_all)]
|
||||
pub fn from_signed_data(
|
||||
crypto: Crypto,
|
||||
crypto: &Crypto,
|
||||
data: &[u8],
|
||||
network_key: &Option<SharedSecret>,
|
||||
) -> VeilidAPIResult<Envelope> {
|
||||
@ -193,7 +193,7 @@ impl Envelope {
|
||||
#[instrument(level = "trace", target = "envelope", skip_all)]
|
||||
pub fn decrypt_body(
|
||||
&self,
|
||||
crypto: Crypto,
|
||||
crypto: &Crypto,
|
||||
data: &[u8],
|
||||
node_id_secret: &SecretKey,
|
||||
network_key: &Option<SharedSecret>,
|
||||
@ -226,7 +226,7 @@ impl Envelope {
|
||||
#[instrument(level = "trace", target = "envelope", skip_all, err)]
|
||||
pub fn to_encrypted_data(
|
||||
&self,
|
||||
crypto: Crypto,
|
||||
crypto: &Crypto,
|
||||
body: &[u8],
|
||||
node_id_secret: &SecretKey,
|
||||
network_key: &Option<SharedSecret>,
|
||||
|
@ -29,9 +29,21 @@ use core::convert::TryInto;
|
||||
use dh_cache::*;
|
||||
use hashlink::linked_hash_map::Entry;
|
||||
use hashlink::LruCache;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
/// Handle to a particular cryptosystem
|
||||
pub type CryptoSystemVersion = Arc<dyn CryptoSystem + Send + Sync>;
|
||||
/// Guard to access a particular cryptosystem
|
||||
pub struct CryptoSystemGuard<'a> {
|
||||
crypto_system: Arc<dyn CryptoSystem + Send + Sync>,
|
||||
_phantom: core::marker::PhantomData<&'a (dyn CryptoSystem + Send + Sync)>,
|
||||
}
|
||||
|
||||
impl<'a> core::ops::Deref for CryptoSystemGuard<'a> {
|
||||
type Target = dyn CryptoSystem + Send + Sync;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.crypto_system.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(all(feature = "enable-crypto-none", feature = "enable-crypto-vld0"))] {
|
||||
@ -72,89 +84,71 @@ pub fn best_envelope_version() -> EnvelopeVersion {
|
||||
struct CryptoInner {
|
||||
dh_cache: DHCache,
|
||||
flush_future: Option<SendPinBoxFuture<()>>,
|
||||
#[cfg(feature = "enable-crypto-vld0")]
|
||||
crypto_vld0: Option<Arc<dyn CryptoSystem + Send + Sync>>,
|
||||
#[cfg(feature = "enable-crypto-none")]
|
||||
crypto_none: Option<Arc<dyn CryptoSystem + Send + Sync>>,
|
||||
}
|
||||
|
||||
struct CryptoUnlockedInner {
|
||||
_event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
table_store: TableStore,
|
||||
impl fmt::Debug for CryptoInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("CryptoInner")
|
||||
//.field("dh_cache", &self.dh_cache)
|
||||
// .field("flush_future", &self.flush_future)
|
||||
// .field("crypto_vld0", &self.crypto_vld0)
|
||||
// .field("crypto_none", &self.crypto_none)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Crypto factory implementation
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct Crypto {
|
||||
unlocked_inner: Arc<CryptoUnlockedInner>,
|
||||
registry: VeilidComponentRegistry,
|
||||
inner: Arc<Mutex<CryptoInner>>,
|
||||
#[cfg(feature = "enable-crypto-vld0")]
|
||||
crypto_vld0: Arc<dyn CryptoSystem + Send + Sync>,
|
||||
#[cfg(feature = "enable-crypto-none")]
|
||||
crypto_none: Arc<dyn CryptoSystem + Send + Sync>,
|
||||
}
|
||||
|
||||
impl_veilid_component!(Crypto);
|
||||
|
||||
impl Crypto {
|
||||
fn new_inner() -> CryptoInner {
|
||||
CryptoInner {
|
||||
dh_cache: DHCache::new(DH_CACHE_SIZE),
|
||||
flush_future: None,
|
||||
#[cfg(feature = "enable-crypto-vld0")]
|
||||
crypto_vld0: None,
|
||||
#[cfg(feature = "enable-crypto-none")]
|
||||
crypto_none: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(event_bus: EventBus, config: VeilidConfig, table_store: TableStore) -> Self {
|
||||
let out = Self {
|
||||
unlocked_inner: Arc::new(CryptoUnlockedInner {
|
||||
_event_bus: event_bus,
|
||||
config,
|
||||
table_store,
|
||||
}),
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self {
|
||||
registry: registry.clone(),
|
||||
inner: Arc::new(Mutex::new(Self::new_inner())),
|
||||
};
|
||||
|
||||
#[cfg(feature = "enable-crypto-vld0")]
|
||||
{
|
||||
out.inner.lock().crypto_vld0 = Some(Arc::new(vld0::CryptoSystemVLD0::new(out.clone())));
|
||||
#[cfg(feature = "enable-crypto-vld0")]
|
||||
crypto_vld0: Arc::new(vld0::CryptoSystemVLD0::new(registry.clone())),
|
||||
#[cfg(feature = "enable-crypto-none")]
|
||||
crypto_none: Arc::new(none::CryptoSystemNONE::new(registry.clone())),
|
||||
}
|
||||
|
||||
#[cfg(feature = "enable-crypto-none")]
|
||||
{
|
||||
out.inner.lock().crypto_none = Some(Arc::new(none::CryptoSystemNONE::new(out.clone())));
|
||||
}
|
||||
|
||||
out
|
||||
}
|
||||
|
||||
pub fn config(&self) -> VeilidConfig {
|
||||
self.unlocked_inner.config.clone()
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "crypto", skip_all, err)]
|
||||
pub async fn init(&self) -> EyreResult<()> {
|
||||
let table_store = self.unlocked_inner.table_store.clone();
|
||||
async fn init_async(&self) -> EyreResult<()> {
|
||||
// Init node id from config
|
||||
if let Err(e) = self
|
||||
.unlocked_inner
|
||||
.config
|
||||
.init_node_ids(self.clone(), table_store.clone())
|
||||
.await
|
||||
{
|
||||
if let Err(e) = self.init_node_ids().await {
|
||||
return Err(e).wrap_err("init node id failed");
|
||||
}
|
||||
|
||||
// make local copy of node id for easy access
|
||||
let mut cache_validity_key: Vec<u8> = Vec::new();
|
||||
{
|
||||
let c = self.unlocked_inner.config.get();
|
||||
self.config().with(|c| {
|
||||
for ck in VALID_CRYPTO_KINDS {
|
||||
if let Some(nid) = c.network.routing_table.node_id.get(ck) {
|
||||
cache_validity_key.append(&mut nid.value.bytes.to_vec());
|
||||
}
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
// load caches if they are valid for this node id
|
||||
let table_store = self.table_store();
|
||||
|
||||
let mut db = table_store
|
||||
.open("crypto_caches", 1)
|
||||
.await
|
||||
@ -177,11 +171,11 @@ impl Crypto {
|
||||
}
|
||||
|
||||
// Schedule flushing
|
||||
let this = self.clone();
|
||||
let registry = self.registry();
|
||||
let flush_future = interval("crypto flush", 60000, move || {
|
||||
let this = this.clone();
|
||||
let crypto = registry.lookup::<Crypto>().unwrap();
|
||||
async move {
|
||||
if let Err(e) = this.flush().await {
|
||||
if let Err(e) = crypto.flush().await {
|
||||
warn!("flush failed: {}", e);
|
||||
}
|
||||
}
|
||||
@ -197,16 +191,12 @@ impl Crypto {
|
||||
cache_to_bytes(&inner.dh_cache)
|
||||
};
|
||||
|
||||
let db = self
|
||||
.unlocked_inner
|
||||
.table_store
|
||||
.open("crypto_caches", 1)
|
||||
.await?;
|
||||
let db = self.table_store().open("crypto_caches", 1).await?;
|
||||
db.store(0, b"dh_cache", &cache_bytes).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn terminate(&self) {
|
||||
async fn terminate_async(&self) {
|
||||
let flush_future = self.inner.lock().flush_future.take();
|
||||
if let Some(f) = flush_future {
|
||||
f.await;
|
||||
@ -223,19 +213,25 @@ impl Crypto {
|
||||
}
|
||||
|
||||
/// Factory method to get a specific crypto version
|
||||
pub fn get(&self, kind: CryptoKind) -> Option<CryptoSystemVersion> {
|
||||
pub fn get(&self, kind: CryptoKind) -> Option<CryptoSystemGuard<'_>> {
|
||||
let inner = self.inner.lock();
|
||||
match kind {
|
||||
#[cfg(feature = "enable-crypto-vld0")]
|
||||
CRYPTO_KIND_VLD0 => Some(inner.crypto_vld0.clone().unwrap()),
|
||||
CRYPTO_KIND_VLD0 => Some(CryptoSystemGuard {
|
||||
crypto_system: inner.crypto_vld0.clone().unwrap(),
|
||||
_phantom: PhantomData {},
|
||||
}),
|
||||
#[cfg(feature = "enable-crypto-none")]
|
||||
CRYPTO_KIND_NONE => Some(inner.crypto_none.clone().unwrap()),
|
||||
CRYPTO_KIND_NONE => Some(CryptoSystemGuard {
|
||||
crypto_system: inner.crypto_none.clone().unwrap(),
|
||||
_phantom: PhantomData {},
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
// Factory method to get the best crypto version
|
||||
pub fn best(&self) -> CryptoSystemVersion {
|
||||
pub fn best(&self) -> CryptoSystemGuard<'_> {
|
||||
self.get(best_crypto_kind()).unwrap()
|
||||
}
|
||||
|
||||
@ -331,4 +327,118 @@ impl Crypto {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
async fn init_node_id(
|
||||
&self,
|
||||
vcrypto: CryptoSystemGuard<'_>,
|
||||
) -> VeilidAPIResult<(TypedKey, TypedSecret)> {
|
||||
let config = self.config();
|
||||
let ck = vcrypto.kind();
|
||||
let (mut node_id, mut node_id_secret) = config.with(|c| {
|
||||
(
|
||||
c.network.routing_table.node_id.get(ck),
|
||||
c.network.routing_table.node_id_secret.get(ck),
|
||||
)
|
||||
});
|
||||
|
||||
// See if node id was previously stored in the table store
|
||||
let table_store = self.table_store();
|
||||
|
||||
let config_table = table_store.open("__veilid_config", 1).await?;
|
||||
|
||||
let table_key_node_id = format!("node_id_{}", ck);
|
||||
let table_key_node_id_secret = format!("node_id_secret_{}", ck);
|
||||
|
||||
if node_id.is_none() {
|
||||
log_tstore!(debug "pulling {} from storage", table_key_node_id);
|
||||
if let Ok(Some(stored_node_id)) = config_table
|
||||
.load_json::<TypedKey>(0, table_key_node_id.as_bytes())
|
||||
.await
|
||||
{
|
||||
log_tstore!(debug "{} found in storage", table_key_node_id);
|
||||
node_id = Some(stored_node_id);
|
||||
} else {
|
||||
log_tstore!(debug "{} not found in storage", table_key_node_id);
|
||||
}
|
||||
}
|
||||
|
||||
// See if node id secret was previously stored in the protected store
|
||||
if node_id_secret.is_none() {
|
||||
log_tstore!(debug "pulling {} from storage", table_key_node_id_secret);
|
||||
if let Ok(Some(stored_node_id_secret)) = config_table
|
||||
.load_json::<TypedSecret>(0, table_key_node_id_secret.as_bytes())
|
||||
.await
|
||||
{
|
||||
log_tstore!(debug "{} found in storage", table_key_node_id_secret);
|
||||
node_id_secret = Some(stored_node_id_secret);
|
||||
} else {
|
||||
log_tstore!(debug "{} not found in storage", table_key_node_id_secret);
|
||||
}
|
||||
}
|
||||
|
||||
// If we have a node id from storage, check it
|
||||
let (node_id, node_id_secret) =
|
||||
if let (Some(node_id), Some(node_id_secret)) = (node_id, node_id_secret) {
|
||||
// Validate node id
|
||||
if !vcrypto.validate_keypair(&node_id.value, &node_id_secret.value) {
|
||||
apibail_generic!(format!(
|
||||
"node_id_secret_{} and node_id_key_{} don't match",
|
||||
ck, ck
|
||||
));
|
||||
}
|
||||
(node_id, node_id_secret)
|
||||
} else {
|
||||
// If we still don't have a valid node id, generate one
|
||||
log_tstore!(debug "generating new node_id_{}", ck);
|
||||
let kp = vcrypto.generate_keypair();
|
||||
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
|
||||
};
|
||||
info!("Node Id: {}", node_id);
|
||||
|
||||
// Save the node id / secret in storage
|
||||
config_table
|
||||
.store_json(0, table_key_node_id.as_bytes(), &node_id)
|
||||
.await?;
|
||||
config_table
|
||||
.store_json(0, table_key_node_id_secret.as_bytes(), &node_id_secret)
|
||||
.await?;
|
||||
|
||||
Ok((node_id, node_id_secret))
|
||||
}
|
||||
|
||||
/// Get the node id from config if one is specified.
|
||||
/// Must be done -after- protected store startup.
|
||||
#[cfg_attr(test, allow(unused_variables))]
|
||||
pub(crate) async fn init_node_ids(&self) -> VeilidAPIResult<()> {
|
||||
let mut out_node_id = TypedKeyGroup::new();
|
||||
let mut out_node_id_secret = TypedSecretGroup::new();
|
||||
|
||||
for ck in VALID_CRYPTO_KINDS {
|
||||
let vcrypto = self
|
||||
.get(ck)
|
||||
.expect("Valid crypto kind is not actually valid.");
|
||||
|
||||
#[cfg(test)]
|
||||
let (node_id, node_id_secret) = {
|
||||
let kp = vcrypto.generate_keypair();
|
||||
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
|
||||
};
|
||||
#[cfg(not(test))]
|
||||
let (node_id, node_id_secret) = self.init_node_id(vcrypto).await?;
|
||||
|
||||
// Save for config
|
||||
out_node_id.add(node_id);
|
||||
out_node_id_secret.add(node_id_secret);
|
||||
}
|
||||
|
||||
// Commit back to config
|
||||
self.config().try_with_mut(|c| {
|
||||
c.network.routing_table.node_id = out_node_id;
|
||||
c.network.routing_table.node_id_secret = out_node_id_secret;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -49,14 +49,13 @@ fn is_bytes_eq_32(a: &[u8], v: u8) -> bool {
|
||||
}
|
||||
|
||||
/// None CryptoSystem
|
||||
#[derive(Clone)]
|
||||
pub struct CryptoSystemNONE {
|
||||
crypto: Crypto,
|
||||
registry: VeilidComponentRegistry,
|
||||
}
|
||||
|
||||
impl CryptoSystemNONE {
|
||||
pub fn new(crypto: Crypto) -> Self {
|
||||
Self { crypto }
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self { registry }
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,13 +65,13 @@ impl CryptoSystem for CryptoSystemNONE {
|
||||
CRYPTO_KIND_NONE
|
||||
}
|
||||
|
||||
fn crypto(&self) -> Crypto {
|
||||
self.crypto.clone()
|
||||
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
|
||||
self.registry().lookup::<Crypto>().unwrap()
|
||||
}
|
||||
|
||||
// Cached Operations
|
||||
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> {
|
||||
self.crypto
|
||||
self.crypto()
|
||||
.cached_dh_internal::<CryptoSystemNONE>(self, key, secret)
|
||||
}
|
||||
|
||||
|
@ -68,7 +68,7 @@ impl Receipt {
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "receipt", skip_all, err)]
|
||||
pub fn from_signed_data(crypto: Crypto, data: &[u8]) -> VeilidAPIResult<Receipt> {
|
||||
pub fn from_signed_data(crypto: &Crypto, data: &[u8]) -> VeilidAPIResult<Receipt> {
|
||||
// Ensure we are at least the length of the envelope
|
||||
if data.len() < MIN_RECEIPT_SIZE {
|
||||
apibail_parse_error!("receipt too small", data.len());
|
||||
@ -157,7 +157,7 @@ impl Receipt {
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "receipt", skip_all, err)]
|
||||
pub fn to_signed_data(&self, crypto: Crypto, secret: &SecretKey) -> VeilidAPIResult<Vec<u8>> {
|
||||
pub fn to_signed_data(&self, crypto: &Crypto, secret: &SecretKey) -> VeilidAPIResult<Vec<u8>> {
|
||||
// Ensure extra data isn't too long
|
||||
let receipt_size: usize = self.extra_data.len() + MIN_RECEIPT_SIZE;
|
||||
if receipt_size > MAX_RECEIPT_SIZE {
|
||||
|
@ -2,7 +2,7 @@ use super::*;
|
||||
|
||||
static LOREM_IPSUM:&[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. ";
|
||||
|
||||
pub async fn test_aead(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_aead(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
trace!("test_aead");
|
||||
|
||||
let n1 = vcrypto.random_nonce();
|
||||
@ -82,7 +82,7 @@ pub async fn test_aead(vcrypto: CryptoSystemVersion) {
|
||||
assert_eq!(body5, body7);
|
||||
}
|
||||
|
||||
pub async fn test_no_auth(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_no_auth(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
trace!("test_no_auth");
|
||||
|
||||
let n1 = vcrypto.random_nonce();
|
||||
@ -136,7 +136,7 @@ pub async fn test_no_auth(vcrypto: CryptoSystemVersion) {
|
||||
assert_eq!(body5, body7);
|
||||
}
|
||||
|
||||
pub async fn test_dh(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_dh(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
trace!("test_dh");
|
||||
let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split();
|
||||
assert!(vcrypto.validate_keypair(&dht_key, &dht_key_secret));
|
||||
@ -164,7 +164,7 @@ pub async fn test_dh(vcrypto: CryptoSystemVersion) {
|
||||
trace!("cached_dh: {:?}", r5);
|
||||
}
|
||||
|
||||
pub async fn test_generation(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_generation(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
let b1 = vcrypto.random_bytes(32);
|
||||
let b2 = vcrypto.random_bytes(32);
|
||||
assert_ne!(b1, b2);
|
||||
@ -231,10 +231,10 @@ pub async fn test_all() {
|
||||
// Test versions
|
||||
for v in VALID_CRYPTO_KINDS {
|
||||
let vcrypto = crypto.get(v).unwrap();
|
||||
test_aead(vcrypto.clone()).await;
|
||||
test_no_auth(vcrypto.clone()).await;
|
||||
test_dh(vcrypto.clone()).await;
|
||||
test_generation(vcrypto).await;
|
||||
test_aead(&vcrypto).await;
|
||||
test_no_auth(&vcrypto).await;
|
||||
test_dh(&vcrypto).await;
|
||||
test_generation(&vcrypto).await;
|
||||
}
|
||||
|
||||
crypto_tests_shutdown(api.clone()).await;
|
||||
|
@ -2,9 +2,10 @@ use super::*;
|
||||
|
||||
pub async fn test_envelope_round_trip(
|
||||
envelope_version: EnvelopeVersion,
|
||||
vcrypto: CryptoSystemVersion,
|
||||
vcrypto: &CryptoSystemGuard<'_>,
|
||||
network_key: Option<SharedSecret>,
|
||||
) {
|
||||
let crypto = vcrypto.crypto();
|
||||
if network_key.is_some() {
|
||||
info!(
|
||||
"--- test envelope round trip {} w/network key ---",
|
||||
@ -33,15 +34,15 @@ pub async fn test_envelope_round_trip(
|
||||
|
||||
// Serialize to bytes
|
||||
let enc_data = envelope
|
||||
.to_encrypted_data(vcrypto.crypto(), body, &sender_secret, &network_key)
|
||||
.to_encrypted_data(&crypto, body, &sender_secret, &network_key)
|
||||
.expect("failed to encrypt data");
|
||||
|
||||
// Deserialize from bytes
|
||||
let envelope2 = Envelope::from_signed_data(vcrypto.crypto(), &enc_data, &network_key)
|
||||
let envelope2 = Envelope::from_signed_data(&crypto, &enc_data, &network_key)
|
||||
.expect("failed to deserialize envelope from data");
|
||||
|
||||
let body2 = envelope2
|
||||
.decrypt_body(vcrypto.crypto(), &enc_data, &recipient_secret, &network_key)
|
||||
.decrypt_body(&crypto, &enc_data, &recipient_secret, &network_key)
|
||||
.expect("failed to decrypt envelope body");
|
||||
|
||||
// Compare envelope and body
|
||||
@ -53,21 +54,22 @@ pub async fn test_envelope_round_trip(
|
||||
let mut mod_enc_data = enc_data.clone();
|
||||
mod_enc_data[enc_data_len - 1] ^= 0x80u8;
|
||||
assert!(
|
||||
Envelope::from_signed_data(vcrypto.crypto(), &mod_enc_data, &network_key).is_err(),
|
||||
Envelope::from_signed_data(&crypto, &mod_enc_data, &network_key).is_err(),
|
||||
"should have failed to decode envelope with modified signature"
|
||||
);
|
||||
let mut mod_enc_data2 = enc_data.clone();
|
||||
mod_enc_data2[enc_data_len - 65] ^= 0x80u8;
|
||||
assert!(
|
||||
Envelope::from_signed_data(vcrypto.crypto(), &mod_enc_data2, &network_key).is_err(),
|
||||
Envelope::from_signed_data(&crypto, &mod_enc_data2, &network_key).is_err(),
|
||||
"should have failed to decode envelope with modified data"
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn test_receipt_round_trip(
|
||||
envelope_version: EnvelopeVersion,
|
||||
vcrypto: CryptoSystemVersion,
|
||||
vcrypto: &CryptoSystemGuard<'_>,
|
||||
) {
|
||||
let crypto = vcrypto.crypto();
|
||||
info!("--- test receipt round trip ---");
|
||||
// Create arbitrary body
|
||||
let body = b"This is an arbitrary body";
|
||||
@ -80,16 +82,16 @@ pub async fn test_receipt_round_trip(
|
||||
|
||||
// Serialize to bytes
|
||||
let mut enc_data = receipt
|
||||
.to_signed_data(vcrypto.crypto(), &sender_secret)
|
||||
.to_signed_data(&crypto, &sender_secret)
|
||||
.expect("failed to make signed data");
|
||||
|
||||
// Deserialize from bytes
|
||||
let receipt2 = Receipt::from_signed_data(vcrypto.crypto(), &enc_data)
|
||||
let receipt2 = Receipt::from_signed_data(&crypto, &enc_data)
|
||||
.expect("failed to deserialize envelope from data");
|
||||
|
||||
// Should not validate even when a single bit is changed
|
||||
enc_data[5] = 0x01;
|
||||
Receipt::from_signed_data(vcrypto.crypto(), &enc_data)
|
||||
Receipt::from_signed_data(&crypto, &enc_data)
|
||||
.expect_err("should have failed to decrypt using wrong secret");
|
||||
|
||||
// Compare receipts
|
||||
@ -105,10 +107,9 @@ pub async fn test_all() {
|
||||
for v in VALID_CRYPTO_KINDS {
|
||||
let vcrypto = crypto.get(v).unwrap();
|
||||
|
||||
test_envelope_round_trip(ev, vcrypto.clone(), None).await;
|
||||
test_envelope_round_trip(ev, vcrypto.clone(), Some(vcrypto.random_shared_secret()))
|
||||
.await;
|
||||
test_receipt_round_trip(ev, vcrypto).await;
|
||||
test_envelope_round_trip(ev, &vcrypto, None).await;
|
||||
test_envelope_round_trip(ev, &vcrypto, Some(vcrypto.random_shared_secret())).await;
|
||||
test_receipt_round_trip(ev, &vcrypto).await;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,7 @@ static CHEEZBURGER: &str = "I can has cheezburger";
|
||||
static EMPTY_KEY: [u8; PUBLIC_KEY_LENGTH] = [0u8; PUBLIC_KEY_LENGTH];
|
||||
static EMPTY_KEY_SECRET: [u8; SECRET_KEY_LENGTH] = [0u8; SECRET_KEY_LENGTH];
|
||||
|
||||
pub async fn test_generate_secret(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_generate_secret(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
// Verify keys generate
|
||||
let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split();
|
||||
let (dht_key2, dht_key_secret2) = vcrypto.generate_keypair().into_split();
|
||||
@ -20,7 +20,7 @@ pub async fn test_generate_secret(vcrypto: CryptoSystemVersion) {
|
||||
assert_ne!(dht_key_secret, dht_key_secret2);
|
||||
}
|
||||
|
||||
pub async fn test_sign_and_verify(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_sign_and_verify(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
// Make two keys
|
||||
let (dht_key, dht_key_secret) = vcrypto.generate_keypair().into_split();
|
||||
let (dht_key2, dht_key_secret2) = vcrypto.generate_keypair().into_split();
|
||||
@ -115,7 +115,7 @@ pub async fn test_sign_and_verify(vcrypto: CryptoSystemVersion) {
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn test_key_conversions(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_key_conversions(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
// Test default key
|
||||
let (dht_key, dht_key_secret) = (PublicKey::default(), SecretKey::default());
|
||||
assert_eq!(dht_key.bytes, EMPTY_KEY);
|
||||
@ -185,7 +185,7 @@ pub async fn test_key_conversions(vcrypto: CryptoSystemVersion) {
|
||||
.is_err());
|
||||
}
|
||||
|
||||
pub async fn test_encode_decode(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_encode_decode(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
let dht_key = PublicKey::try_decode("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap();
|
||||
let dht_key_secret =
|
||||
SecretKey::try_decode("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").unwrap();
|
||||
@ -229,7 +229,7 @@ pub async fn test_encode_decode(vcrypto: CryptoSystemVersion) {
|
||||
assert!(f2.is_err());
|
||||
}
|
||||
|
||||
pub async fn test_typed_convert(vcrypto: CryptoSystemVersion) {
|
||||
pub async fn test_typed_convert(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
let tks1 = format!(
|
||||
"{}:7lxDEabK_qgjbe38RtBa3IZLrud84P6NhGP-pRTZzdQ",
|
||||
vcrypto.kind()
|
||||
@ -261,7 +261,7 @@ pub async fn test_typed_convert(vcrypto: CryptoSystemVersion) {
|
||||
assert!(tks6x.ends_with(&tks6));
|
||||
}
|
||||
|
||||
async fn test_hash(vcrypto: CryptoSystemVersion) {
|
||||
async fn test_hash(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
let mut s = BTreeSet::<PublicKey>::new();
|
||||
|
||||
let k1 = vcrypto.generate_hash("abc".as_bytes());
|
||||
@ -301,7 +301,7 @@ async fn test_hash(vcrypto: CryptoSystemVersion) {
|
||||
vcrypto.validate_hash(CHEEZBURGER.as_bytes(), &v6);
|
||||
}
|
||||
|
||||
async fn test_operations(vcrypto: CryptoSystemVersion) {
|
||||
async fn test_operations(vcrypto: &CryptoSystemGuard<'_>) {
|
||||
let k1 = vcrypto.generate_hash(LOREM_IPSUM.as_bytes());
|
||||
let k2 = vcrypto.generate_hash(CHEEZBURGER.as_bytes());
|
||||
let k3 = vcrypto.generate_hash("abc".as_bytes());
|
||||
@ -395,13 +395,13 @@ pub async fn test_all() {
|
||||
for v in VALID_CRYPTO_KINDS {
|
||||
let vcrypto = crypto.get(v).unwrap();
|
||||
|
||||
test_generate_secret(vcrypto.clone()).await;
|
||||
test_sign_and_verify(vcrypto.clone()).await;
|
||||
test_key_conversions(vcrypto.clone()).await;
|
||||
test_encode_decode(vcrypto.clone()).await;
|
||||
test_typed_convert(vcrypto.clone()).await;
|
||||
test_hash(vcrypto.clone()).await;
|
||||
test_operations(vcrypto).await;
|
||||
test_generate_secret(&vcrypto).await;
|
||||
test_sign_and_verify(&vcrypto).await;
|
||||
test_key_conversions(&vcrypto).await;
|
||||
test_encode_decode(&vcrypto).await;
|
||||
test_typed_convert(&vcrypto).await;
|
||||
test_hash(&vcrypto).await;
|
||||
test_operations(&vcrypto).await;
|
||||
}
|
||||
|
||||
crypto_tests_shutdown(api.clone()).await;
|
||||
|
@ -47,14 +47,13 @@ pub fn vld0_generate_keypair() -> KeyPair {
|
||||
}
|
||||
|
||||
/// V0 CryptoSystem
|
||||
#[derive(Clone)]
|
||||
pub struct CryptoSystemVLD0 {
|
||||
crypto: Crypto,
|
||||
registry: VeilidComponentRegistry,
|
||||
}
|
||||
|
||||
impl CryptoSystemVLD0 {
|
||||
pub fn new(crypto: Crypto) -> Self {
|
||||
Self { crypto }
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self { registry }
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,14 +63,14 @@ impl CryptoSystem for CryptoSystemVLD0 {
|
||||
CRYPTO_KIND_VLD0
|
||||
}
|
||||
|
||||
fn crypto(&self) -> Crypto {
|
||||
self.crypto.clone()
|
||||
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
|
||||
self.registry.lookup::<Crypto>().unwrap()
|
||||
}
|
||||
|
||||
// Cached Operations
|
||||
#[instrument(level = "trace", skip_all)]
|
||||
fn cached_dh(&self, key: &PublicKey, secret: &SecretKey) -> VeilidAPIResult<SharedSecret> {
|
||||
self.crypto
|
||||
self.crypto()
|
||||
.cached_dh_internal::<CryptoSystemVLD0>(self, key, secret)
|
||||
}
|
||||
|
||||
|
@ -4,31 +4,37 @@ struct BlockStoreInner {
|
||||
//
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
impl fmt::Debug for BlockStoreInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("BlockStoreInner").finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BlockStore {
|
||||
event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
registry: VeilidComponentRegistry,
|
||||
inner: Arc<Mutex<BlockStoreInner>>,
|
||||
}
|
||||
|
||||
impl_veilid_component!(BlockStore);
|
||||
|
||||
impl BlockStore {
|
||||
fn new_inner() -> BlockStoreInner {
|
||||
BlockStoreInner {}
|
||||
}
|
||||
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self {
|
||||
event_bus,
|
||||
config,
|
||||
registry,
|
||||
inner: Arc::new(Mutex::new(Self::new_inner())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn init(&self) -> EyreResult<()> {
|
||||
async fn init_async(&self) -> EyreResult<()> {
|
||||
// Ensure permissions are correct
|
||||
// ensure_file_private_owner(&dbpath)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn terminate(&self) {}
|
||||
async fn terminate_async(&self) {}
|
||||
}
|
||||
|
@ -6,14 +6,20 @@ use std::path::Path;
|
||||
pub struct ProtectedStoreInner {
|
||||
keyring_manager: Option<KeyringManager>,
|
||||
}
|
||||
impl fmt::Debug for ProtectedStoreInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("ProtectedStoreInner").finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct ProtectedStore {
|
||||
_event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
registry: VeilidComponentRegistry,
|
||||
inner: Arc<Mutex<ProtectedStoreInner>>,
|
||||
}
|
||||
|
||||
impl_veilid_component!(ProtectedStore);
|
||||
|
||||
impl ProtectedStore {
|
||||
fn new_inner() -> ProtectedStoreInner {
|
||||
ProtectedStoreInner {
|
||||
@ -21,10 +27,9 @@ impl ProtectedStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self {
|
||||
_event_bus: event_bus,
|
||||
config,
|
||||
registry,
|
||||
inner: Arc::new(Mutex::new(Self::new_inner())),
|
||||
}
|
||||
}
|
||||
@ -42,9 +47,10 @@ impl ProtectedStore {
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self), err)]
|
||||
pub async fn init(&self) -> EyreResult<()> {
|
||||
async fn init_async(&self) -> EyreResult<()> {
|
||||
let delete = {
|
||||
let c = self.config.get();
|
||||
let config = self.config();
|
||||
let c = config.get();
|
||||
let mut inner = self.inner.lock();
|
||||
if !c.protected_store.always_use_insecure_storage {
|
||||
// Attempt to open the secure keyring
|
||||
@ -102,12 +108,13 @@ impl ProtectedStore {
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
pub async fn terminate(&self) {
|
||||
async fn terminate_async(&self) {
|
||||
*self.inner.lock() = Self::new_inner();
|
||||
}
|
||||
|
||||
fn service_name(&self) -> String {
|
||||
let c = self.config.get();
|
||||
let config = self.config();
|
||||
let c = config.get();
|
||||
if c.namespace.is_empty() {
|
||||
"veilid_protected_store".to_owned()
|
||||
} else {
|
||||
|
@ -4,28 +4,34 @@ struct BlockStoreInner {
|
||||
//
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
impl fmt::Debug for BlockStoreInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("BlockStoreInner").finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct BlockStore {
|
||||
event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
registry: VeilidComponentRegistry,
|
||||
inner: Arc<Mutex<BlockStoreInner>>,
|
||||
}
|
||||
|
||||
impl_veilid_component!(BlockStore);
|
||||
|
||||
impl BlockStore {
|
||||
fn new_inner() -> BlockStoreInner {
|
||||
BlockStoreInner {}
|
||||
}
|
||||
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self {
|
||||
event_bus,
|
||||
config,
|
||||
registry,
|
||||
inner: Arc::new(Mutex::new(Self::new_inner())),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn init(&self) -> EyreResult<()> {
|
||||
async fn init_async(&self) -> EyreResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn terminate(&self) {}
|
||||
async fn terminate_async(&self) {}
|
||||
}
|
||||
|
@ -3,18 +3,16 @@ use data_encoding::BASE64URL_NOPAD;
|
||||
|
||||
use web_sys::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct ProtectedStore {
|
||||
_event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
registry: VeilidComponentRegistry,
|
||||
}
|
||||
|
||||
impl_veilid_component!(ProtectedStore);
|
||||
|
||||
impl ProtectedStore {
|
||||
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
|
||||
Self {
|
||||
_event_bus: event_bus,
|
||||
config,
|
||||
}
|
||||
pub fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
Self { registry }
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
@ -38,7 +36,7 @@ impl ProtectedStore {
|
||||
pub async fn terminate(&self) {}
|
||||
|
||||
fn browser_key_name(&self, key: &str) -> String {
|
||||
let c = self.config.get();
|
||||
let c = self.config();
|
||||
if c.namespace.is_empty() {
|
||||
format!("__veilid_protected_store_{}", key)
|
||||
} else {
|
||||
|
@ -45,6 +45,7 @@ cfg_if::cfg_if! {
|
||||
extern crate alloc;
|
||||
|
||||
mod attachment_manager;
|
||||
mod component;
|
||||
mod core_context;
|
||||
mod crypto;
|
||||
mod intf;
|
||||
@ -58,6 +59,8 @@ mod veilid_api;
|
||||
mod veilid_config;
|
||||
mod wasm_helpers;
|
||||
|
||||
pub(crate) use self::component::*;
|
||||
pub(crate) use self::core_context::RegisteredComponents;
|
||||
pub use self::core_context::{api_startup, api_startup_config, api_startup_json, UpdateCallback};
|
||||
pub use self::logging::{
|
||||
ApiTracingLayer, VeilidLayerFilter, DEFAULT_LOG_FACILITIES_ENABLED_LIST,
|
||||
|
@ -3,21 +3,12 @@ use super::*;
|
||||
pub mod test_serialize_routing_table;
|
||||
|
||||
pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
|
||||
let event_bus = EventBus::new();
|
||||
let veilid_config = VeilidConfig::new();
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
let block_store = BlockStore::new(event_bus.clone(), veilid_config.clone());
|
||||
let protected_store = ProtectedStore::new(event_bus.clone(), veilid_config.clone());
|
||||
let table_store = TableStore::new(
|
||||
event_bus.clone(),
|
||||
veilid_config.clone(),
|
||||
protected_store.clone(),
|
||||
);
|
||||
let crypto = Crypto::new(
|
||||
event_bus.clone(),
|
||||
veilid_config.clone(),
|
||||
table_store.clone(),
|
||||
);
|
||||
let veilid_config =
|
||||
VeilidConfig::new_from_config(VeilidConfigInner::default(), Arc::new(|_| {}));
|
||||
let registry = VeilidComponentRegistry::new(veilid_config);
|
||||
registry.register(ProtectedStore::new);
|
||||
registry.register(TableStore::new);
|
||||
registry.register(Crypto::new);
|
||||
let storage_manager = storage_manager::StorageManager::new(
|
||||
event_bus.clone(),
|
||||
veilid_config.clone(),
|
||||
|
@ -38,15 +38,14 @@ impl StorageManager {
|
||||
let routing_domain = RoutingDomain::PublicInternet;
|
||||
|
||||
// Get the DHT parameters for 'GetValue'
|
||||
let (key_count, consensus_count, fanout, timeout_us) = {
|
||||
let c = self.unlocked_inner.config.get();
|
||||
let (key_count, consensus_count, fanout, timeout_us) = self.config().with(|c| {
|
||||
(
|
||||
c.network.dht.max_find_node_count as usize,
|
||||
c.network.dht.get_value_count as usize,
|
||||
c.network.dht.get_value_fanout as usize,
|
||||
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
|
||||
)
|
||||
};
|
||||
});
|
||||
|
||||
// Get the nodes we know are caching this value to seed the fanout
|
||||
let init_fanout_queue = {
|
||||
|
@ -64,9 +64,7 @@ impl StorageManager {
|
||||
|
||||
// Get the DHT parameters for 'InspectValue'
|
||||
// Can use either 'get scope' or 'set scope' depending on the purpose of the inspection
|
||||
let (key_count, consensus_count, fanout, timeout_us) = {
|
||||
let c = self.unlocked_inner.config.get();
|
||||
|
||||
let (key_count, consensus_count, fanout, timeout_us) = self.config().with(|c| {
|
||||
if use_set_scope {
|
||||
(
|
||||
c.network.dht.max_find_node_count as usize,
|
||||
@ -82,7 +80,7 @@ impl StorageManager {
|
||||
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
|
||||
)
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
// Get the nodes we know are caching this value to seed the fanout
|
||||
let init_fanout_queue = {
|
||||
|
@ -44,13 +44,6 @@ struct ValueChangedInfo {
|
||||
}
|
||||
|
||||
struct StorageManagerUnlockedInner {
|
||||
_event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
crypto: Crypto,
|
||||
table_store: TableStore,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store: BlockStore,
|
||||
|
||||
// Background processes
|
||||
flush_record_stores_task: TickTask<EyreReport>,
|
||||
offline_subkey_writes_task: TickTask<EyreReport>,
|
||||
@ -62,20 +55,36 @@ struct StorageManagerUnlockedInner {
|
||||
anonymous_watch_keys: TypedKeyPairGroup,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
impl fmt::Debug for StorageManagerUnlockedInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("StorageManagerUnlockedInner")
|
||||
// .field("flush_record_stores_task", &self.flush_record_stores_task)
|
||||
// .field(
|
||||
// "offline_subkey_writes_task",
|
||||
// &self.offline_subkey_writes_task,
|
||||
// )
|
||||
// .field("send_value_changes_task", &self.send_value_changes_task)
|
||||
// .field("check_active_watches_task", &self.check_active_watches_task)
|
||||
// .field(
|
||||
// "check_watched_records_task",
|
||||
// &self.check_watched_records_task,
|
||||
// )
|
||||
.field("anonymous_watch_keys", &self.anonymous_watch_keys)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct StorageManager {
|
||||
registry: VeilidComponentRegistry,
|
||||
unlocked_inner: Arc<StorageManagerUnlockedInner>,
|
||||
inner: Arc<AsyncMutex<StorageManagerInner>>,
|
||||
}
|
||||
|
||||
impl_veilid_component!(StorageManager);
|
||||
|
||||
impl StorageManager {
|
||||
fn new_unlocked_inner(
|
||||
event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
crypto: Crypto,
|
||||
table_store: TableStore,
|
||||
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
|
||||
) -> StorageManagerUnlockedInner {
|
||||
fn new_unlocked_inner(crypto: Crypto) -> StorageManagerUnlockedInner {
|
||||
// Generate keys to use for anonymous watches
|
||||
let mut anonymous_watch_keys = TypedKeyPairGroup::new();
|
||||
for ck in VALID_CRYPTO_KINDS {
|
||||
@ -85,12 +94,6 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
StorageManagerUnlockedInner {
|
||||
_event_bus: event_bus,
|
||||
config,
|
||||
crypto,
|
||||
table_store,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store,
|
||||
flush_record_stores_task: TickTask::new(
|
||||
"flush_record_stores_task",
|
||||
FLUSH_RECORD_STORES_INTERVAL_SECS,
|
||||
@ -119,22 +122,11 @@ impl StorageManager {
|
||||
StorageManagerInner::new(unlocked_inner)
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
crypto: Crypto,
|
||||
table_store: TableStore,
|
||||
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
|
||||
) -> StorageManager {
|
||||
let unlocked_inner = Arc::new(Self::new_unlocked_inner(
|
||||
event_bus,
|
||||
config,
|
||||
crypto,
|
||||
table_store,
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
block_store,
|
||||
));
|
||||
pub fn new(registry: VeilidComponentRegistry) -> StorageManager {
|
||||
let crypto = registry.lookup::<Crypto>().unwrap();
|
||||
let unlocked_inner = Arc::new(Self::new_unlocked_inner(crypto));
|
||||
let this = StorageManager {
|
||||
registry,
|
||||
unlocked_inner: unlocked_inner.clone(),
|
||||
inner: Arc::new(AsyncMutex::new(Self::new_inner(unlocked_inner))),
|
||||
};
|
||||
@ -145,17 +137,17 @@ impl StorageManager {
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, err)]
|
||||
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
|
||||
async fn init_async(&self) -> EyreResult<()> {
|
||||
log_stor!(debug "startup storage manager");
|
||||
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.init(self.clone(), update_callback).await?;
|
||||
inner.init(self.clone()).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
pub async fn terminate(&self) {
|
||||
async fn terminate_async(&self) {
|
||||
log_stor!(debug "starting storage manager shutdown");
|
||||
|
||||
// Stop the background ticker process
|
||||
@ -177,16 +169,6 @@ impl StorageManager {
|
||||
log_stor!(debug "finished storage manager shutdown");
|
||||
}
|
||||
|
||||
pub async fn set_rpc_processor(&self, opt_rpc_processor: Option<RPCProcessor>) {
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.opt_rpc_processor = opt_rpc_processor
|
||||
}
|
||||
|
||||
pub async fn set_routing_table(&self, opt_routing_table: Option<RoutingTable>) {
|
||||
let mut inner = self.inner.lock().await;
|
||||
inner.opt_routing_table = opt_routing_table
|
||||
}
|
||||
|
||||
async fn lock(&self) -> VeilidAPIResult<AsyncMutexGuardArc<StorageManagerInner>> {
|
||||
let inner = asyncmutex_lock_arc!(&self.inner);
|
||||
if !inner.initialized {
|
||||
@ -518,7 +500,7 @@ impl StorageManager {
|
||||
let mut inner = self.lock().await?;
|
||||
|
||||
// Get cryptosystem
|
||||
let Some(vcrypto) = self.unlocked_inner.crypto.get(key.kind) else {
|
||||
let Some(vcrypto) = self.crypto().get(key.kind) else {
|
||||
apibail_generic!("unsupported cryptosystem");
|
||||
};
|
||||
|
||||
@ -738,13 +720,12 @@ impl StorageManager {
|
||||
opened_record.clear_active_watch();
|
||||
|
||||
// Get the minimum expiration timestamp we will accept
|
||||
let (rpc_timeout_us, max_watch_expiration_us) = {
|
||||
let c = self.unlocked_inner.config.get();
|
||||
let (rpc_timeout_us, max_watch_expiration_us) = self.config().with(|c| {
|
||||
(
|
||||
TimestampDuration::from(ms_to_us(c.network.rpc.timeout_ms)),
|
||||
TimestampDuration::from(ms_to_us(c.network.dht.max_watch_expiration_ms)),
|
||||
)
|
||||
};
|
||||
});
|
||||
let cur_ts = get_timestamp();
|
||||
let min_expiration_ts = cur_ts + rpc_timeout_us.as_u64();
|
||||
let max_expiration_ts = if expiration.as_u64() == 0 {
|
||||
@ -1027,8 +1008,9 @@ impl StorageManager {
|
||||
match fanout_result.kind {
|
||||
FanoutResultKind::Partial => false,
|
||||
FanoutResultKind::Timeout => {
|
||||
let get_consensus =
|
||||
self.unlocked_inner.config.get().network.dht.get_value_count as usize;
|
||||
let get_consensus = self
|
||||
.config()
|
||||
.with(|c| c.network.dht.get_value_count as usize);
|
||||
let value_node_count = fanout_result.value_nodes.len();
|
||||
if value_node_count < get_consensus {
|
||||
log_stor!(debug "timeout with insufficient consensus ({}<{}), adding offline subkey: {}:{}",
|
||||
@ -1043,8 +1025,9 @@ impl StorageManager {
|
||||
}
|
||||
}
|
||||
FanoutResultKind::Exhausted => {
|
||||
let get_consensus =
|
||||
self.unlocked_inner.config.get().network.dht.get_value_count as usize;
|
||||
let get_consensus = self
|
||||
.config()
|
||||
.with(|c| c.network.dht.get_value_count as usize);
|
||||
let value_node_count = fanout_result.value_nodes.len();
|
||||
if value_node_count < get_consensus {
|
||||
log_stor!(debug "exhausted with insufficient consensus ({}<{}), adding offline subkey: {}:{}",
|
||||
|
@ -20,6 +20,7 @@ impl InspectCacheL2 {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InspectCache {
|
||||
cache: LruCache<TypedKey, InspectCacheL2>,
|
||||
}
|
||||
|
@ -80,6 +80,31 @@ where
|
||||
purge_dead_records_mutex: Arc<AsyncMutex<()>>,
|
||||
}
|
||||
|
||||
impl<D> fmt::Debug for RecordStore<D>
|
||||
where
|
||||
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("RecordStore")
|
||||
//.field("table_store", &self.table_store)
|
||||
.field("name", &self.name)
|
||||
.field("limits", &self.limits)
|
||||
.field("record_table", &self.record_table)
|
||||
.field("subkey_table", &self.subkey_table)
|
||||
.field("record_index", &self.record_index)
|
||||
.field("subkey_cache", &self.subkey_cache)
|
||||
.field("inspect_cache", &self.inspect_cache)
|
||||
.field("subkey_cache_total_size", &self.subkey_cache_total_size)
|
||||
.field("total_storage_space", &self.total_storage_space)
|
||||
.field("dead_records", &self.dead_records)
|
||||
.field("changed_records", &self.changed_records)
|
||||
.field("watched_records", &self.watched_records)
|
||||
.field("changed_watched_values", &self.changed_watched_values)
|
||||
.field("purge_dead_records_mutex", &self.purge_dead_records_mutex)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// The result of the do_get_value_operation
|
||||
#[derive(Default, Clone, Debug)]
|
||||
pub struct GetResult {
|
||||
|
@ -39,16 +39,16 @@ impl StorageManager {
|
||||
let routing_domain = RoutingDomain::PublicInternet;
|
||||
|
||||
// Get the DHT parameters for 'SetValue'
|
||||
let (key_count, get_consensus_count, set_consensus_count, fanout, timeout_us) = {
|
||||
let c = self.unlocked_inner.config.get();
|
||||
(
|
||||
c.network.dht.max_find_node_count as usize,
|
||||
c.network.dht.get_value_count as usize,
|
||||
c.network.dht.set_value_count as usize,
|
||||
c.network.dht.set_value_fanout as usize,
|
||||
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
|
||||
)
|
||||
};
|
||||
let (key_count, get_consensus_count, set_consensus_count, fanout, timeout_us) =
|
||||
self.config().with(|c| {
|
||||
(
|
||||
c.network.dht.max_find_node_count as usize,
|
||||
c.network.dht.get_value_count as usize,
|
||||
c.network.dht.set_value_count as usize,
|
||||
c.network.dht.set_value_fanout as usize,
|
||||
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
|
||||
)
|
||||
});
|
||||
|
||||
// Get the nodes we know are caching this value to seed the fanout
|
||||
let init_fanout_queue = {
|
||||
|
@ -26,10 +26,6 @@ pub(super) struct StorageManagerInner {
|
||||
pub offline_subkey_writes: HashMap<TypedKey, OfflineSubkeyWrite>,
|
||||
/// Storage manager metadata that is persistent, including copy of offline subkey writes
|
||||
pub metadata_db: Option<TableDB>,
|
||||
/// RPC processor if it is available
|
||||
pub opt_rpc_processor: Option<RPCProcessor>,
|
||||
/// Routing table if it is available
|
||||
pub opt_routing_table: Option<RoutingTable>,
|
||||
/// Background processing task (not part of attachment manager tick tree so it happens when detached too)
|
||||
pub tick_future: Option<SendPinBoxFuture<()>>,
|
||||
/// Update callback to send ValueChanged notification to
|
||||
@ -41,6 +37,24 @@ pub(super) struct StorageManagerInner {
|
||||
set_consensus_count: usize,
|
||||
}
|
||||
|
||||
impl fmt::Debug for StorageManagerInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("StorageManagerInner")
|
||||
// .field("unlocked_inner", &self.unlocked_inner)
|
||||
.field("initialized", &self.initialized)
|
||||
.field("opened_records", &self.opened_records)
|
||||
.field("local_record_store", &self.local_record_store)
|
||||
.field("remote_record_store", &self.remote_record_store)
|
||||
.field("offline_subkey_writes", &self.offline_subkey_writes)
|
||||
//.field("metadata_db", &self.metadata_db)
|
||||
//.field("tick_future", &self.tick_future)
|
||||
//.field("update_callback", &self.update_callback)
|
||||
.field("deferred_result_processor", &self.deferred_result_processor)
|
||||
.field("set_consensus_count", &self.set_consensus_count)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
|
||||
let c = config.get();
|
||||
RecordStoreLimits {
|
||||
@ -89,7 +103,7 @@ impl StorageManagerInner {
|
||||
offline_subkey_writes: Default::default(),
|
||||
metadata_db: Default::default(),
|
||||
opt_rpc_processor: Default::default(),
|
||||
opt_routing_table: Default::default(),
|
||||
//opt_routing_table: Default::default(),
|
||||
tick_future: Default::default(),
|
||||
update_callback: None,
|
||||
deferred_result_processor: DeferredStreamProcessor::default(),
|
||||
|
@ -213,14 +213,13 @@ impl StorageManager {
|
||||
let routing_domain = RoutingDomain::PublicInternet;
|
||||
|
||||
// Get the DHT parameters for 'WatchValue', some of which are the same for 'SetValue' operations
|
||||
let (key_count, timeout_us, set_value_count) = {
|
||||
let c = self.unlocked_inner.config.get();
|
||||
let (key_count, timeout_us, set_value_count) = self.config().with(|c| {
|
||||
(
|
||||
c.network.dht.max_find_node_count as usize,
|
||||
TimestampDuration::from(ms_to_us(c.network.dht.set_value_timeout_ms)),
|
||||
c.network.dht.set_value_count as usize,
|
||||
)
|
||||
};
|
||||
});
|
||||
|
||||
// Get the appropriate watcher key, if anonymous use a static anonymous watch key
|
||||
// which lives for the duration of the app's runtime
|
||||
|
@ -70,21 +70,41 @@ struct TableStoreInner {
|
||||
encryption_key: Option<TypedSharedSecret>,
|
||||
all_table_names: HashMap<String, String>,
|
||||
all_tables_db: Option<Database>,
|
||||
crypto: Option<Crypto>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for TableStoreInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("TableStoreInner")
|
||||
.field("opened", &self.opened)
|
||||
.field("encryption_key", &self.encryption_key)
|
||||
.field("all_table_names", &self.all_table_names)
|
||||
//.field("all_tables_db", &self.all_tables_db)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Veilid Table Storage.
|
||||
/// Database for storing key value pairs persistently and securely across runs.
|
||||
#[derive(Clone)]
|
||||
pub struct TableStore {
|
||||
_event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
protected_store: ProtectedStore,
|
||||
table_store_driver: TableStoreDriver,
|
||||
registry: VeilidComponentRegistry,
|
||||
inner: Arc<Mutex<TableStoreInner>>, // Sync mutex here because TableDB drops can happen at any time
|
||||
async_lock: Arc<AsyncMutex<()>>, // Async mutex for operations
|
||||
table_store_driver: TableStoreDriver,
|
||||
async_lock: Arc<AsyncMutex<()>>, // Async mutex for operations
|
||||
}
|
||||
|
||||
impl fmt::Debug for TableStore {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("TableStore")
|
||||
.field("registry", &self.registry)
|
||||
.field("inner", &self.inner)
|
||||
//.field("table_store_driver", &self.table_store_driver)
|
||||
.field("async_lock", &self.async_lock)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl_veilid_component!(TableStore);
|
||||
|
||||
impl TableStore {
|
||||
fn new_inner() -> TableStoreInner {
|
||||
TableStoreInner {
|
||||
@ -92,32 +112,20 @@ impl TableStore {
|
||||
encryption_key: None,
|
||||
all_table_names: HashMap::new(),
|
||||
all_tables_db: None,
|
||||
crypto: None,
|
||||
}
|
||||
}
|
||||
pub(crate) fn new(
|
||||
event_bus: EventBus,
|
||||
config: VeilidConfig,
|
||||
protected_store: ProtectedStore,
|
||||
) -> Self {
|
||||
pub(crate) fn new(registry: VeilidComponentRegistry) -> Self {
|
||||
let inner = Self::new_inner();
|
||||
let table_store_driver = TableStoreDriver::new(config.clone());
|
||||
let table_store_driver = TableStoreDriver::new(registry.config());
|
||||
|
||||
Self {
|
||||
_event_bus: event_bus,
|
||||
config,
|
||||
protected_store,
|
||||
registry,
|
||||
inner: Arc::new(Mutex::new(inner)),
|
||||
table_store_driver,
|
||||
async_lock: Arc::new(AsyncMutex::new(())),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_crypto(&self, crypto: Crypto) {
|
||||
let mut inner = self.inner.lock();
|
||||
inner.crypto = Some(crypto);
|
||||
}
|
||||
|
||||
// Flush internal control state (must not use crypto)
|
||||
async fn flush(&self) {
|
||||
let (all_table_names_value, all_tables_db) = {
|
||||
@ -142,8 +150,7 @@ impl TableStore {
|
||||
{
|
||||
apibail_invalid_argument!("table name is invalid", "table", table);
|
||||
}
|
||||
let c = self.config.get();
|
||||
let namespace = c.namespace.clone();
|
||||
let namespace = self.config().with(|c| c.namespace.clone());
|
||||
Ok(if namespace.is_empty() {
|
||||
table.to_string()
|
||||
} else {
|
||||
@ -260,8 +267,7 @@ impl TableStore {
|
||||
|
||||
// Get cryptosystem
|
||||
let kind = FourCC::try_from(&dek_bytes[0..4]).unwrap();
|
||||
let crypto = self.inner.lock().crypto.as_ref().unwrap().clone();
|
||||
let Some(vcrypto) = crypto.get(kind) else {
|
||||
let Some(vcrypto) = self.crypto().get(kind) else {
|
||||
bail!("unsupported cryptosystem '{kind}'");
|
||||
};
|
||||
|
||||
@ -316,8 +322,7 @@ impl TableStore {
|
||||
}
|
||||
|
||||
// Get cryptosystem
|
||||
let crypto = self.inner.lock().crypto.as_ref().unwrap().clone();
|
||||
let Some(vcrypto) = crypto.get(dek.kind) else {
|
||||
let Some(vcrypto) = self.crypto().get(dek.kind) else {
|
||||
bail!("unsupported cryptosystem '{}'", dek.kind);
|
||||
};
|
||||
|
||||
@ -340,7 +345,7 @@ impl TableStore {
|
||||
#[instrument(level = "trace", target = "tstore", skip_all)]
|
||||
async fn load_device_encryption_key(&self) -> EyreResult<Option<TypedSharedSecret>> {
|
||||
let dek_bytes: Option<Vec<u8>> = self
|
||||
.protected_store
|
||||
.protected_store()
|
||||
.load_user_secret("device_encryption_key")
|
||||
.await?;
|
||||
let Some(dek_bytes) = dek_bytes else {
|
||||
@ -349,10 +354,9 @@ impl TableStore {
|
||||
};
|
||||
|
||||
// Get device encryption key protection password if we have it
|
||||
let device_encryption_key_password = {
|
||||
let c = self.config.get();
|
||||
c.protected_store.device_encryption_key_password.clone()
|
||||
};
|
||||
let device_encryption_key_password = self
|
||||
.config()
|
||||
.with(|c| c.protected_store.device_encryption_key_password.clone());
|
||||
|
||||
Ok(Some(self.maybe_unprotect_device_encryption_key(
|
||||
&dek_bytes,
|
||||
@ -368,7 +372,7 @@ impl TableStore {
|
||||
let Some(device_encryption_key) = device_encryption_key else {
|
||||
// Remove the device encryption key
|
||||
let existed = self
|
||||
.protected_store
|
||||
.protected_store()
|
||||
.remove_user_secret("device_encryption_key")
|
||||
.await?;
|
||||
log_tstore!(debug "removed device encryption key. existed: {}", existed);
|
||||
@ -377,15 +381,15 @@ impl TableStore {
|
||||
|
||||
// Get new device encryption key protection password if we are changing it
|
||||
let new_device_encryption_key_password = {
|
||||
let c = self.config.get();
|
||||
c.protected_store.new_device_encryption_key_password.clone()
|
||||
self.config()
|
||||
.with(|c| c.protected_store.new_device_encryption_key_password.clone())
|
||||
};
|
||||
let device_encryption_key_password =
|
||||
if let Some(new_device_encryption_key_password) = new_device_encryption_key_password {
|
||||
// Change password
|
||||
log_tstore!(debug "changing dek password");
|
||||
self.config
|
||||
.with_mut(|c| {
|
||||
self.config()
|
||||
.try_with_mut(|c| {
|
||||
c.protected_store
|
||||
.device_encryption_key_password
|
||||
.clone_from(&new_device_encryption_key_password);
|
||||
@ -395,8 +399,8 @@ impl TableStore {
|
||||
} else {
|
||||
// Get device encryption key protection password if we have it
|
||||
log_tstore!(debug "saving with existing dek password");
|
||||
let c = self.config.get();
|
||||
c.protected_store.device_encryption_key_password.clone()
|
||||
self.config()
|
||||
.with(|c| c.protected_store.device_encryption_key_password.clone())
|
||||
};
|
||||
|
||||
let dek_bytes = self.maybe_protect_device_encryption_key(
|
||||
@ -406,7 +410,7 @@ impl TableStore {
|
||||
|
||||
// Save the new device encryption key
|
||||
let existed = self
|
||||
.protected_store
|
||||
.protected_store()
|
||||
.save_user_secret("device_encryption_key", &dek_bytes)
|
||||
.await?;
|
||||
log_tstore!(debug "saving device encryption key. existed: {}", existed);
|
||||
@ -414,7 +418,7 @@ impl TableStore {
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "tstore", skip_all)]
|
||||
pub(crate) async fn init(&self) -> EyreResult<()> {
|
||||
async fn init_async(&self) -> EyreResult<()> {
|
||||
let _async_guard = self.async_lock.lock().await;
|
||||
|
||||
// Get device encryption key from protected store
|
||||
@ -437,12 +441,11 @@ impl TableStore {
|
||||
}
|
||||
|
||||
// Check for password change
|
||||
let changing_password = self
|
||||
.config
|
||||
.get()
|
||||
.protected_store
|
||||
.new_device_encryption_key_password
|
||||
.is_some();
|
||||
let changing_password = self.config().with(|c| {
|
||||
c.protected_store
|
||||
.new_device_encryption_key_password
|
||||
.is_some()
|
||||
});
|
||||
|
||||
// Save encryption key if it has changed or if the protecting password wants to change
|
||||
if device_encryption_key_changed || changing_password {
|
||||
@ -481,10 +484,7 @@ impl TableStore {
|
||||
inner.all_tables_db = Some(all_tables_db);
|
||||
}
|
||||
|
||||
let do_delete = {
|
||||
let c = self.config.get();
|
||||
c.table_store.delete
|
||||
};
|
||||
let do_delete = self.config().with(|c| c.table_store.delete);
|
||||
|
||||
if do_delete {
|
||||
self.delete_all().await;
|
||||
@ -494,7 +494,7 @@ impl TableStore {
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", target = "tstore", skip_all)]
|
||||
pub(crate) async fn terminate(&self) {
|
||||
async fn terminate_async(&self) {
|
||||
let _async_guard = self.async_lock.lock().await;
|
||||
|
||||
self.flush().await;
|
||||
@ -599,8 +599,7 @@ impl TableStore {
|
||||
let mut inner = self.inner.lock();
|
||||
let table_db = TableDB::new(
|
||||
table_name.clone(),
|
||||
self.clone(),
|
||||
inner.crypto.as_ref().unwrap().clone(),
|
||||
self.registry(),
|
||||
db,
|
||||
inner.encryption_key,
|
||||
inner.encryption_key,
|
||||
|
@ -11,20 +11,17 @@ cfg_if! {
|
||||
}
|
||||
|
||||
struct CryptInfo {
|
||||
vcrypto: CryptoSystemVersion,
|
||||
key: SharedSecret,
|
||||
typed_key: TypedSharedSecret,
|
||||
}
|
||||
impl CryptInfo {
|
||||
pub fn new(crypto: Crypto, typed_key: TypedSharedSecret) -> Self {
|
||||
let vcrypto = crypto.get(typed_key.kind).unwrap();
|
||||
let key = typed_key.value;
|
||||
Self { vcrypto, key }
|
||||
pub fn new(typed_key: TypedSharedSecret) -> Self {
|
||||
Self { typed_key }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TableDBUnlockedInner {
|
||||
table: String,
|
||||
table_store: TableStore,
|
||||
registry: VeilidComponentRegistry,
|
||||
database: Database,
|
||||
// Encryption and decryption key will be the same unless configured for an in-place migration
|
||||
encrypt_info: Option<CryptInfo>,
|
||||
@ -39,7 +36,8 @@ impl fmt::Debug for TableDBUnlockedInner {
|
||||
|
||||
impl Drop for TableDBUnlockedInner {
|
||||
fn drop(&mut self) {
|
||||
self.table_store.on_table_db_drop(self.table.clone());
|
||||
let table_store = self.registry.lookup::<TableStore>().unwrap();
|
||||
table_store.on_table_db_drop(self.table.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,15 +50,14 @@ pub struct TableDB {
|
||||
impl TableDB {
|
||||
pub(super) fn new(
|
||||
table: String,
|
||||
table_store: TableStore,
|
||||
crypto: Crypto,
|
||||
registry: VeilidComponentRegistry,
|
||||
database: Database,
|
||||
encryption_key: Option<TypedSharedSecret>,
|
||||
decryption_key: Option<TypedSharedSecret>,
|
||||
opened_column_count: u32,
|
||||
) -> Self {
|
||||
let encrypt_info = encryption_key.map(|ek| CryptInfo::new(crypto.clone(), ek));
|
||||
let decrypt_info = decryption_key.map(|dk| CryptInfo::new(crypto.clone(), dk));
|
||||
let encrypt_info = encryption_key.map(CryptInfo::new);
|
||||
let decrypt_info = decryption_key.map(CryptInfo::new);
|
||||
|
||||
let total_columns = database.num_columns().unwrap();
|
||||
|
||||
@ -72,7 +69,7 @@ impl TableDB {
|
||||
},
|
||||
unlocked_inner: Arc::new(TableDBUnlockedInner {
|
||||
table,
|
||||
table_store,
|
||||
registry,
|
||||
database,
|
||||
encrypt_info,
|
||||
decrypt_info,
|
||||
@ -102,6 +99,9 @@ impl TableDB {
|
||||
Arc::downgrade(&self.unlocked_inner)
|
||||
}
|
||||
|
||||
pub(super) fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
|
||||
self.unlocked_inner.registry.lookup::<Crypto>().unwrap()
|
||||
}
|
||||
/// Get the internal name of the table
|
||||
pub fn table_name(&self) -> String {
|
||||
self.unlocked_inner.table.clone()
|
||||
@ -131,14 +131,16 @@ impl TableDB {
|
||||
fn maybe_encrypt(&self, data: &[u8], keyed_nonce: bool) -> Vec<u8> {
|
||||
let data = compress_prepend_size(data);
|
||||
if let Some(ei) = &self.unlocked_inner.encrypt_info {
|
||||
let crypto = self.crypto();
|
||||
let vcrypto = crypto.get(ei.typed_key.kind).unwrap();
|
||||
let mut out = unsafe { unaligned_u8_vec_uninit(NONCE_LENGTH + data.len()) };
|
||||
|
||||
if keyed_nonce {
|
||||
// Key content nonce
|
||||
let mut noncedata = Vec::with_capacity(data.len() + PUBLIC_KEY_LENGTH);
|
||||
noncedata.extend_from_slice(&data);
|
||||
noncedata.extend_from_slice(&ei.key.bytes);
|
||||
let noncehash = ei.vcrypto.generate_hash(&noncedata);
|
||||
noncedata.extend_from_slice(&ei.typed_key.value.bytes);
|
||||
let noncehash = vcrypto.generate_hash(&noncedata);
|
||||
out[0..NONCE_LENGTH].copy_from_slice(&noncehash[0..NONCE_LENGTH])
|
||||
} else {
|
||||
// Random nonce
|
||||
@ -146,11 +148,11 @@ impl TableDB {
|
||||
}
|
||||
|
||||
let (nonce, encout) = out.split_at_mut(NONCE_LENGTH);
|
||||
ei.vcrypto.crypt_b2b_no_auth(
|
||||
vcrypto.crypt_b2b_no_auth(
|
||||
&data,
|
||||
encout,
|
||||
(nonce as &[u8]).try_into().unwrap(),
|
||||
&ei.key,
|
||||
&ei.typed_key.value,
|
||||
);
|
||||
out
|
||||
} else {
|
||||
@ -162,6 +164,8 @@ impl TableDB {
|
||||
#[instrument(level = "trace", target = "tstore", skip_all)]
|
||||
fn maybe_decrypt(&self, data: &[u8]) -> std::io::Result<Vec<u8>> {
|
||||
if let Some(di) = &self.unlocked_inner.decrypt_info {
|
||||
let crypto = self.crypto();
|
||||
let vcrypto = crypto.get(di.typed_key.kind).unwrap();
|
||||
assert!(data.len() >= NONCE_LENGTH);
|
||||
if data.len() == NONCE_LENGTH {
|
||||
return Ok(Vec::new());
|
||||
@ -169,11 +173,11 @@ impl TableDB {
|
||||
|
||||
let mut out = unsafe { unaligned_u8_vec_uninit(data.len() - NONCE_LENGTH) };
|
||||
|
||||
di.vcrypto.crypt_b2b_no_auth(
|
||||
vcrypto.crypt_b2b_no_auth(
|
||||
&data[NONCE_LENGTH..],
|
||||
&mut out,
|
||||
(&data[0..NONCE_LENGTH]).try_into().unwrap(),
|
||||
&di.key,
|
||||
&di.typed_key.value,
|
||||
);
|
||||
decompress_size_prepended(&out, None)
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))
|
||||
|
@ -15,7 +15,7 @@ async fn shutdown(api: VeilidAPI) {
|
||||
trace!("test_table_store: finished");
|
||||
}
|
||||
|
||||
pub async fn test_delete_open_delete(ts: TableStore) {
|
||||
pub async fn test_delete_open_delete(ts: &TableStore) {
|
||||
trace!("test_delete_open_delete");
|
||||
|
||||
let _ = ts.delete("test").await;
|
||||
@ -47,7 +47,7 @@ pub async fn test_delete_open_delete(ts: TableStore) {
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn test_store_delete_load(ts: TableStore) {
|
||||
pub async fn test_store_delete_load(ts: &TableStore) {
|
||||
trace!("test_store_delete_load");
|
||||
|
||||
let _ = ts.delete("test").await;
|
||||
@ -132,7 +132,7 @@ pub async fn test_store_delete_load(ts: TableStore) {
|
||||
assert_eq!(db.load(2, b"baz").await.unwrap(), Some(b"QWERTY".to_vec()));
|
||||
}
|
||||
|
||||
pub async fn test_transaction(ts: TableStore) {
|
||||
pub async fn test_transaction(ts: &TableStore) {
|
||||
trace!("test_transaction");
|
||||
|
||||
let _ = ts.delete("test").await;
|
||||
@ -162,7 +162,7 @@ pub async fn test_transaction(ts: TableStore) {
|
||||
assert_eq!(db.load(0, b"ddd").await, Ok(None));
|
||||
}
|
||||
|
||||
pub async fn test_json(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
pub async fn test_json(vcrypto: &CryptoSystemGuard<'_>, ts: &TableStore) {
|
||||
trace!("test_json");
|
||||
|
||||
let _ = ts.delete("test").await;
|
||||
@ -200,7 +200,7 @@ pub async fn test_json(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
);
|
||||
}
|
||||
|
||||
pub async fn test_protect_unprotect(vcrypto: CryptoSystemVersion, ts: TableStore) {
|
||||
pub async fn test_protect_unprotect(vcrypto: &CryptoSystemGuard<'_>, ts: &TableStore) {
|
||||
trace!("test_protect_unprotect");
|
||||
|
||||
let dek1 = TypedSharedSecret::new(
|
||||
@ -267,11 +267,11 @@ pub async fn test_all() {
|
||||
|
||||
for ck in VALID_CRYPTO_KINDS {
|
||||
let vcrypto = crypto.get(ck).unwrap();
|
||||
test_protect_unprotect(vcrypto.clone(), ts.clone()).await;
|
||||
test_delete_open_delete(ts.clone()).await;
|
||||
test_store_delete_load(ts.clone()).await;
|
||||
test_transaction(ts.clone()).await;
|
||||
test_json(vcrypto, ts.clone()).await;
|
||||
test_protect_unprotect(&vcrypto, &ts).await;
|
||||
test_delete_open_delete(&ts).await;
|
||||
test_store_delete_load(&ts).await;
|
||||
test_transaction(&ts).await;
|
||||
test_json(&vcrypto, &ts).await;
|
||||
let _ = ts.delete("test").await;
|
||||
}
|
||||
|
||||
|
@ -284,6 +284,11 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
|
||||
"network.protocol.wss.url" => Ok(Box::new(Option::<String>::None)),
|
||||
#[cfg(feature = "geolocation")]
|
||||
"network.privacy.country_code_denylist" => Ok(Box::new(Vec::<CountryCode>::new())),
|
||||
#[cfg(feature = "virtual-network")]
|
||||
"network.virtual_network.enabled" => Ok(Box::new(false)),
|
||||
#[cfg(feature = "virtual-network")]
|
||||
"network.virtual_network.server_address" => Ok(Box::new("".to_owned())),
|
||||
|
||||
_ => {
|
||||
let err = format!("config key '{}' doesn't exist", key);
|
||||
debug!("{}", err);
|
||||
@ -293,26 +298,20 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
|
||||
}
|
||||
|
||||
pub fn get_config() -> VeilidConfig {
|
||||
let mut vc = VeilidConfig::new();
|
||||
match vc.setup(Arc::new(config_callback), Arc::new(update_callback)) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Error: {}", e);
|
||||
unreachable!();
|
||||
}
|
||||
};
|
||||
let vc =
|
||||
match VeilidConfig::new_from_callback(Arc::new(config_callback), Arc::new(update_callback))
|
||||
{
|
||||
Ok(vc) => vc,
|
||||
Err(e) => {
|
||||
error!("Error: {}", e);
|
||||
unreachable!();
|
||||
}
|
||||
};
|
||||
vc
|
||||
}
|
||||
|
||||
pub async fn test_config() {
|
||||
let mut vc = VeilidConfig::new();
|
||||
match vc.setup(Arc::new(config_callback), Arc::new(update_callback)) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Error: {}", e);
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
let vc = get_config();
|
||||
|
||||
let inner = vc.get();
|
||||
assert_eq!(inner.program_name, String::from("VeilidCoreTests"));
|
||||
@ -424,6 +423,10 @@ pub async fn test_config() {
|
||||
|
||||
#[cfg(feature = "geolocation")]
|
||||
assert_eq!(inner.network.privacy.country_code_denylist, Vec::new());
|
||||
#[cfg(feature = "virtual-network")]
|
||||
assert!(!inner.network.virtual_network.enabled);
|
||||
#[cfg(feature = "virtual-network")]
|
||||
assert_eq!(inner.network.virtual_network.server_address, "");
|
||||
}
|
||||
|
||||
pub async fn test_all() {
|
||||
|
@ -23,7 +23,7 @@ impl Drop for VeilidAPIInner {
|
||||
|
||||
/// The primary developer entrypoint into `veilid-core` functionality.
|
||||
///
|
||||
/// From [VeilidAPI] one can access:
|
||||
/// From [VeilidAPI] one can access various components:
|
||||
///
|
||||
/// * [VeilidConfig] - The Veilid configuration specified at startup time.
|
||||
/// * [Crypto] - The available set of cryptosystems provided by Veilid.
|
||||
@ -42,7 +42,7 @@ pub struct VeilidAPI {
|
||||
impl VeilidAPI {
|
||||
#[instrument(target = "veilid_api", level = "debug", skip_all)]
|
||||
pub(crate) fn new(context: VeilidCoreContext) -> Self {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::new()");
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(VeilidAPIInner {
|
||||
@ -60,7 +60,7 @@ impl VeilidAPI {
|
||||
/// Shut down Veilid and terminate the API.
|
||||
#[instrument(target = "veilid_api", level = "debug", skip_all)]
|
||||
pub async fn shutdown(self) {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::shutdown()");
|
||||
let context = { self.inner.lock().context.take() };
|
||||
if let Some(context) = context {
|
||||
@ -79,83 +79,90 @@ impl VeilidAPI {
|
||||
/// Access the configuration that Veilid was initialized with.
|
||||
pub fn config(&self) -> VeilidAPIResult<VeilidConfig> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.config.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
let Some(context) = &inner.context else {
|
||||
return Err(VeilidAPIError::NotInitialized);
|
||||
};
|
||||
Ok(context.registry().config())
|
||||
}
|
||||
|
||||
/// Get the cryptosystem manager.
|
||||
pub fn crypto(&self) -> VeilidAPIResult<Crypto> {
|
||||
/// Get the cryptosystem component.
|
||||
pub fn crypto(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, Crypto>> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.crypto.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
let Some(context) = &inner.context else {
|
||||
return Err(VeilidAPIError::NotInitialized);
|
||||
};
|
||||
context
|
||||
.registry()
|
||||
.lookup::<Crypto>()
|
||||
.ok_or(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
|
||||
/// Get the TableStore manager.
|
||||
pub fn table_store(&self) -> VeilidAPIResult<TableStore> {
|
||||
/// Get the TableStore component.
|
||||
pub fn table_store(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, TableStore>> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.table_store.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
let Some(context) = &inner.context else {
|
||||
return Err(VeilidAPIError::NotInitialized);
|
||||
};
|
||||
context
|
||||
.registry()
|
||||
.lookup::<TableStore>()
|
||||
.ok_or(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
|
||||
/// Get the ProtectedStore manager.
|
||||
pub fn protected_store(&self) -> VeilidAPIResult<ProtectedStore> {
|
||||
/// Get the ProtectedStore component.
|
||||
pub fn protected_store(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, ProtectedStore>> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.protected_store.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
let Some(context) = &inner.context else {
|
||||
return Err(VeilidAPIError::NotInitialized);
|
||||
};
|
||||
context
|
||||
.registry()
|
||||
.lookup::<ProtectedStore>()
|
||||
.ok_or(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
|
||||
/// Get the BlockStore component.
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
pub fn block_store(&self) -> VeilidAPIResult<VeilidComponentGuard<'_, BlockStore>> {
|
||||
let inner = self.inner.lock();
|
||||
let Some(context) = &inner.context else {
|
||||
return Err(VeilidAPIError::NotInitialized);
|
||||
};
|
||||
context
|
||||
.registry()
|
||||
.lookup::<BlockStore>()
|
||||
.ok_or(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
// Internal Accessors
|
||||
pub(crate) fn attachment_manager(&self) -> VeilidAPIResult<AttachmentManager> {
|
||||
pub(crate) fn attachment_manager(
|
||||
&self,
|
||||
) -> VeilidAPIResult<VeilidComponentGuard<'_, AttachmentManager>> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.attachment_manager.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
let Some(context) = &inner.context else {
|
||||
return Err(VeilidAPIError::NotInitialized);
|
||||
};
|
||||
context
|
||||
.registry()
|
||||
.lookup::<AttachmentManager>()
|
||||
.ok_or(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
pub(crate) fn network_manager(&self) -> VeilidAPIResult<NetworkManager> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.attachment_manager.network_manager());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
self.attachment_manager().map(|a| a.network_manager())
|
||||
}
|
||||
pub(crate) fn rpc_processor(&self) -> VeilidAPIResult<RPCProcessor> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.attachment_manager.network_manager().rpc_processor());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
self.network_manager()
|
||||
.map(|a| a.opt_rpc_processor())?
|
||||
.ok_or(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
pub(crate) fn routing_table(&self) -> VeilidAPIResult<RoutingTable> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.attachment_manager.network_manager().routing_table());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
self.attachment_manager()
|
||||
.map(|a| a.network_manager().routing_table())
|
||||
}
|
||||
pub(crate) fn storage_manager(&self) -> VeilidAPIResult<StorageManager> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.storage_manager.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
}
|
||||
#[cfg(feature = "unstable-blockstore")]
|
||||
pub(crate) fn block_store(&self) -> VeilidAPIResult<BlockStore> {
|
||||
let inner = self.inner.lock();
|
||||
if let Some(context) = &inner.context {
|
||||
return Ok(context.block_store.clone());
|
||||
}
|
||||
Err(VeilidAPIError::NotInitialized)
|
||||
self.attachment_manager()
|
||||
.map(|a| a.network_manager().storage_manager())
|
||||
}
|
||||
|
||||
pub(crate) fn with_debug_cache<R, F: FnOnce(&mut DebugCache) -> R>(&self, callback: F) -> R {
|
||||
@ -186,7 +193,7 @@ impl VeilidAPI {
|
||||
/// Connect to the network.
|
||||
#[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)]
|
||||
pub async fn attach(&self) -> VeilidAPIResult<()> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::attach()");
|
||||
|
||||
let attachment_manager = self.attachment_manager()?;
|
||||
@ -199,7 +206,7 @@ impl VeilidAPI {
|
||||
/// Disconnect from the network.
|
||||
#[instrument(target = "veilid_api", level = "debug", skip_all, ret, err)]
|
||||
pub async fn detach(&self) -> VeilidAPIResult<()> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::detach()");
|
||||
|
||||
let attachment_manager = self.attachment_manager()?;
|
||||
@ -215,7 +222,7 @@ impl VeilidAPI {
|
||||
/// Get a new `RoutingContext` object to use to send messages over the Veilid network with default safety, sequencing, and stability parameters.
|
||||
#[instrument(target = "veilid_api", level = "debug", skip_all, err, ret)]
|
||||
pub fn routing_context(&self) -> VeilidAPIResult<RoutingContext> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::routing_context()");
|
||||
|
||||
RoutingContext::try_new(self.clone())
|
||||
@ -232,7 +239,7 @@ impl VeilidAPI {
|
||||
pub fn parse_as_target<S: ToString>(&self, s: S) -> VeilidAPIResult<Target> {
|
||||
let s = s.to_string();
|
||||
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::parse_as_target(s: {:?})", s);
|
||||
|
||||
// Is this a route id?
|
||||
@ -289,7 +296,7 @@ impl VeilidAPI {
|
||||
stability: Stability,
|
||||
sequencing: Sequencing,
|
||||
) -> VeilidAPIResult<(RouteId, Vec<u8>)> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::new_custom_private_route(crypto_kinds: {:?}, stability: {:?}, sequencing: {:?})",
|
||||
crypto_kinds,
|
||||
stability,
|
||||
@ -347,7 +354,7 @@ impl VeilidAPI {
|
||||
/// Returns a route id that can be used to send private messages to the node creating this route.
|
||||
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
|
||||
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::import_remote_private_route(blob: {:?})", blob);
|
||||
let rss = self.routing_table()?.route_spec_store();
|
||||
rss.import_remote_private_route_blob(blob)
|
||||
@ -359,7 +366,7 @@ impl VeilidAPI {
|
||||
/// or received from.
|
||||
#[instrument(target = "veilid_api", level = "debug", skip(self), ret, err)]
|
||||
pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::release_private_route(route_id: {:?})", route_id);
|
||||
let rss = self.routing_table()?.route_spec_store();
|
||||
if !rss.release_route(route_id) {
|
||||
@ -381,7 +388,7 @@ impl VeilidAPI {
|
||||
call_id: OperationId,
|
||||
message: Vec<u8>,
|
||||
) -> VeilidAPIResult<()> {
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
event!(target: "veilid_api", Level::DEBUG,
|
||||
"VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message);
|
||||
|
||||
let rpc_processor = self.rpc_processor()?;
|
||||
|
@ -229,7 +229,9 @@ fn get_keypair(text: &str) -> Option<KeyPair> {
|
||||
KeyPair::from_str(text).ok()
|
||||
}
|
||||
|
||||
fn get_crypto_system_version(crypto: Crypto) -> impl FnOnce(&str) -> Option<CryptoSystemVersion> {
|
||||
fn get_crypto_system_version(
|
||||
crypto: &Crypto,
|
||||
) -> impl FnOnce(&str) -> Option<CryptoSystemGuard<'_>> {
|
||||
move |text| {
|
||||
let kindstr = get_string(text)?;
|
||||
let kind = CryptoKind::from_str(&kindstr).ok()?;
|
||||
|
@ -858,7 +858,7 @@ impl VeilidConfigInner {
|
||||
/// The configuration built for each Veilid node during API startup
|
||||
#[derive(Clone)]
|
||||
pub struct VeilidConfig {
|
||||
update_cb: Option<UpdateCallback>,
|
||||
update_cb: UpdateCallback,
|
||||
inner: Arc<RwLock<VeilidConfigInner>>,
|
||||
}
|
||||
|
||||
@ -871,164 +871,144 @@ impl fmt::Debug for VeilidConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for VeilidConfig {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
impl VeilidConfig {
|
||||
fn new_inner() -> VeilidConfigInner {
|
||||
VeilidConfigInner::default()
|
||||
}
|
||||
|
||||
pub(crate) fn new() -> Self {
|
||||
pub(crate) fn new_from_config(config: VeilidConfigInner, update_cb: UpdateCallback) -> Self {
|
||||
Self {
|
||||
update_cb: None,
|
||||
inner: Arc::new(RwLock::new(Self::new_inner())),
|
||||
update_cb,
|
||||
inner: Arc::new(RwLock::new(config)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn setup_from_config(
|
||||
&mut self,
|
||||
config: VeilidConfigInner,
|
||||
update_cb: UpdateCallback,
|
||||
) -> VeilidAPIResult<()> {
|
||||
self.update_cb = Some(update_cb);
|
||||
|
||||
self.with_mut(|inner| {
|
||||
*inner = config;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn setup(
|
||||
&mut self,
|
||||
pub(crate) fn new_from_callback(
|
||||
cb: ConfigCallback,
|
||||
update_cb: UpdateCallback,
|
||||
) -> VeilidAPIResult<()> {
|
||||
self.update_cb = Some(update_cb);
|
||||
self.with_mut(|inner| {
|
||||
// Simple config transformation
|
||||
macro_rules! get_config {
|
||||
($key:expr) => {
|
||||
let keyname = &stringify!($key)[6..];
|
||||
let v = cb(keyname.to_owned())?;
|
||||
$key = match v.downcast() {
|
||||
Ok(v) => *v,
|
||||
Err(e) => {
|
||||
apibail_generic!(format!(
|
||||
"incorrect type for key {}: {:?}",
|
||||
keyname,
|
||||
type_name_of_val(&*e)
|
||||
))
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
) -> VeilidAPIResult<Self> {
|
||||
let mut inner = VeilidConfigInner::default();
|
||||
|
||||
get_config!(inner.program_name);
|
||||
get_config!(inner.namespace);
|
||||
get_config!(inner.capabilities.disable);
|
||||
get_config!(inner.table_store.directory);
|
||||
get_config!(inner.table_store.delete);
|
||||
get_config!(inner.block_store.directory);
|
||||
get_config!(inner.block_store.delete);
|
||||
get_config!(inner.protected_store.allow_insecure_fallback);
|
||||
get_config!(inner.protected_store.always_use_insecure_storage);
|
||||
get_config!(inner.protected_store.directory);
|
||||
get_config!(inner.protected_store.delete);
|
||||
get_config!(inner.protected_store.device_encryption_key_password);
|
||||
get_config!(inner.protected_store.new_device_encryption_key_password);
|
||||
get_config!(inner.network.connection_initial_timeout_ms);
|
||||
get_config!(inner.network.connection_inactivity_timeout_ms);
|
||||
get_config!(inner.network.max_connections_per_ip4);
|
||||
get_config!(inner.network.max_connections_per_ip6_prefix);
|
||||
get_config!(inner.network.max_connections_per_ip6_prefix_size);
|
||||
get_config!(inner.network.max_connection_frequency_per_min);
|
||||
get_config!(inner.network.client_allowlist_timeout_ms);
|
||||
get_config!(inner.network.reverse_connection_receipt_time_ms);
|
||||
get_config!(inner.network.hole_punch_receipt_time_ms);
|
||||
get_config!(inner.network.network_key_password);
|
||||
get_config!(inner.network.routing_table.node_id);
|
||||
get_config!(inner.network.routing_table.node_id_secret);
|
||||
get_config!(inner.network.routing_table.bootstrap);
|
||||
get_config!(inner.network.routing_table.limit_over_attached);
|
||||
get_config!(inner.network.routing_table.limit_fully_attached);
|
||||
get_config!(inner.network.routing_table.limit_attached_strong);
|
||||
get_config!(inner.network.routing_table.limit_attached_good);
|
||||
get_config!(inner.network.routing_table.limit_attached_weak);
|
||||
get_config!(inner.network.dht.max_find_node_count);
|
||||
get_config!(inner.network.dht.resolve_node_timeout_ms);
|
||||
get_config!(inner.network.dht.resolve_node_count);
|
||||
get_config!(inner.network.dht.resolve_node_fanout);
|
||||
get_config!(inner.network.dht.get_value_timeout_ms);
|
||||
get_config!(inner.network.dht.get_value_count);
|
||||
get_config!(inner.network.dht.get_value_fanout);
|
||||
get_config!(inner.network.dht.set_value_timeout_ms);
|
||||
get_config!(inner.network.dht.set_value_count);
|
||||
get_config!(inner.network.dht.set_value_fanout);
|
||||
get_config!(inner.network.dht.min_peer_count);
|
||||
get_config!(inner.network.dht.min_peer_refresh_time_ms);
|
||||
get_config!(inner.network.dht.validate_dial_info_receipt_time_ms);
|
||||
get_config!(inner.network.dht.local_subkey_cache_size);
|
||||
get_config!(inner.network.dht.local_max_subkey_cache_memory_mb);
|
||||
get_config!(inner.network.dht.remote_subkey_cache_size);
|
||||
get_config!(inner.network.dht.remote_max_records);
|
||||
get_config!(inner.network.dht.remote_max_subkey_cache_memory_mb);
|
||||
get_config!(inner.network.dht.remote_max_storage_space_mb);
|
||||
get_config!(inner.network.dht.public_watch_limit);
|
||||
get_config!(inner.network.dht.member_watch_limit);
|
||||
get_config!(inner.network.dht.max_watch_expiration_ms);
|
||||
get_config!(inner.network.rpc.concurrency);
|
||||
get_config!(inner.network.rpc.queue_size);
|
||||
get_config!(inner.network.rpc.max_timestamp_behind_ms);
|
||||
get_config!(inner.network.rpc.max_timestamp_ahead_ms);
|
||||
get_config!(inner.network.rpc.timeout_ms);
|
||||
get_config!(inner.network.rpc.max_route_hop_count);
|
||||
get_config!(inner.network.rpc.default_route_hop_count);
|
||||
get_config!(inner.network.upnp);
|
||||
get_config!(inner.network.detect_address_changes);
|
||||
get_config!(inner.network.restricted_nat_retries);
|
||||
get_config!(inner.network.tls.certificate_path);
|
||||
get_config!(inner.network.tls.private_key_path);
|
||||
get_config!(inner.network.tls.connection_initial_timeout_ms);
|
||||
get_config!(inner.network.application.https.enabled);
|
||||
get_config!(inner.network.application.https.listen_address);
|
||||
get_config!(inner.network.application.https.path);
|
||||
get_config!(inner.network.application.https.url);
|
||||
get_config!(inner.network.application.http.enabled);
|
||||
get_config!(inner.network.application.http.listen_address);
|
||||
get_config!(inner.network.application.http.path);
|
||||
get_config!(inner.network.application.http.url);
|
||||
get_config!(inner.network.protocol.udp.enabled);
|
||||
get_config!(inner.network.protocol.udp.socket_pool_size);
|
||||
get_config!(inner.network.protocol.udp.listen_address);
|
||||
get_config!(inner.network.protocol.udp.public_address);
|
||||
get_config!(inner.network.protocol.tcp.connect);
|
||||
get_config!(inner.network.protocol.tcp.listen);
|
||||
get_config!(inner.network.protocol.tcp.max_connections);
|
||||
get_config!(inner.network.protocol.tcp.listen_address);
|
||||
get_config!(inner.network.protocol.tcp.public_address);
|
||||
get_config!(inner.network.protocol.ws.connect);
|
||||
get_config!(inner.network.protocol.ws.listen);
|
||||
get_config!(inner.network.protocol.ws.max_connections);
|
||||
get_config!(inner.network.protocol.ws.listen_address);
|
||||
get_config!(inner.network.protocol.ws.path);
|
||||
get_config!(inner.network.protocol.ws.url);
|
||||
get_config!(inner.network.protocol.wss.connect);
|
||||
get_config!(inner.network.protocol.wss.listen);
|
||||
get_config!(inner.network.protocol.wss.max_connections);
|
||||
get_config!(inner.network.protocol.wss.listen_address);
|
||||
get_config!(inner.network.protocol.wss.path);
|
||||
get_config!(inner.network.protocol.wss.url);
|
||||
#[cfg(feature = "geolocation")]
|
||||
get_config!(inner.network.privacy.country_code_denylist);
|
||||
#[cfg(feature = "virtual-network")]
|
||||
{
|
||||
get_config!(inner.network.virtual_network.enabled);
|
||||
get_config!(inner.network.virtual_network.server_address);
|
||||
}
|
||||
Ok(())
|
||||
// Simple config transformation
|
||||
macro_rules! get_config {
|
||||
($key:expr) => {
|
||||
let keyname = &stringify!($key)[6..];
|
||||
let v = cb(keyname.to_owned())?;
|
||||
$key = match v.downcast() {
|
||||
Ok(v) => *v,
|
||||
Err(e) => {
|
||||
apibail_generic!(format!(
|
||||
"incorrect type for key {}: {:?}",
|
||||
keyname,
|
||||
type_name_of_val(&*e)
|
||||
))
|
||||
}
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
get_config!(inner.program_name);
|
||||
get_config!(inner.namespace);
|
||||
get_config!(inner.capabilities.disable);
|
||||
get_config!(inner.table_store.directory);
|
||||
get_config!(inner.table_store.delete);
|
||||
get_config!(inner.block_store.directory);
|
||||
get_config!(inner.block_store.delete);
|
||||
get_config!(inner.protected_store.allow_insecure_fallback);
|
||||
get_config!(inner.protected_store.always_use_insecure_storage);
|
||||
get_config!(inner.protected_store.directory);
|
||||
get_config!(inner.protected_store.delete);
|
||||
get_config!(inner.protected_store.device_encryption_key_password);
|
||||
get_config!(inner.protected_store.new_device_encryption_key_password);
|
||||
get_config!(inner.network.connection_initial_timeout_ms);
|
||||
get_config!(inner.network.connection_inactivity_timeout_ms);
|
||||
get_config!(inner.network.max_connections_per_ip4);
|
||||
get_config!(inner.network.max_connections_per_ip6_prefix);
|
||||
get_config!(inner.network.max_connections_per_ip6_prefix_size);
|
||||
get_config!(inner.network.max_connection_frequency_per_min);
|
||||
get_config!(inner.network.client_allowlist_timeout_ms);
|
||||
get_config!(inner.network.reverse_connection_receipt_time_ms);
|
||||
get_config!(inner.network.hole_punch_receipt_time_ms);
|
||||
get_config!(inner.network.network_key_password);
|
||||
get_config!(inner.network.routing_table.node_id);
|
||||
get_config!(inner.network.routing_table.node_id_secret);
|
||||
get_config!(inner.network.routing_table.bootstrap);
|
||||
get_config!(inner.network.routing_table.limit_over_attached);
|
||||
get_config!(inner.network.routing_table.limit_fully_attached);
|
||||
get_config!(inner.network.routing_table.limit_attached_strong);
|
||||
get_config!(inner.network.routing_table.limit_attached_good);
|
||||
get_config!(inner.network.routing_table.limit_attached_weak);
|
||||
get_config!(inner.network.dht.max_find_node_count);
|
||||
get_config!(inner.network.dht.resolve_node_timeout_ms);
|
||||
get_config!(inner.network.dht.resolve_node_count);
|
||||
get_config!(inner.network.dht.resolve_node_fanout);
|
||||
get_config!(inner.network.dht.get_value_timeout_ms);
|
||||
get_config!(inner.network.dht.get_value_count);
|
||||
get_config!(inner.network.dht.get_value_fanout);
|
||||
get_config!(inner.network.dht.set_value_timeout_ms);
|
||||
get_config!(inner.network.dht.set_value_count);
|
||||
get_config!(inner.network.dht.set_value_fanout);
|
||||
get_config!(inner.network.dht.min_peer_count);
|
||||
get_config!(inner.network.dht.min_peer_refresh_time_ms);
|
||||
get_config!(inner.network.dht.validate_dial_info_receipt_time_ms);
|
||||
get_config!(inner.network.dht.local_subkey_cache_size);
|
||||
get_config!(inner.network.dht.local_max_subkey_cache_memory_mb);
|
||||
get_config!(inner.network.dht.remote_subkey_cache_size);
|
||||
get_config!(inner.network.dht.remote_max_records);
|
||||
get_config!(inner.network.dht.remote_max_subkey_cache_memory_mb);
|
||||
get_config!(inner.network.dht.remote_max_storage_space_mb);
|
||||
get_config!(inner.network.dht.public_watch_limit);
|
||||
get_config!(inner.network.dht.member_watch_limit);
|
||||
get_config!(inner.network.dht.max_watch_expiration_ms);
|
||||
get_config!(inner.network.rpc.concurrency);
|
||||
get_config!(inner.network.rpc.queue_size);
|
||||
get_config!(inner.network.rpc.max_timestamp_behind_ms);
|
||||
get_config!(inner.network.rpc.max_timestamp_ahead_ms);
|
||||
get_config!(inner.network.rpc.timeout_ms);
|
||||
get_config!(inner.network.rpc.max_route_hop_count);
|
||||
get_config!(inner.network.rpc.default_route_hop_count);
|
||||
get_config!(inner.network.upnp);
|
||||
get_config!(inner.network.detect_address_changes);
|
||||
get_config!(inner.network.restricted_nat_retries);
|
||||
get_config!(inner.network.tls.certificate_path);
|
||||
get_config!(inner.network.tls.private_key_path);
|
||||
get_config!(inner.network.tls.connection_initial_timeout_ms);
|
||||
get_config!(inner.network.application.https.enabled);
|
||||
get_config!(inner.network.application.https.listen_address);
|
||||
get_config!(inner.network.application.https.path);
|
||||
get_config!(inner.network.application.https.url);
|
||||
get_config!(inner.network.application.http.enabled);
|
||||
get_config!(inner.network.application.http.listen_address);
|
||||
get_config!(inner.network.application.http.path);
|
||||
get_config!(inner.network.application.http.url);
|
||||
get_config!(inner.network.protocol.udp.enabled);
|
||||
get_config!(inner.network.protocol.udp.socket_pool_size);
|
||||
get_config!(inner.network.protocol.udp.listen_address);
|
||||
get_config!(inner.network.protocol.udp.public_address);
|
||||
get_config!(inner.network.protocol.tcp.connect);
|
||||
get_config!(inner.network.protocol.tcp.listen);
|
||||
get_config!(inner.network.protocol.tcp.max_connections);
|
||||
get_config!(inner.network.protocol.tcp.listen_address);
|
||||
get_config!(inner.network.protocol.tcp.public_address);
|
||||
get_config!(inner.network.protocol.ws.connect);
|
||||
get_config!(inner.network.protocol.ws.listen);
|
||||
get_config!(inner.network.protocol.ws.max_connections);
|
||||
get_config!(inner.network.protocol.ws.listen_address);
|
||||
get_config!(inner.network.protocol.ws.path);
|
||||
get_config!(inner.network.protocol.ws.url);
|
||||
get_config!(inner.network.protocol.wss.connect);
|
||||
get_config!(inner.network.protocol.wss.listen);
|
||||
get_config!(inner.network.protocol.wss.max_connections);
|
||||
get_config!(inner.network.protocol.wss.listen_address);
|
||||
get_config!(inner.network.protocol.wss.path);
|
||||
get_config!(inner.network.protocol.wss.url);
|
||||
#[cfg(feature = "geolocation")]
|
||||
get_config!(inner.network.privacy.country_code_denylist);
|
||||
#[cfg(feature = "virtual-network")]
|
||||
{
|
||||
get_config!(inner.network.virtual_network.enabled);
|
||||
get_config!(inner.network.virtual_network.server_address);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
update_cb,
|
||||
inner: Arc::new(RwLock::new(inner)),
|
||||
})
|
||||
}
|
||||
|
||||
@ -1039,6 +1019,10 @@ impl VeilidConfig {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn update_callback(&self) -> UpdateCallback {
|
||||
self.update_cb.clone()
|
||||
}
|
||||
|
||||
pub fn get(&self) -> RwLockReadGuard<VeilidConfigInner> {
|
||||
self.inner.read()
|
||||
}
|
||||
@ -1068,7 +1052,15 @@ impl VeilidConfig {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_mut<F, R>(&self, f: F) -> VeilidAPIResult<R>
|
||||
pub fn with<F, R>(&self, f: F) -> R
|
||||
where
|
||||
F: FnOnce(&VeilidConfigInner) -> R,
|
||||
{
|
||||
let inner = self.inner.read();
|
||||
f(&*inner)
|
||||
}
|
||||
|
||||
pub fn try_with_mut<F, R>(&self, f: F) -> VeilidAPIResult<R>
|
||||
where
|
||||
F: FnOnce(&mut VeilidConfigInner) -> VeilidAPIResult<R>,
|
||||
{
|
||||
@ -1091,12 +1083,10 @@ impl VeilidConfig {
|
||||
};
|
||||
|
||||
// Send configuration update to clients
|
||||
if let Some(update_cb) = &self.update_cb {
|
||||
let safe_cfg = self.safe_config_inner();
|
||||
update_cb(VeilidUpdate::Config(Box::new(VeilidStateConfig {
|
||||
config: safe_cfg,
|
||||
})));
|
||||
}
|
||||
let safe_cfg = self.safe_config_inner();
|
||||
(self.update_cb)(VeilidUpdate::Config(Box::new(VeilidStateConfig {
|
||||
config: safe_cfg,
|
||||
})));
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
@ -1133,7 +1123,7 @@ impl VeilidConfig {
|
||||
}
|
||||
}
|
||||
pub fn set_key_json(&self, key: &str, value: &str) -> VeilidAPIResult<()> {
|
||||
self.with_mut(|c| {
|
||||
self.try_with_mut(|c| {
|
||||
// Split key into path parts
|
||||
let keypath: Vec<&str> = key.split('.').collect();
|
||||
|
||||
@ -1305,124 +1295,6 @@ impl VeilidConfig {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
async fn init_node_id(
|
||||
&self,
|
||||
vcrypto: CryptoSystemVersion,
|
||||
table_store: TableStore,
|
||||
) -> VeilidAPIResult<(TypedKey, TypedSecret)> {
|
||||
let ck = vcrypto.kind();
|
||||
let mut node_id = self.inner.read().network.routing_table.node_id.get(ck);
|
||||
let mut node_id_secret = self
|
||||
.inner
|
||||
.read()
|
||||
.network
|
||||
.routing_table
|
||||
.node_id_secret
|
||||
.get(ck);
|
||||
|
||||
// See if node id was previously stored in the table store
|
||||
let config_table = table_store.open("__veilid_config", 1).await?;
|
||||
|
||||
let table_key_node_id = format!("node_id_{}", ck);
|
||||
let table_key_node_id_secret = format!("node_id_secret_{}", ck);
|
||||
|
||||
if node_id.is_none() {
|
||||
log_tstore!(debug "pulling {} from storage", table_key_node_id);
|
||||
if let Ok(Some(stored_node_id)) = config_table
|
||||
.load_json::<TypedKey>(0, table_key_node_id.as_bytes())
|
||||
.await
|
||||
{
|
||||
log_tstore!(debug "{} found in storage", table_key_node_id);
|
||||
node_id = Some(stored_node_id);
|
||||
} else {
|
||||
log_tstore!(debug "{} not found in storage", table_key_node_id);
|
||||
}
|
||||
}
|
||||
|
||||
// See if node id secret was previously stored in the protected store
|
||||
if node_id_secret.is_none() {
|
||||
log_tstore!(debug "pulling {} from storage", table_key_node_id_secret);
|
||||
if let Ok(Some(stored_node_id_secret)) = config_table
|
||||
.load_json::<TypedSecret>(0, table_key_node_id_secret.as_bytes())
|
||||
.await
|
||||
{
|
||||
log_tstore!(debug "{} found in storage", table_key_node_id_secret);
|
||||
node_id_secret = Some(stored_node_id_secret);
|
||||
} else {
|
||||
log_tstore!(debug "{} not found in storage", table_key_node_id_secret);
|
||||
}
|
||||
}
|
||||
|
||||
// If we have a node id from storage, check it
|
||||
let (node_id, node_id_secret) =
|
||||
if let (Some(node_id), Some(node_id_secret)) = (node_id, node_id_secret) {
|
||||
// Validate node id
|
||||
if !vcrypto.validate_keypair(&node_id.value, &node_id_secret.value) {
|
||||
apibail_generic!(format!(
|
||||
"node_id_secret_{} and node_id_key_{} don't match",
|
||||
ck, ck
|
||||
));
|
||||
}
|
||||
(node_id, node_id_secret)
|
||||
} else {
|
||||
// If we still don't have a valid node id, generate one
|
||||
log_tstore!(debug "generating new node_id_{}", ck);
|
||||
let kp = vcrypto.generate_keypair();
|
||||
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
|
||||
};
|
||||
info!("Node Id: {}", node_id);
|
||||
|
||||
// Save the node id / secret in storage
|
||||
config_table
|
||||
.store_json(0, table_key_node_id.as_bytes(), &node_id)
|
||||
.await?;
|
||||
config_table
|
||||
.store_json(0, table_key_node_id_secret.as_bytes(), &node_id_secret)
|
||||
.await?;
|
||||
|
||||
Ok((node_id, node_id_secret))
|
||||
}
|
||||
|
||||
/// Get the node id from config if one is specified.
|
||||
/// Must be done -after- protected store startup.
|
||||
#[cfg_attr(test, allow(unused_variables))]
|
||||
pub async fn init_node_ids(
|
||||
&self,
|
||||
crypto: Crypto,
|
||||
table_store: TableStore,
|
||||
) -> VeilidAPIResult<()> {
|
||||
let mut out_node_id = TypedKeyGroup::new();
|
||||
let mut out_node_id_secret = TypedSecretGroup::new();
|
||||
|
||||
for ck in VALID_CRYPTO_KINDS {
|
||||
let vcrypto = crypto
|
||||
.get(ck)
|
||||
.expect("Valid crypto kind is not actually valid.");
|
||||
|
||||
#[cfg(test)]
|
||||
let (node_id, node_id_secret) = {
|
||||
let kp = vcrypto.generate_keypair();
|
||||
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
|
||||
};
|
||||
#[cfg(not(test))]
|
||||
let (node_id, node_id_secret) = self.init_node_id(vcrypto, table_store.clone()).await?;
|
||||
|
||||
// Save for config
|
||||
out_node_id.add(node_id);
|
||||
out_node_id_secret.add(node_id_secret);
|
||||
}
|
||||
|
||||
// Commit back to config
|
||||
self.with_mut(|c| {
|
||||
c.network.routing_table.node_id = out_node_id;
|
||||
c.network.routing_table.node_id_secret = out_node_id_secret;
|
||||
Ok(())
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the default veilid config as a json object.
|
||||
|
@ -127,7 +127,11 @@ pub use async_lock::RwLock as AsyncRwLock;
|
||||
#[doc(no_inline)]
|
||||
pub use async_lock::RwLockReadGuard as AsyncRwLockReadGuard;
|
||||
#[doc(no_inline)]
|
||||
pub use async_lock::RwLockReadGuardArc as AsyncRwLockReadGuardArc;
|
||||
#[doc(no_inline)]
|
||||
pub use async_lock::RwLockWriteGuard as AsyncRwLockWriteGuard;
|
||||
#[doc(no_inline)]
|
||||
pub use async_lock::RwLockWriteGuardArc as AsyncRwLockWriteGuardArc;
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(all(target_arch = "wasm32", target_os = "unknown"))] {
|
||||
@ -151,13 +155,6 @@ cfg_if! {
|
||||
#[doc(no_inline)]
|
||||
pub use async_std::sync::MutexGuardArc as AsyncMutexGuardArc;
|
||||
|
||||
// #[doc(no_inline)]
|
||||
// pub use async_std::sync::RwLock as AsyncRwLock;
|
||||
// #[doc(no_inline)]
|
||||
// pub use async_std::sync::RwLockReadGuard as AsyncRwLockReadGuard;
|
||||
// #[doc(no_inline)]
|
||||
// pub use async_std::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
|
||||
|
||||
#[doc(no_inline)]
|
||||
pub use async_std::task::JoinHandle as LowLevelJoinHandle;
|
||||
|
||||
@ -169,14 +166,6 @@ cfg_if! {
|
||||
#[doc(no_inline)]
|
||||
pub use tokio::sync::OwnedMutexGuard as AsyncMutexGuardArc;
|
||||
|
||||
// #[doc(no_inline)]
|
||||
// pub use tokio::sync::RwLock as AsyncRwLock;
|
||||
// #[doc(no_inline)]
|
||||
// pub use tokio::sync::RwLockReadGuard as AsyncRwLockReadGuard;
|
||||
// #[doc(no_inline)]
|
||||
// pub use tokio::sync::RwLockWriteGuard as AsyncRwLockWriteGuard;
|
||||
|
||||
|
||||
#[doc(no_inline)]
|
||||
pub use tokio::task::JoinHandle as LowLevelJoinHandle;
|
||||
} else {
|
||||
|
@ -47,6 +47,23 @@ impl<'a> Drop for StartupLockEnterGuard<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII-style lock for entry operations on a started-up region of code.
|
||||
#[derive(Debug)]
|
||||
pub struct StartupLockEnterGuardArc {
|
||||
_guard: AsyncRwLockReadGuardArc<bool>,
|
||||
#[cfg(feature = "debug-locks")]
|
||||
id: usize,
|
||||
#[cfg(feature = "debug-locks")]
|
||||
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
|
||||
}
|
||||
|
||||
#[cfg(feature = "debug-locks")]
|
||||
impl Drop for StartupLockEnterGuardArc {
|
||||
fn drop(&mut self) {
|
||||
self.active_guards.lock().remove(&self.id);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "debug-locks")]
|
||||
static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
@ -59,7 +76,7 @@ static GUARD_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
/// asynchronous shutdown to wait for operations to finish before proceeding.
|
||||
#[derive(Debug)]
|
||||
pub struct StartupLock {
|
||||
startup_state: AsyncRwLock<bool>,
|
||||
startup_state: Arc<AsyncRwLock<bool>>,
|
||||
stop_source: Mutex<Option<StopSource>>,
|
||||
#[cfg(feature = "debug-locks")]
|
||||
active_guards: Arc<Mutex<HashMap<usize, backtrace::Backtrace>>>,
|
||||
@ -68,7 +85,7 @@ pub struct StartupLock {
|
||||
impl StartupLock {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
startup_state: AsyncRwLock::new(false),
|
||||
startup_state: Arc::new(AsyncRwLock::new(false)),
|
||||
stop_source: Mutex::new(None),
|
||||
#[cfg(feature = "debug-locks")]
|
||||
active_guards: Arc::new(Mutex::new(HashMap::new())),
|
||||
@ -168,6 +185,31 @@ impl StartupLock {
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// Enter an operation in a started-up module, using an owned lock.
|
||||
/// If this module has not yet started up or is in the process of startup or shutdown
|
||||
/// this will fail.
|
||||
pub fn enter_arc(&self) -> Result<StartupLockEnterGuardArc, StartupLockNotStartedError> {
|
||||
let guard =
|
||||
asyncrwlock_try_read_arc!(self.startup_state).ok_or(StartupLockNotStartedError)?;
|
||||
if !*guard {
|
||||
return Err(StartupLockNotStartedError);
|
||||
}
|
||||
let out = StartupLockEnterGuardArc {
|
||||
_guard: guard,
|
||||
#[cfg(feature = "debug-locks")]
|
||||
id: GUARD_ID.fetch_add(1, Ordering::AcqRel),
|
||||
#[cfg(feature = "debug-locks")]
|
||||
active_guards: self.active_guards.clone(),
|
||||
};
|
||||
|
||||
#[cfg(feature = "debug-locks")]
|
||||
self.active_guards
|
||||
.lock()
|
||||
.insert(out.id, backtrace::Backtrace::new());
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for StartupLock {
|
||||
|
@ -3,7 +3,7 @@ use super::*;
|
||||
static STRING_TABLE: std::sync::LazyLock<Mutex<BTreeSet<&'static str>>> =
|
||||
std::sync::LazyLock::new(|| Mutex::new(BTreeSet::new()));
|
||||
|
||||
static STRING_TRANSFORM_TABLE: std::sync::LazyLock<Mutex<HashMap<usize, &'static str>>> =
|
||||
static STRING_TRANSFORM_TABLE: std::sync::LazyLock<Mutex<HashMap<(usize, usize), &'static str>>> =
|
||||
std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
pub trait ToStaticStr {
|
||||
@ -35,7 +35,15 @@ impl StaticStrTransform for &'static str {
|
||||
self,
|
||||
transform: F,
|
||||
) -> &'static str {
|
||||
let key = self.as_ptr() as usize;
|
||||
// multiple keys can point to the same data, but it must be bounded due to static lifetime
|
||||
// a pointer to static memory plus its length must always be the same immutable slice
|
||||
// this is maybe slightly faster for use in log string transformation where speed is essential at scale
|
||||
// otherwise we would have used a hash here.
|
||||
// TODO: if performance does not suffer, consider switching to a hash at a later point, as this could cause
|
||||
// the STRING_TRANSFORM_TABLE to be bigger than necessary, depending on unknowns in rustc about 'static str deduplication.
|
||||
|
||||
let key = (self.as_ptr() as usize, self.len());
|
||||
|
||||
let mut transform_table = STRING_TRANSFORM_TABLE.lock();
|
||||
if let Some(v) = transform_table.get(&key) {
|
||||
return v;
|
||||
|
@ -104,6 +104,19 @@ macro_rules! asyncrwlock_try_write {
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! asyncrwlock_try_read_arc {
|
||||
($x:expr) => {
|
||||
$x.try_read_arc()
|
||||
};
|
||||
}
|
||||
#[macro_export]
|
||||
macro_rules! asyncrwlock_try_write_arc {
|
||||
($x:expr) => {
|
||||
$x.try_write_arc()
|
||||
};
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
pub fn system_boxed<'a, Out>(
|
||||
|
Loading…
x
Reference in New Issue
Block a user