diff --git a/veilid-core/src/component.rs b/veilid-core/src/component.rs index 847c788e..316fbbe8 100644 --- a/veilid-core/src/component.rs +++ b/veilid-core/src/component.rs @@ -12,23 +12,26 @@ impl AsAnyArcSendSync for T { } } -pub trait VeilidComponent: AsAnyArcSendSync + core::fmt::Debug { - fn registry(&self) -> VeilidComponentRegistry; - +pub trait VeilidComponent: + AsAnyArcSendSync + VeilidComponentRegistryAccessor + core::fmt::Debug +{ fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>; fn post_init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>; fn pre_terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>; fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>; +} + +pub trait VeilidComponentRegistryAccessor { + fn registry(&self) -> VeilidComponentRegistry; - // Registry shortcuts fn config(&self) -> VeilidConfig { - self.registry().config() + self.registry().config.clone() } fn update_callback(&self) -> UpdateCallback { - self.registry().update_callback() + self.registry().config.update_callback() } fn event_bus(&self) -> EventBus { - self.registry().event_bus() + self.registry().event_bus.clone() } } @@ -52,6 +55,7 @@ where struct VeilidComponentRegistryInner { type_map: HashMap>, init_order: Vec, + mock: bool, } #[derive(Clone, Debug)] @@ -68,6 +72,7 @@ impl VeilidComponentRegistry { inner: Arc::new(Mutex::new(VeilidComponentRegistryInner { type_map: HashMap::new(), init_order: Vec::new(), + mock: false, })), config, event_bus: EventBus::new(), @@ -75,6 +80,11 @@ impl VeilidComponentRegistry { } } + pub fn enable_mock(&self) { + let mut inner = self.inner.lock(); + inner.mock = true; + } + pub fn register< T: VeilidComponent + Send + Sync + 'static, F: FnOnce(VeilidComponentRegistry) -> T, @@ -191,16 +201,6 @@ impl VeilidComponentRegistry { ////////////////////////////////////////////////////////////// - pub fn config(&self) -> VeilidConfig { - self.config.clone() - } - pub fn update_callback(&self) -> UpdateCallback { - self.config.update_callback() - } - pub fn event_bus(&self) -> EventBus { - self.event_bus.clone() - } - pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>( &self, ) -> Option> { @@ -218,13 +218,33 @@ impl VeilidComponentRegistry { } } -macro_rules! impl_veilid_component { - ($component_name:ident) => { - impl VeilidComponent for $component_name { +impl VeilidComponentRegistryAccessor for VeilidComponentRegistry { + fn registry(&self) -> VeilidComponentRegistry { + self.clone() + } +} + +//////////////////////////////////////////////////////////////////// + +macro_rules! impl_veilid_component_registry_accessor { + ($struct_name:ident) => { + impl VeilidComponentRegistryAccessor for $struct_name { fn registry(&self) -> VeilidComponentRegistry { self.registry.clone() } + } + }; +} +pub(crate) use impl_veilid_component_registry_accessor; + +///////////////////////////////////////////////////////////////////// + +macro_rules! impl_veilid_component { + ($component_name:ident) => { + impl_veilid_component_registry_accessor!($component_name); + + impl VeilidComponent for $component_name { fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>> { Box::pin(async { self.init_async().await }) } @@ -245,3 +265,42 @@ macro_rules! impl_veilid_component { } pub(crate) use impl_veilid_component; + +///////////////////////////////////////////////////////////////////// + +// Utility macro for setting up a background TickTask +// Should be called during new/construction of a component with background tasks +// and before any post-init 'tick' operations are started +macro_rules! impl_setup_task { + ($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{ + let registry = $this.registry(); + $this.$task_name.set_routine(move |s, l, t| { + let registry = registry.clone(); + Box::pin(async move { + let this = registry.lookup::<$this_type>().unwrap(); + this.$task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) + }); + }}; +} + +pub(crate) use impl_setup_task; + +// Utility macro for setting up an event bus handler +// Should be called after init, during post-init or later +// Subscription should be unsubscribed before termination +macro_rules! impl_subscribe_event_bus { + ($this:expr, $this_type:ty, $event_handler:ident ) => {{ + let registry = $this.registry(); + $this.event_bus().subscribe(move |evt| { + let registry = registry.clone(); + Box::pin(async move { + let this = registry.lookup::<$this_type>().unwrap(); + this.$event_handler(evt); + }) + }) + }}; +} + +pub(crate) use impl_subscribe_event_bus; diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index ae7b382f..4824ce12 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -1,7 +1,10 @@ -use crate::attachment_manager::*; +use crate::attachment_manager::AttachmentManager; use crate::crypto::Crypto; use crate::logging::*; -use crate::storage_manager::*; +use crate::network_manager::NetworkManager; +use crate::routing_table::RoutingTable; +use crate::rpc_processor::RPCProcessor; +use crate::storage_manager::StorageManager; use crate::veilid_api::*; use crate::veilid_config::*; use crate::*; @@ -70,6 +73,9 @@ impl VeilidCoreContext { #[cfg(feature = "unstable-blockstore")] registry.register(BlockStore::new); registry.register(StorageManager::new); + registry.register(RoutingTable::new); + registry.register(NetworkManager::new); + registry.register(RPCProcessor::new); registry.register(AttachmentManager::new); // Run initialization @@ -83,7 +89,7 @@ impl VeilidCoreContext { // current subsystem, which is not available until after init succeeds if let Err(e) = registry.post_init().await { registry.terminate().await; - return VeilidAPIError::internal(e); + return Err(VeilidAPIError::internal(e)); } info!("Veilid API startup complete"); @@ -129,7 +135,18 @@ impl VeilidCoreContext { ///////////////////////////////////////////////////////////////////////////// -pub trait RegisteredComponents: VeilidComponent { +pub trait RegisteredComponents { + fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore>; + fn crypto(&self) -> VeilidComponentGuard<'_, Crypto>; + fn table_store(&self) -> VeilidComponentGuard<'_, TableStore>; + fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager>; + fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable>; + fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager>; + fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor>; + fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager>; +} + +impl RegisteredComponents for T { fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> { self.registry().lookup::().unwrap() } @@ -142,8 +159,19 @@ pub trait RegisteredComponents: VeilidComponent { fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager> { self.registry().lookup::().unwrap() } + fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable> { + self.registry().lookup::().unwrap() + } + fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager> { + self.registry().lookup::().unwrap() + } + fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor> { + self.registry().lookup::().unwrap() + } + fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager> { + self.registry().lookup::().unwrap() + } } -impl RegisteredComponents for T {} ///////////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/intf/native/protected_store.rs b/veilid-core/src/intf/native/protected_store.rs index 35e3e499..a5686dbc 100644 --- a/veilid-core/src/intf/native/protected_store.rs +++ b/veilid-core/src/intf/native/protected_store.rs @@ -107,6 +107,11 @@ impl ProtectedStore { Ok(()) } + async fn post_init_async(&self) -> EyreResult<()> { + Ok(()) + } + async fn pre_terminate_async(&self) {} + #[instrument(level = "debug", skip(self))] async fn terminate_async(&self) { *self.inner.lock() = Self::new_inner(); diff --git a/veilid-core/src/intf/wasm/protected_store.rs b/veilid-core/src/intf/wasm/protected_store.rs index 09864f1a..6aab1f56 100644 --- a/veilid-core/src/intf/wasm/protected_store.rs +++ b/veilid-core/src/intf/wasm/protected_store.rs @@ -28,12 +28,15 @@ impl ProtectedStore { } #[instrument(level = "debug", skip(self), err)] - pub async fn init(&self) -> EyreResult<()> { + pub(crate) async fn init_async(&self) -> EyreResult<()> { Ok(()) } + pub(crate) async fn post_init_async(&self) -> EyreResult<()> {} + pub(crate) async fn pre_terminate_async(&self) {} + #[instrument(level = "debug", skip(self))] - pub async fn terminate(&self) {} + pub(crate) async fn terminate_async(&self) {} fn browser_key_name(&self, key: &str) -> String { let c = self.config(); diff --git a/veilid-core/src/network_manager/address_check.rs b/veilid-core/src/network_manager/address_check.rs index 8a84d832..6b9981e3 100644 --- a/veilid-core/src/network_manager/address_check.rs +++ b/veilid-core/src/network_manager/address_check.rs @@ -23,6 +23,7 @@ pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10; // TimestampDuration::new(3_600_000_000_u64); // 60 minutes /// Address checker config +#[derive(Debug)] pub struct AddressCheckConfig { pub detect_address_changes: bool, pub ip6_prefix_size: usize, @@ -44,6 +45,22 @@ pub struct AddressCheck { address_consistency_table: BTreeMap>, } +impl fmt::Debug for AddressCheck { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AddressCheck") + .field("config", &self.config) + //.field("net", &self.net) + .field("current_network_class", &self.current_network_class) + .field("current_addresses", &self.current_addresses) + .field( + "address_inconsistency_table", + &self.address_inconsistency_table, + ) + .field("address_consistency_table", &self.address_consistency_table) + .finish() + } +} + impl AddressCheck { pub fn new(config: AddressCheckConfig, net: Network) -> Self { Self { diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs index 2f6919c6..f1ee727f 100644 --- a/veilid-core/src/network_manager/address_filter.rs +++ b/veilid-core/src/network_manager/address_filter.rs @@ -39,7 +39,6 @@ struct AddressFilterUnlockedInner { max_connection_frequency_per_min: usize, punishment_duration_min: usize, dial_info_failure_duration_min: usize, - routing_table: RoutingTable, } impl fmt::Debug for AddressFilterUnlockedInner { @@ -69,14 +68,19 @@ impl fmt::Debug for AddressFilterUnlockedInner { #[derive(Clone, Debug)] pub(crate) struct AddressFilter { + registry: VeilidComponentRegistry, unlocked_inner: Arc, inner: Arc>, } +impl_veilid_component_registry_accessor!(AddressFilter); + impl AddressFilter { - pub fn new(config: VeilidConfig, routing_table: RoutingTable) -> Self { + pub fn new(registry: VeilidComponentRegistry) -> Self { + let config = registry.config(); let c = config.get(); Self { + registry, unlocked_inner: Arc::new(AddressFilterUnlockedInner { max_connections_per_ip4: c.network.max_connections_per_ip4 as usize, max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize, @@ -86,7 +90,6 @@ impl AddressFilter { as usize, punishment_duration_min: PUNISHMENT_DURATION_MIN, dial_info_failure_duration_min: DIAL_INFO_FAILURE_DURATION_MIN, - routing_table, }), inner: Arc::new(Mutex::new(AddressFilterInner { conn_count_by_ip4: BTreeMap::new(), @@ -192,7 +195,7 @@ impl AddressFilter { warn!("Forgiving: {}", key); inner.punishments_by_node_id.remove(&key); // make the entry alive again if it's still here - if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) { + if let Ok(Some(nr)) = self.routing_table().lookup_node_ref(key) { nr.operate_mut(|_rti, e| e.set_punished(None)); } } diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index fe3c4478..60f89983 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -124,24 +124,21 @@ pub enum StartupDisposition { } // The mutable state of the network manager +#[derive(Debug)] struct NetworkManagerInner { stats: NetworkManagerStats, client_allowlist: LruCache, node_contact_method_cache: LruCache, address_check: Option, + peer_info_change_subscription: Option, + socket_address_change_subscription: Option, } -struct NetworkManagerUnlockedInner { - // Handles - event_bus: EventBus, - config: VeilidConfig, - storage_manager: StorageManager, - table_store: TableStore, - #[cfg(feature = "unstable-blockstore")] - block_store: BlockStore, - crypto: Crypto, +pub(crate) struct NetworkManager { + registry: VeilidComponentRegistry, + inner: Arc>, + // Accessors - routing_table: RwLock>, address_filter: RwLock>, components: RwLock>, update_callback: RwLock>, @@ -154,10 +151,22 @@ struct NetworkManagerUnlockedInner { startup_lock: StartupLock, } -#[derive(Clone)] -pub(crate) struct NetworkManager { - inner: Arc>, - unlocked_inner: Arc, +impl_veilid_component!(NetworkManager); + +impl fmt::Debug for NetworkManager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("NetworkManager") + //.field("registry", &self.registry) + .field("inner", &self.inner) + .field("address_filter", &self.address_filter) + // .field("components", &self.components) + // .field("update_callback", &self.update_callback) + // .field("rolling_transfers_task", &self.rolling_transfers_task) + // .field("address_filter_task", &self.address_filter_task) + .field("network_key", &self.network_key) + .field("startup_lock", &self.startup_lock) + .finish() + } } impl NetworkManager { @@ -167,52 +176,17 @@ impl NetworkManager { client_allowlist: LruCache::new_unbounded(), node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE), address_check: None, - } - } - fn new_unlocked_inner( - event_bus: EventBus, - config: VeilidConfig, - storage_manager: StorageManager, - table_store: TableStore, - #[cfg(feature = "unstable-blockstore")] block_store: BlockStore, - crypto: Crypto, - network_key: Option, - ) -> NetworkManagerUnlockedInner { - NetworkManagerUnlockedInner { - event_bus, - config: config.clone(), - storage_manager, - table_store, - #[cfg(feature = "unstable-blockstore")] - block_store, - crypto, - address_filter: RwLock::new(None), - routing_table: RwLock::new(None), - components: RwLock::new(None), - update_callback: RwLock::new(None), - rolling_transfers_task: TickTask::new( - "rolling_transfers_task", - ROLLING_TRANSFERS_INTERVAL_SECS, - ), - address_filter_task: TickTask::new( - "address_filter_task", - ADDRESS_FILTER_TASK_INTERVAL_SECS, - ), - network_key, - startup_lock: StartupLock::new(), + peer_info_change_subscription: None, + socket_address_change_subscription: None, } } - pub fn new( - event_bus: EventBus, - config: VeilidConfig, - storage_manager: StorageManager, - table_store: TableStore, - #[cfg(feature = "unstable-blockstore")] block_store: BlockStore, - crypto: Crypto, - ) -> Self { + pub fn new(registry: VeilidComponentRegistry) -> Self { // Make the network key let network_key = { + let config = registry.config(); + let crypto = registry.crypto(); + let c = config.get(); let network_key_password = c.network.network_key_password.clone(); let network_key = if let Some(network_key_password) = network_key_password { @@ -239,109 +213,49 @@ impl NetworkManager { }; let this = Self { + registry, inner: Arc::new(Mutex::new(Self::new_inner())), - unlocked_inner: Arc::new(Self::new_unlocked_inner( - event_bus, - config, - storage_manager, - table_store, - #[cfg(feature = "unstable-blockstore")] - block_store, - crypto, - network_key, - )), + address_filter: RwLock::new(None), + components: RwLock::new(None), + update_callback: RwLock::new(None), + rolling_transfers_task: TickTask::new( + "rolling_transfers_task", + ROLLING_TRANSFERS_INTERVAL_SECS, + ), + address_filter_task: TickTask::new( + "address_filter_task", + ADDRESS_FILTER_TASK_INTERVAL_SECS, + ), + network_key, + startup_lock: StartupLock::new(), }; this.setup_tasks(); this } - pub fn event_bus(&self) -> EventBus { - self.unlocked_inner.event_bus.clone() - } - pub fn config(&self) -> VeilidConfig { - self.unlocked_inner.config.clone() - } - pub fn with_config(&self, f: F) -> R - where - F: FnOnce(&VeilidConfigInner) -> R, - { - f(&self.unlocked_inner.config.get()) - } - pub fn storage_manager(&self) -> StorageManager { - self.unlocked_inner.storage_manager.clone() - } - pub fn table_store(&self) -> TableStore { - self.unlocked_inner.table_store.clone() - } - #[cfg(feature = "unstable-blockstore")] - pub fn block_store(&self) -> BlockStore { - self.unlocked_inner.block_store.clone() - } - pub fn crypto(&self) -> Crypto { - self.unlocked_inner.crypto.clone() - } + pub fn address_filter(&self) -> AddressFilter { - self.unlocked_inner - .address_filter - .read() - .as_ref() - .unwrap() - .clone() - } - pub fn routing_table(&self) -> RoutingTable { - self.unlocked_inner - .routing_table - .read() - .as_ref() - .unwrap() - .clone() + self.address_filter.read().as_ref().unwrap().clone() } + fn net(&self) -> Network { - self.unlocked_inner - .components - .read() - .as_ref() - .unwrap() - .net - .clone() + self.components.read().as_ref().unwrap().net.clone() } fn opt_net(&self) -> Option { - self.unlocked_inner - .components - .read() - .as_ref() - .map(|x| x.net.clone()) + self.components.read().as_ref().map(|x| x.net.clone()) } fn receipt_manager(&self) -> ReceiptManager { - self.unlocked_inner - .components + self.components .read() .as_ref() .unwrap() .receipt_manager .clone() } - pub fn rpc_processor(&self) -> RPCProcessor { - self.unlocked_inner - .components - .read() - .as_ref() - .unwrap() - .rpc_processor - .clone() - } - pub fn opt_rpc_processor(&self) -> Option { - self.unlocked_inner - .components - .read() - .as_ref() - .map(|x| x.rpc_processor.clone()) - } pub fn connection_manager(&self) -> ConnectionManager { - self.unlocked_inner - .components + self.components .read() .as_ref() .unwrap() @@ -349,58 +263,26 @@ impl NetworkManager { .clone() } pub fn opt_connection_manager(&self) -> Option { - self.unlocked_inner - .components + self.components .read() .as_ref() .map(|x| x.connection_manager.clone()) } - pub fn update_callback(&self) -> UpdateCallback { - self.unlocked_inner - .update_callback - .read() - .as_ref() - .unwrap() - .clone() - } - #[instrument(level = "debug", skip_all, err)] - pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { - let routing_table = RoutingTable::new(self.clone()); - routing_table.init().await?; - let address_filter = AddressFilter::new(self.config(), routing_table.clone()); - *self.unlocked_inner.routing_table.write() = Some(routing_table.clone()); - *self.unlocked_inner.address_filter.write() = Some(address_filter); - *self.unlocked_inner.update_callback.write() = Some(update_callback); - - // Register event handlers - let this = self.clone(); - self.event_bus().subscribe(move |evt| { - let this = this.clone(); - Box::pin(async move { - this.peer_info_change_event_handler(evt); - }) - }); - let this = self.clone(); - self.event_bus().subscribe(move |evt| { - let this = this.clone(); - Box::pin(async move { - this.socket_address_change_event_handler(evt); - }) - }); + async fn init_async(&self) -> EyreResult<()> { + let address_filter = AddressFilter::new(self.registry()); + *self.address_filter.write() = Some(address_filter); Ok(()) } + async fn post_init_async(&self) -> EyreResult<()> {} + + async fn pre_terminate_async(&self) {} + #[instrument(level = "debug", skip_all)] - pub async fn terminate(&self) { - let routing_table = self.unlocked_inner.routing_table.write().take(); - if let Some(routing_table) = routing_table { - routing_table.terminate().await; - } - *self.unlocked_inner.update_callback.write() = None; - } + async fn terminate_async(&self) {} #[instrument(level = "debug", skip_all, err)] pub async fn internal_startup(&self) -> EyreResult { @@ -456,7 +338,20 @@ impl NetworkManager { ip6_prefix_size, }; let address_check = AddressCheck::new(address_check_config, net.clone()); - self.inner.lock().address_check = Some(address_check); + + // Register event handlers + let peer_info_change_subscription = + impl_subscribe_event_bus!(self, Self, peer_info_change_event_handler); + + let socket_address_change_subscription = + impl_subscribe_event_bus!(self, Self, socket_address_change_event_handler); + + { + let mut inner = self.inner.lock(); + inner.address_check = Some(address_check); + inner.peer_info_change_subscription = Some(peer_info_change_subscription); + inner.socket_address_change_subscription = Some(socket_address_change_subscription); + } rpc_processor.startup().await?; receipt_manager.startup().await?; @@ -495,8 +390,17 @@ impl NetworkManager { // Cancel all tasks self.cancel_tasks().await; - // Shutdown address check - self.inner.lock().address_check = Option::::None; + // Shutdown event bus subscriptions and address check + { + let mut inner = self.inner.lock(); + if let Some(sub) = inner.socket_address_change_subscription.take() { + self.event_bus().unsubscribe(sub); + } + if let Some(sub) = inner.peer_info_change_subscription.take() { + self.event_bus().unsubscribe(sub); + } + inner.address_check = None; + } // Shutdown network components if they started up log_net!(debug "shutting down network components"); diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index 993d87a7..8810db62 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -44,8 +44,6 @@ struct DiscoveryContextInner { } struct DiscoveryContextUnlockedInner { - routing_table: RoutingTable, - net: Network, config: DiscoveryContextConfig, // per-protocol @@ -54,25 +52,26 @@ struct DiscoveryContextUnlockedInner { #[derive(Clone)] pub(super) struct DiscoveryContext { + registry: VeilidComponentRegistry, unlocked_inner: Arc, inner: Arc>, } +impl VeilidComponentRegistryAccessor for DiscoveryContext { + fn registry(&self) -> VeilidComponentRegistry { + self.registry.clone() + } +} + impl DiscoveryContext { - pub fn new(routing_table: RoutingTable, net: Network, config: DiscoveryContextConfig) -> Self { - let intf_addrs = Self::get_local_addresses( - routing_table.clone(), - config.protocol_type, - config.address_type, - ); + pub fn new(registry: VeilidComponentRegistry, config: DiscoveryContextConfig) -> Self { + let routing_table = registry.routing_table(); + let intf_addrs = + Self::get_local_addresses(&routing_table, config.protocol_type, config.address_type); Self { - unlocked_inner: Arc::new(DiscoveryContextUnlockedInner { - routing_table, - net, - config, - intf_addrs, - }), + registry, + unlocked_inner: Arc::new(DiscoveryContextUnlockedInner { config, intf_addrs }), inner: Arc::new(Mutex::new(DiscoveryContextInner { external_info: Vec::new(), })), @@ -85,7 +84,7 @@ impl DiscoveryContext { // This pulls the already-detected local interface dial info from the routing table #[instrument(level = "trace", skip(routing_table), ret)] fn get_local_addresses( - routing_table: RoutingTable, + routing_table: &RoutingTable, protocol_type: ProtocolType, address_type: AddressType, ) -> Vec { @@ -109,7 +108,7 @@ impl DiscoveryContext { // This is done over the normal port using RPC #[instrument(level = "trace", skip(self), ret)] async fn request_public_address(&self, node_ref: FilteredNodeRef) -> Option { - let rpc = self.unlocked_inner.routing_table.rpc_processor(); + let rpc = self.rpc_processor(); let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await { Ok(v) => v, @@ -138,7 +137,7 @@ impl DiscoveryContext { #[instrument(level = "trace", skip(self), ret)] async fn discover_external_addresses(&self) -> bool { let node_count = { - let config = self.unlocked_inner.routing_table.network_manager().config(); + let config = self.registry.config(); let c = config.get(); c.network.dht.max_find_node_count as usize }; @@ -188,10 +187,11 @@ impl DiscoveryContext { ]); // Find public nodes matching this filter - let nodes = self - .unlocked_inner - .routing_table - .find_fast_non_local_nodes_filtered(routing_domain, node_count, filters); + let nodes = self.routing_table().find_fast_non_local_nodes_filtered( + routing_domain, + node_count, + filters, + ); if nodes.is_empty() { log_net!(debug "no external address detection peers of type {:?}:{:?}", diff --git a/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs b/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs index b5d05f8d..ebbce3ff 100644 --- a/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs +++ b/veilid-core/src/network_manager/native/tasks/network_interfaces_task.rs @@ -3,7 +3,7 @@ use super::*; impl Network { #[instrument(level = "trace", target = "net", skip_all, err)] pub(super) async fn network_interfaces_task_routine( - self, + &self, _stop_token: StopToken, _l: Timestamp, _t: Timestamp, diff --git a/veilid-core/src/network_manager/native/tasks/update_network_class_task.rs b/veilid-core/src/network_manager/native/tasks/update_network_class_task.rs index a4c56e3a..465933b5 100644 --- a/veilid-core/src/network_manager/native/tasks/update_network_class_task.rs +++ b/veilid-core/src/network_manager/native/tasks/update_network_class_task.rs @@ -8,7 +8,7 @@ type InboundProtocolMap = HashMap<(AddressType, LowLevelProtocolType, u16), Vec< impl Network { #[instrument(parent = None, level = "trace", skip(self), err)] pub async fn update_network_class_task_routine( - self, + &self, stop_token: StopToken, l: Timestamp, t: Timestamp, @@ -156,7 +156,7 @@ impl Network { port, }; context_configs.insert(dcc); - let discovery_context = DiscoveryContext::new(self.routing_table(), self.clone(), dcc); + let discovery_context = DiscoveryContext::new(self.registry(), dcc); discovery_context.discover(&mut unord).await; } diff --git a/veilid-core/src/network_manager/stats.rs b/veilid-core/src/network_manager/stats.rs index f8fc7a37..8694c4b3 100644 --- a/veilid-core/src/network_manager/stats.rs +++ b/veilid-core/src/network_manager/stats.rs @@ -1,7 +1,7 @@ use super::*; // Statistics per address -#[derive(Clone, Default)] +#[derive(Clone, Debug, Default)] pub struct PerAddressStats { pub last_seen_ts: Timestamp, pub transfer_stats_accounting: TransferStatsAccounting, @@ -18,7 +18,7 @@ impl Default for PerAddressStatsKey { } // Statistics about the low-level network -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct NetworkManagerStats { pub self_stats: PerAddressStats, pub per_address_stats: LruCache, diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index c9a3628b..c2d62495 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -640,7 +640,7 @@ impl BucketEntryInner { only_live: bool, filter: NodeRefFilter, ) -> Vec<(Flow, Timestamp)> { - let opt_connection_manager = rti.unlocked_inner.network_manager.opt_connection_manager(); + let opt_connection_manager = rti.network_manager().opt_connection_manager(); let mut out: Vec<(Flow, Timestamp)> = self .last_flows diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 2ee4ead5..9bb02b95 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -35,7 +35,6 @@ impl RoutingTable { let valid_envelope_versions = VALID_ENVELOPE_VERSIONS.map(|x| x.to_string()).join(","); let node_ids = self - .unlocked_inner .node_ids() .iter() .map(|x| x.to_string()) @@ -57,7 +56,7 @@ impl RoutingTable { pub fn debug_info_nodeid(&self) -> String { let mut out = String::new(); - for nid in self.unlocked_inner.node_ids().iter() { + for nid in self.node_ids().iter() { out += &format!("{}\n", nid); } out @@ -66,7 +65,7 @@ impl RoutingTable { pub fn debug_info_nodeinfo(&self) -> String { let mut out = String::new(); let inner = self.inner.read(); - out += &format!("Node Ids: {}\n", self.unlocked_inner.node_ids()); + out += &format!("Node Ids: {}\n", self.node_ids()); out += &format!( "Self Transfer Stats:\n{}", indent_all_string(&inner.self_transfer_stats) @@ -250,7 +249,7 @@ impl RoutingTable { out += &format!("{:?}: {}: {}\n", routing_domain, crypto_kind, count); } for ck in &VALID_CRYPTO_KINDS { - let our_node_id = self.unlocked_inner.node_id(*ck); + let our_node_id = self.node_id(*ck); let mut filtered_total = 0; let mut b = 0; @@ -319,7 +318,7 @@ impl RoutingTable { ) -> String { let cur_ts = Timestamp::now(); let relay_node_filter = self.make_public_internet_relay_node_filter(); - let our_node_ids = self.unlocked_inner.node_ids(); + let our_node_ids = self.node_ids(); let mut relay_count = 0usize; let mut relaying_count = 0usize; @@ -340,7 +339,7 @@ impl RoutingTable { node_count, filters, |_rti, entry: Option>| { - NodeRef::new(self.clone(), entry.unwrap().clone()) + NodeRef::new(self.registry(), entry.unwrap().clone()) }, ); let mut out = String::new(); diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs index d09799ea..128c35c6 100644 --- a/veilid-core/src/routing_table/find_peers.rs +++ b/veilid-core/src/routing_table/find_peers.rs @@ -42,10 +42,9 @@ impl RoutingTable { ) as RoutingTableEntryFilter; let filters = VecDeque::from([filter]); - let node_count = { - let c = self.config.get(); - c.network.dht.max_find_node_count as usize - }; + let node_count = self + .config() + .with(|c| c.network.dht.max_find_node_count as usize); let closest_nodes = match self.find_preferred_closest_nodes( node_count, @@ -82,11 +81,13 @@ impl RoutingTable { // find N nodes closest to the target node in our routing table // ensure the nodes returned are only the ones closer to the target node than ourself - let Some(vcrypto) = self.crypto().get(crypto_kind) else { + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(crypto_kind) else { return NetworkResult::invalid_message("unsupported cryptosystem"); }; + let vcrypto = &*vcrypto; + let own_distance = vcrypto.distance(&own_node_id.value, &key.value); - let vcrypto2 = vcrypto.clone(); let filter = Box::new( move |rti: &RoutingTableInner, opt_entry: Option>| { @@ -121,10 +122,9 @@ impl RoutingTable { ) as RoutingTableEntryFilter; let filters = VecDeque::from([filter]); - let node_count = { - let c = self.config.get(); - c.network.dht.max_find_node_count as usize - }; + let node_count = self + .config() + .with(|c| c.network.dht.max_find_node_count as usize); // let closest_nodes = match self.find_preferred_closest_nodes( @@ -147,7 +147,7 @@ impl RoutingTable { // Validate peers returned are, in fact, closer to the key than the node we sent this to // This same test is used on the other side so we vet things here - let valid = match Self::verify_peers_closer(vcrypto2, own_node_id, key, &closest_nodes) { + let valid = match Self::verify_peers_closer(vcrypto, own_node_id, key, &closest_nodes) { Ok(v) => v, Err(e) => { panic!("missing cryptosystem in peers node ids: {}", e); @@ -165,8 +165,8 @@ impl RoutingTable { /// Determine if set of peers is closer to key_near than key_far is to key_near #[instrument(level = "trace", target = "rtab", skip_all, err)] - pub fn verify_peers_closer( - vcrypto: CryptoSystemVersion, + pub fn verify_peers_closer<'a>( + vcrypto: &'a (dyn CryptoSystem + Send + Sync), key_far: TypedKey, key_near: TypedKey, peers: &[Arc], diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index a7c6560a..ce917901 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -91,11 +91,9 @@ pub struct RecentPeersEntry { pub last_connection: Flow, } -pub(crate) struct RoutingTableUnlockedInner { - // Accessors - event_bus: EventBus, - config: VeilidConfig, - network_manager: NetworkManager, +pub(crate) struct RoutingTable { + registry: VeilidComponentRegistry, + inner: Arc>, /// The current node's public DHT keys node_id: TypedKeyGroup, @@ -131,26 +129,155 @@ pub(crate) struct RoutingTableUnlockedInner { private_route_management_task: TickTask, } -impl RoutingTableUnlockedInner { - pub fn network_manager(&self) -> NetworkManager { - self.network_manager.clone() +impl fmt::Debug for RoutingTable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RoutingTable") + // .field("inner", &self.inner) + // .field("unlocked_inner", &self.unlocked_inner) + .finish() } - pub fn crypto(&self) -> Crypto { - self.network_manager().crypto() +} + +impl_veilid_component!(RoutingTable); + +impl RoutingTable { + pub fn new(registry: VeilidComponentRegistry) -> Self { + let config = registry.config(); + let c = config.get(); + let inner = Arc::new(RwLock::new(RoutingTableInner::new(registry.clone()))); + let this = Self { + registry, + inner, + node_id: c.network.routing_table.node_id.clone(), + node_id_secret: c.network.routing_table.node_id_secret.clone(), + kick_queue: Mutex::new(BTreeSet::default()), + rolling_transfers_task: TickTask::new( + "rolling_transfers_task", + ROLLING_TRANSFERS_INTERVAL_SECS, + ), + update_state_stats_task: TickTask::new( + "update_state_stats_task", + UPDATE_STATE_STATS_INTERVAL_SECS, + ), + rolling_answers_task: TickTask::new( + "rolling_answers_task", + ROLLING_ANSWER_INTERVAL_SECS, + ), + kick_buckets_task: TickTask::new("kick_buckets_task", 1), + bootstrap_task: TickTask::new("bootstrap_task", 1), + peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1), + closest_peers_refresh_task: TickTask::new_ms( + "closest_peers_refresh_task", + c.network.dht.min_peer_refresh_time_ms, + ), + ping_validator_public_internet_task: TickTask::new( + "ping_validator_public_internet_task", + 1, + ), + ping_validator_local_network_task: TickTask::new( + "ping_validator_local_network_task", + 1, + ), + ping_validator_public_internet_relay_task: TickTask::new( + "ping_validator_public_internet_relay_task", + 1, + ), + ping_validator_active_watch_task: TickTask::new("ping_validator_active_watch_task", 1), + relay_management_task: TickTask::new( + "relay_management_task", + RELAY_MANAGEMENT_INTERVAL_SECS, + ), + private_route_management_task: TickTask::new( + "private_route_management_task", + PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS, + ), + }; + + this.setup_tasks(); + + this } - pub fn rpc_processor(&self) -> RPCProcessor { - self.network_manager().rpc_processor() + + ///////////////////////////////////// + /// Initialization + + /// Called to initialize the routing table after it is created + async fn init_async(&self) -> EyreResult<()> { + log_rtab!(debug "starting routing table init"); + + // Set up routing buckets + { + let mut inner = self.inner.write(); + inner.init_buckets(); + } + + // Load bucket entries from table db if possible + log_rtab!(debug "loading routing table entries"); + if let Err(e) = self.load_buckets().await { + log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e); + let mut inner = self.inner.write(); + inner.init_buckets(); + } + + // Set up routespecstore + log_rtab!(debug "starting route spec store init"); + let route_spec_store = match RouteSpecStore::load(self.registry()).await { + Ok(v) => v, + Err(e) => { + log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e); + RouteSpecStore::new(self.registry()) + } + }; + log_rtab!(debug "finished route spec store init"); + + { + let mut inner = self.inner.write(); + inner.route_spec_store = Some(route_spec_store); + } + + log_rtab!(debug "finished routing table init"); + Ok(()) } - pub fn update_callback(&self) -> UpdateCallback { - self.network_manager().update_callback() + + async fn post_init_async(&self) -> EyreResult<()> { + Ok(()) } - pub fn with_config(&self, f: F) -> R - where - F: FnOnce(&VeilidConfigInner) -> R, - { - f(&self.config.get()) + + async fn pre_terminate_async(&self) { + // Stop tasks + self.cancel_tasks().await; } + /// Called to shut down the routing table + async fn terminate_async(&self) { + log_rtab!(debug "starting routing table terminate"); + + // Load bucket entries from table db if possible + log_rtab!(debug "saving routing table entries"); + if let Err(e) = self.save_buckets().await { + error!("failed to save routing table entries: {}", e); + } + + log_rtab!(debug "saving route spec store"); + let rss = { + let mut inner = self.inner.write(); + inner.route_spec_store.take() + }; + if let Some(rss) = rss { + if let Err(e) = rss.save().await { + error!("couldn't save route spec store: {}", e); + } + } + log_rtab!(debug "shutting down routing table"); + + let mut inner = self.inner.write(); + *inner = RoutingTableInner::new(self.registry()); + + log_rtab!(debug "finished routing table terminate"); + } + + /////////////////////////////////////////////////////////////////// + pub fn node_id(&self, kind: CryptoKind) -> TypedKey { self.node_id.get(kind).unwrap() } @@ -206,169 +333,6 @@ impl RoutingTableUnlockedInner { .unwrap(), ) } -} - -#[derive(Clone)] -pub(crate) struct RoutingTable { - inner: Arc>, - unlocked_inner: Arc, -} - -impl RoutingTable { - fn new_unlocked_inner( - event_bus: EventBus, - config: VeilidConfig, - network_manager: NetworkManager, - ) -> RoutingTableUnlockedInner { - let c = config.get(); - - RoutingTableUnlockedInner { - event_bus, - config: config.clone(), - network_manager, - node_id: c.network.routing_table.node_id.clone(), - node_id_secret: c.network.routing_table.node_id_secret.clone(), - kick_queue: Mutex::new(BTreeSet::default()), - rolling_transfers_task: TickTask::new( - "rolling_transfers_task", - ROLLING_TRANSFERS_INTERVAL_SECS, - ), - update_state_stats_task: TickTask::new( - "update_state_stats_task", - UPDATE_STATE_STATS_INTERVAL_SECS, - ), - rolling_answers_task: TickTask::new( - "rolling_answers_task", - ROLLING_ANSWER_INTERVAL_SECS, - ), - kick_buckets_task: TickTask::new("kick_buckets_task", 1), - bootstrap_task: TickTask::new("bootstrap_task", 1), - peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1), - closest_peers_refresh_task: TickTask::new_ms( - "closest_peers_refresh_task", - c.network.dht.min_peer_refresh_time_ms, - ), - ping_validator_public_internet_task: TickTask::new( - "ping_validator_public_internet_task", - 1, - ), - ping_validator_local_network_task: TickTask::new( - "ping_validator_local_network_task", - 1, - ), - ping_validator_public_internet_relay_task: TickTask::new( - "ping_validator_public_internet_relay_task", - 1, - ), - ping_validator_active_watch_task: TickTask::new("ping_validator_active_watch_task", 1), - relay_management_task: TickTask::new( - "relay_management_task", - RELAY_MANAGEMENT_INTERVAL_SECS, - ), - private_route_management_task: TickTask::new( - "private_route_management_task", - PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS, - ), - } - } - pub fn new(network_manager: NetworkManager) -> Self { - let event_bus = network_manager.event_bus(); - let config = network_manager.config(); - let unlocked_inner = Arc::new(Self::new_unlocked_inner(event_bus, config, network_manager)); - let inner = Arc::new(RwLock::new(RoutingTableInner::new(unlocked_inner.clone()))); - let this = Self { - inner, - unlocked_inner, - }; - - this.setup_tasks(); - - this - } - - ///////////////////////////////////// - /// Initialization - - /// Called to initialize the routing table after it is created - pub async fn init(&self) -> EyreResult<()> { - log_rtab!(debug "starting routing table init"); - - // Set up routing buckets - { - let mut inner = self.inner.write(); - inner.init_buckets(); - } - - // Load bucket entries from table db if possible - log_rtab!(debug "loading routing table entries"); - if let Err(e) = self.load_buckets().await { - log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e); - let mut inner = self.inner.write(); - inner.init_buckets(); - } - - // Set up routespecstore - log_rtab!(debug "starting route spec store init"); - let route_spec_store = match RouteSpecStore::load(self.clone()).await { - Ok(v) => v, - Err(e) => { - log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e); - RouteSpecStore::new(self.clone()) - } - }; - log_rtab!(debug "finished route spec store init"); - - { - let mut inner = self.inner.write(); - inner.route_spec_store = Some(route_spec_store); - } - - // Inform storage manager we are up - self.network_manager - .storage_manager() - .set_routing_table(Some(self.clone())) - .await; - - log_rtab!(debug "finished routing table init"); - Ok(()) - } - - /// Called to shut down the routing table - pub async fn terminate(&self) { - log_rtab!(debug "starting routing table terminate"); - - // Stop storage manager from using us - self.network_manager - .storage_manager() - .set_routing_table(None) - .await; - - // Stop tasks - self.cancel_tasks().await; - - // Load bucket entries from table db if possible - log_rtab!(debug "saving routing table entries"); - if let Err(e) = self.save_buckets().await { - error!("failed to save routing table entries: {}", e); - } - - log_rtab!(debug "saving route spec store"); - let rss = { - let mut inner = self.inner.write(); - inner.route_spec_store.take() - }; - if let Some(rss) = rss { - if let Err(e) = rss.save().await { - error!("couldn't save route spec store: {}", e); - } - } - log_rtab!(debug "shutting down routing table"); - - let mut inner = self.inner.write(); - *inner = RoutingTableInner::new(self.unlocked_inner.clone()); - - log_rtab!(debug "finished routing table terminate"); - } /// Serialize the routing table. fn serialized_buckets(&self) -> (SerializedBucketMap, SerializedBuckets) { @@ -406,7 +370,7 @@ impl RoutingTable { async fn save_buckets(&self) -> EyreResult<()> { let (serialized_bucket_map, all_entry_bytes) = self.serialized_buckets(); - let table_store = self.unlocked_inner.network_manager().table_store(); + let table_store = self.table_store(); let tdb = table_store.open(ROUTING_TABLE, 1).await?; let dbx = tdb.transact(); if let Err(e) = dbx.store_json(0, SERIALIZED_BUCKET_MAP, &serialized_bucket_map) { @@ -420,12 +384,14 @@ impl RoutingTable { dbx.commit().await?; Ok(()) } + /// Deserialize routing table from table store async fn load_buckets(&self) -> EyreResult<()> { // Make a cache validity key of all our node ids and our bootstrap choice let mut cache_validity_key: Vec = Vec::new(); { - let c = self.unlocked_inner.config.get(); + let config = self.config(); + let c = config.get(); for ck in VALID_CRYPTO_KINDS { if let Some(nid) = c.network.routing_table.node_id.get(ck) { cache_validity_key.append(&mut nid.value.bytes.to_vec()); @@ -446,7 +412,7 @@ impl RoutingTable { }; // Deserialize bucket map and all entries from the table store - let table_store = self.unlocked_inner.network_manager().table_store(); + let table_store = self.table_store(); let db = table_store.open(ROUTING_TABLE, 1).await?; let caches_valid = match db.load(0, CACHE_VALIDITY_KEY).await? { @@ -479,14 +445,13 @@ impl RoutingTable { // Reconstruct all entries let inner = &mut *self.inner.write(); - self.populate_routing_table(inner, serialized_bucket_map, all_entry_bytes)?; + Self::populate_routing_table_inner(inner, serialized_bucket_map, all_entry_bytes)?; Ok(()) } /// Write the deserialized table store data to the routing table. - pub fn populate_routing_table( - &self, + pub fn populate_routing_table_inner( inner: &mut RoutingTableInner, serialized_bucket_map: SerializedBucketMap, all_entry_bytes: SerializedBuckets, @@ -542,16 +507,14 @@ impl RoutingTable { self.inner.read().routing_domain_for_address(address) } - pub fn route_spec_store(&self) -> RwLockReadGuard<'_, RouteSpecStore> { - self.inner.read().route_spec_store.as_ref().unwrap().clone() - } - pub fn route_spec_store_mut(&self) -> RwLockReadGuard<'_, RouteSpecStore> { - self.inner - .write() - .route_spec_store - .as_ref() - .unwrap() - .clone() + pub fn route_spec_store(&self) -> Option> { + let inner = self.inner.read(); + if inner.route_spec_store.is_none() { + return None; + } + Some(RwLockReadGuard::map(inner, |x| { + x.route_spec_store.as_ref().unwrap() + })) } pub fn relay_node(&self, domain: RoutingDomain) -> Option { @@ -664,7 +627,7 @@ impl RoutingTable { ) -> Vec { self.inner .read() - .get_nodes_needing_ping(self.clone(), routing_domain, cur_ts) + .get_nodes_needing_ping(self.registry(), routing_domain, cur_ts) } fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) { @@ -675,21 +638,19 @@ impl RoutingTable { } // Put it in the kick queue - let x = self.unlocked_inner.calculate_bucket_index(node_id); - self.unlocked_inner.kick_queue.lock().insert(x); + let x = self.calculate_bucket_index(node_id); + self.kick_queue.lock().insert(x); } } /// Resolve an existing routing table entry using any crypto kind and return a reference to it pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult> { - self.inner - .read() - .lookup_any_node_ref(self.clone(), node_id_key) + self.inner.read().lookup_any_node_ref(node_id_key) } /// Resolve an existing routing table entry and return a reference to it pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult> { - self.inner.read().lookup_node_ref(self.clone(), node_id) + self.inner.read().lookup_node_ref(node_id) } /// Resolve an existing routing table entry and return a filtered reference to it @@ -700,12 +661,9 @@ impl RoutingTable { routing_domain_set: RoutingDomainSet, dial_info_filter: DialInfoFilter, ) -> EyreResult> { - self.inner.read().lookup_and_filter_noderef( - self.clone(), - node_id, - routing_domain_set, - dial_info_filter, - ) + self.inner + .read() + .lookup_and_filter_noderef(node_id, routing_domain_set, dial_info_filter) } /// Shortcut function to add a node to our routing table if it doesn't exist @@ -719,7 +677,7 @@ impl RoutingTable { ) -> EyreResult { self.inner .write() - .register_node_with_peer_info(self.clone(), peer_info, allow_invalid) + .register_node_with_peer_info(peer_info, allow_invalid) } /// Shortcut function to add a node to our routing table if it doesn't exist @@ -734,7 +692,7 @@ impl RoutingTable { ) -> EyreResult { self.inner .write() - .register_node_with_id(self.clone(), routing_domain, node_id, timestamp) + .register_node_with_id(routing_domain, node_id, timestamp) } ////////////////////////////////////////////////////////////////////// @@ -893,7 +851,7 @@ impl RoutingTable { filters: VecDeque, ) -> Vec { self.inner.read().find_fast_non_local_nodes_filtered( - self.clone(), + self.registry(), routing_domain, node_count, filters, @@ -979,7 +937,7 @@ impl RoutingTable { protocol_types_len * 2 * max_per_type, filters, |_rti, entry: Option>| { - NodeRef::new(self.clone(), entry.unwrap().clone()) + NodeRef::new(self.registry(), entry.unwrap().clone()) }, ) } @@ -1081,7 +1039,6 @@ impl RoutingTable { let res = network_result_try!( rpc_processor - .clone() .rpc_call_find_node( Destination::direct(node_ref.default_filtered()), node_id, @@ -1170,11 +1127,3 @@ impl RoutingTable { } } } - -impl core::ops::Deref for RoutingTable { - type Target = RoutingTableUnlockedInner; - - fn deref(&self) -> &Self::Target { - &self.unlocked_inner - } -} diff --git a/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs b/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs index 9cdc4e57..4ee38d6c 100644 --- a/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs +++ b/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs @@ -1,7 +1,7 @@ use super::*; pub(crate) struct FilteredNodeRef { - routing_table: RoutingTable, + registry: VeilidComponentRegistry, entry: Arc, filter: NodeRefFilter, sequencing: Sequencing, @@ -9,9 +9,15 @@ pub(crate) struct FilteredNodeRef { track_id: usize, } +impl VeilidComponentRegistryAccessor for FilteredNodeRef { + fn registry(&self) -> VeilidComponentRegistry { + self.registry.clone() + } +} + impl FilteredNodeRef { pub fn new( - routing_table: RoutingTable, + registry: VeilidComponentRegistry, entry: Arc, filter: NodeRefFilter, sequencing: Sequencing, @@ -19,7 +25,7 @@ impl FilteredNodeRef { entry.ref_count.fetch_add(1u32, Ordering::AcqRel); Self { - routing_table, + registry, entry, filter, sequencing, @@ -29,7 +35,7 @@ impl FilteredNodeRef { } pub fn unfiltered(&self) -> NodeRef { - NodeRef::new(self.routing_table.clone(), self.entry.clone()) + NodeRef::new(self.registry(), self.entry.clone()) } pub fn filtered_clone(&self, filter: NodeRefFilter) -> FilteredNodeRef { @@ -40,7 +46,7 @@ impl FilteredNodeRef { pub fn sequencing_clone(&self, sequencing: Sequencing) -> FilteredNodeRef { FilteredNodeRef::new( - self.routing_table.clone(), + self.registry.clone(), self.entry.clone(), self.filter(), sequencing, @@ -70,9 +76,6 @@ impl FilteredNodeRef { } impl NodeRefAccessorsTrait for FilteredNodeRef { - fn routing_table(&self) -> RoutingTable { - self.routing_table.clone() - } fn entry(&self) -> Arc { self.entry.clone() } @@ -105,7 +108,8 @@ impl NodeRefOperateTrait for FilteredNodeRef { where F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T, { - let inner = &*self.routing_table.inner.read(); + let routing_table = self.registry.routing_table(); + let inner = &*routing_table.inner.read(); self.entry.with(inner, f) } @@ -113,7 +117,8 @@ impl NodeRefOperateTrait for FilteredNodeRef { where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T, { - let inner = &mut *self.routing_table.inner.write(); + let routing_table = self.registry.routing_table(); + let inner = &mut *routing_table.inner.write(); self.entry.with_mut(inner, f) } } @@ -125,7 +130,7 @@ impl Clone for FilteredNodeRef { self.entry.ref_count.fetch_add(1u32, Ordering::AcqRel); Self { - routing_table: self.routing_table.clone(), + registry: self.registry.clone(), entry: self.entry.clone(), filter: self.filter, sequencing: self.sequencing, @@ -162,7 +167,7 @@ impl Drop for FilteredNodeRef { // get node ids with inner unlocked because nothing could be referencing this entry now // and we don't know when it will get dropped, possibly inside a lock let node_ids = self.entry.with_inner(|e| e.node_ids()); - self.routing_table.queue_bucket_kicks(node_ids); + self.routing_table().queue_bucket_kicks(node_ids); } } } diff --git a/veilid-core/src/routing_table/node_ref/mod.rs b/veilid-core/src/routing_table/node_ref/mod.rs index 9be39762..aa58666b 100644 --- a/veilid-core/src/routing_table/node_ref/mod.rs +++ b/veilid-core/src/routing_table/node_ref/mod.rs @@ -16,18 +16,24 @@ pub(crate) use traits::*; // Default NodeRef pub(crate) struct NodeRef { - routing_table: RoutingTable, + registry: VeilidComponentRegistry, entry: Arc, #[cfg(feature = "tracking")] track_id: usize, } +impl VeilidComponentRegistryAccessor for NodeRef { + fn registry(&self) -> VeilidComponentRegistry { + self.registry.clone() + } +} + impl NodeRef { - pub fn new(routing_table: RoutingTable, entry: Arc) -> Self { + pub fn new(registry: VeilidComponentRegistry, entry: Arc) -> Self { entry.ref_count.fetch_add(1u32, Ordering::AcqRel); Self { - routing_table, + registry, entry, #[cfg(feature = "tracking")] track_id: entry.track(), @@ -36,7 +42,7 @@ impl NodeRef { pub fn default_filtered(&self) -> FilteredNodeRef { FilteredNodeRef::new( - self.routing_table.clone(), + self.registry.clone(), self.entry.clone(), NodeRefFilter::new(), Sequencing::default(), @@ -45,7 +51,7 @@ impl NodeRef { pub fn sequencing_filtered(&self, sequencing: Sequencing) -> FilteredNodeRef { FilteredNodeRef::new( - self.routing_table.clone(), + self.registry.clone(), self.entry.clone(), NodeRefFilter::new(), sequencing, @@ -57,7 +63,7 @@ impl NodeRef { routing_domain_set: R, ) -> FilteredNodeRef { FilteredNodeRef::new( - self.routing_table.clone(), + self.registry.clone(), self.entry.clone(), NodeRefFilter::new().with_routing_domain_set(routing_domain_set.into()), Sequencing::default(), @@ -66,7 +72,7 @@ impl NodeRef { pub fn custom_filtered(&self, filter: NodeRefFilter) -> FilteredNodeRef { FilteredNodeRef::new( - self.routing_table.clone(), + self.registry.clone(), self.entry.clone(), filter, Sequencing::default(), @@ -76,7 +82,7 @@ impl NodeRef { #[expect(dead_code)] pub fn dial_info_filtered(&self, filter: DialInfoFilter) -> FilteredNodeRef { FilteredNodeRef::new( - self.routing_table.clone(), + self.registry.clone(), self.entry.clone(), NodeRefFilter::new().with_dial_info_filter(filter), Sequencing::default(), @@ -92,9 +98,6 @@ impl NodeRef { } impl NodeRefAccessorsTrait for NodeRef { - fn routing_table(&self) -> RoutingTable { - self.routing_table.clone() - } fn entry(&self) -> Arc { self.entry.clone() } @@ -125,7 +128,8 @@ impl NodeRefOperateTrait for NodeRef { where F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T, { - let inner = &*self.routing_table.inner.read(); + let routing_table = self.routing_table(); + let inner = &*routing_table.inner.read(); self.entry.with(inner, f) } @@ -133,7 +137,8 @@ impl NodeRefOperateTrait for NodeRef { where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T, { - let inner = &mut *self.routing_table.inner.write(); + let routing_table = self.routing_table(); + let inner = &mut *routing_table.inner.write(); self.entry.with_mut(inner, f) } } @@ -145,7 +150,7 @@ impl Clone for NodeRef { self.entry.ref_count.fetch_add(1u32, Ordering::AcqRel); Self { - routing_table: self.routing_table.clone(), + registry: self.registry.clone(), entry: self.entry.clone(), #[cfg(feature = "tracking")] track_id: self.entry.write().track(), @@ -178,7 +183,7 @@ impl Drop for NodeRef { // get node ids with inner unlocked because nothing could be referencing this entry now // and we don't know when it will get dropped, possibly inside a lock let node_ids = self.entry.with_inner(|e| e.node_ids()); - self.routing_table.queue_bucket_kicks(node_ids); + self.routing_table().queue_bucket_kicks(node_ids); } } } diff --git a/veilid-core/src/routing_table/node_ref/node_ref_lock.rs b/veilid-core/src/routing_table/node_ref/node_ref_lock.rs index c38598b7..b8e33f38 100644 --- a/veilid-core/src/routing_table/node_ref/node_ref_lock.rs +++ b/veilid-core/src/routing_table/node_ref/node_ref_lock.rs @@ -15,6 +15,21 @@ pub(crate) struct NodeRefLock< nr: N, } +impl< + 'a, + N: NodeRefAccessorsTrait + + NodeRefOperateTrait + + VeilidComponentRegistryAccessor + + fmt::Debug + + fmt::Display + + Clone, + > VeilidComponentRegistryAccessor for NodeRefLock<'a, N> +{ + fn registry(&self) -> VeilidComponentRegistry { + self.nr.registry() + } +} + impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone> NodeRefLock<'a, N> { @@ -33,9 +48,6 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone> NodeRefAccessorsTrait for NodeRefLock<'a, N> { - fn routing_table(&self) -> RoutingTable { - self.nr.routing_table() - } fn entry(&self) -> Arc { self.nr.entry() } diff --git a/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs b/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs index 9b725ab6..c132e85c 100644 --- a/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs +++ b/veilid-core/src/routing_table/node_ref/node_ref_lock_mut.rs @@ -15,6 +15,21 @@ pub(crate) struct NodeRefLockMut< nr: N, } +impl< + 'a, + N: NodeRefAccessorsTrait + + NodeRefOperateTrait + + VeilidComponentRegistryAccessor + + fmt::Debug + + fmt::Display + + Clone, + > VeilidComponentRegistryAccessor for NodeRefLockMut<'a, N> +{ + fn registry(&self) -> VeilidComponentRegistry { + self.nr.registry() + } +} + impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone> NodeRefLockMut<'a, N> { @@ -34,9 +49,6 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone> NodeRefAccessorsTrait for NodeRefLockMut<'a, N> { - fn routing_table(&self) -> RoutingTable { - self.nr.routing_table() - } fn entry(&self) -> Arc { self.nr.entry() } diff --git a/veilid-core/src/routing_table/node_ref/traits.rs b/veilid-core/src/routing_table/node_ref/traits.rs index 4ddecf23..b4cc2d73 100644 --- a/veilid-core/src/routing_table/node_ref/traits.rs +++ b/veilid-core/src/routing_table/node_ref/traits.rs @@ -2,7 +2,6 @@ use super::*; // Field accessors pub(crate) trait NodeRefAccessorsTrait { - fn routing_table(&self) -> RoutingTable; fn entry(&self) -> Arc; fn sequencing(&self) -> Sequencing; fn routing_domain_set(&self) -> RoutingDomainSet; @@ -125,12 +124,12 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait }; // If relay is ourselves, then return None, because we can't relay through ourselves // and to contact this node we should have had an existing inbound connection - if rti.unlocked_inner.matches_own_node_id(rpi.node_ids()) { + if rti.routing_table().matches_own_node_id(rpi.node_ids()) { bail!("Can't relay though ourselves"); } // Register relay node and return noderef - let nr = rti.register_node_with_peer_info(self.routing_table(), rpi, false)?; + let nr = rti.register_node_with_peer_info(rpi, false)?; Ok(Some(nr)) }) } diff --git a/veilid-core/src/routing_table/privacy.rs b/veilid-core/src/routing_table/privacy.rs index b15f9b8f..927dd39a 100644 --- a/veilid-core/src/routing_table/privacy.rs +++ b/veilid-core/src/routing_table/privacy.rs @@ -31,7 +31,7 @@ pub(crate) enum RouteNode { } impl RouteNode { - pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> { + pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> { match self { RouteNode::NodeId(_) => Ok(()), RouteNode::PeerInfo(pi) => pi.validate(crypto), @@ -91,7 +91,7 @@ pub(crate) struct RouteHop { pub next_hop: Option, } impl RouteHop { - pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> { + pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> { self.node.validate(crypto) } } @@ -108,7 +108,7 @@ pub(crate) enum PrivateRouteHops { } impl PrivateRouteHops { - pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> { + pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> { match self { PrivateRouteHops::FirstHop(rh) => rh.validate(crypto), PrivateRouteHops::Data(_) => Ok(()), @@ -138,7 +138,7 @@ impl PrivateRoute { } } - pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> { + pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> { self.hops.validate(crypto) } diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 529c2d10..c523db89 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -34,53 +34,40 @@ struct RouteSpecStoreInner { cache: RouteSpecStoreCache, } -struct RouteSpecStoreUnlockedInner { - /// Handle to routing table - routing_table: RoutingTable, +/// The routing table's storage for private/safety routes +#[derive(Debug)] +pub(crate) struct RouteSpecStore { + registry: VeilidComponentRegistry, + inner: Arc>, + /// Maximum number of hops in a route max_route_hop_count: usize, /// Default number of hops in a route default_route_hop_count: usize, } -impl fmt::Debug for RouteSpecStoreUnlockedInner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RouteSpecStoreUnlockedInner") - .field("max_route_hop_count", &self.max_route_hop_count) - .field("default_route_hop_count", &self.default_route_hop_count) - .finish() - } -} - -/// The routing table's storage for private/safety routes -#[derive(Clone, Debug)] -pub(crate) struct RouteSpecStore { - inner: Arc>, - unlocked_inner: Arc, -} +impl_veilid_component_registry_accessor!(RouteSpecStore); impl RouteSpecStore { - pub fn new(routing_table: RoutingTable) -> Self { - let config = routing_table.network_manager().config(); + pub fn new(registry: VeilidComponentRegistry) -> Self { + let config = registry.config(); let c = config.get(); Self { - unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { - max_route_hop_count: c.network.rpc.max_route_hop_count.into(), - default_route_hop_count: c.network.rpc.default_route_hop_count.into(), - routing_table, - }), + registry, inner: Arc::new(Mutex::new(RouteSpecStoreInner { content: RouteSpecStoreContent::new(), cache: Default::default(), })), + max_route_hop_count: c.network.rpc.max_route_hop_count.into(), + default_route_hop_count: c.network.rpc.default_route_hop_count.into(), } } - #[instrument(level = "trace", target = "route", skip(routing_table), err)] - pub async fn load(routing_table: RoutingTable) -> EyreResult { + #[instrument(level = "trace", target = "route", skip_all, err)] + pub async fn load(registry: VeilidComponentRegistry) -> EyreResult { let (max_route_hop_count, default_route_hop_count) = { - let config = routing_table.network_manager().config(); + let config = registry.config(); let c = config.get(); ( c.network.rpc.max_route_hop_count as usize, @@ -89,7 +76,10 @@ impl RouteSpecStore { }; // Get frozen blob from table store - let content = RouteSpecStoreContent::load(routing_table.clone()).await?; + let table_store = registry.lookup::().unwrap(); + let routing_table = registry.lookup::().unwrap(); + + let content = RouteSpecStoreContent::load(&table_store, &routing_table).await?; let mut inner = RouteSpecStoreInner { content, @@ -104,12 +94,10 @@ impl RouteSpecStore { // Return the loaded RouteSpecStore let rss = RouteSpecStore { - unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner { - max_route_hop_count, - default_route_hop_count, - routing_table: routing_table.clone(), - }), + registry, inner: Arc::new(Mutex::new(inner)), + max_route_hop_count, + default_route_hop_count, }; Ok(rss) @@ -123,9 +111,8 @@ impl RouteSpecStore { }; // Save our content - content - .save(self.unlocked_inner.routing_table.clone()) - .await?; + let table_store = self.registry.lookup::().unwrap(); + content.save(&table_store).await?; Ok(()) } @@ -146,16 +133,17 @@ impl RouteSpecStore { dead_remote_routes, })); - let update_callback = self.unlocked_inner.routing_table.update_callback(); + let update_callback = self.registry.update_callback(); update_callback(update); } /// Purge the route spec store pub async fn purge(&self) -> VeilidAPIResult<()> { // Briefly pause routing table ticker while changes are made - let _tick_guard = self.unlocked_inner.routing_table.pause_tasks().await; - self.unlocked_inner.routing_table.cancel_tasks().await; + let routing_table = self.registry.lookup::().unwrap(); + let _tick_guard = routing_table.pause_tasks().await; + routing_table.cancel_tasks().await; { let inner = &mut *self.inner.lock(); inner.content = Default::default(); @@ -181,7 +169,7 @@ impl RouteSpecStore { automatic: bool, ) -> VeilidAPIResult { let inner = &mut *self.inner.lock(); - let routing_table = self.unlocked_inner.routing_table.clone(); + let routing_table = self.routing_table(); let rti = &mut *routing_table.inner.write(); self.allocate_route_inner( @@ -213,12 +201,10 @@ impl RouteSpecStore { apibail_generic!("safety_spec.preferred_route must be empty when allocating new route"); } - let ip6_prefix_size = rti - .unlocked_inner - .config - .get() - .network - .max_connections_per_ip6_prefix_size as usize; + let ip6_prefix_size = self + .registry() + .config() + .with(|c| c.network.max_connections_per_ip6_prefix_size as usize); if safety_spec.hop_count < 1 { apibail_invalid_argument!( @@ -228,7 +214,7 @@ impl RouteSpecStore { ); } - if safety_spec.hop_count > self.unlocked_inner.max_route_hop_count { + if safety_spec.hop_count > self.max_route_hop_count { apibail_invalid_argument!( "Not allocating route longer than max route hop count", "hop_count", @@ -492,9 +478,8 @@ impl RouteSpecStore { }) }; - let routing_table = self.unlocked_inner.routing_table.clone(); let transform = |_rti: &RoutingTableInner, entry: Option>| -> NodeRef { - NodeRef::new(routing_table.clone(), entry.unwrap()) + NodeRef::new(self.registry(), entry.unwrap()) }; // Pull the whole routing table in sorted order @@ -667,13 +652,9 @@ impl RouteSpecStore { // Got a unique route, lets build the details, register it, and return it let hop_node_refs: Vec = route_nodes.iter().map(|k| nodes[*k].clone()).collect(); let mut route_set = BTreeMap::::new(); + let crypto = self.crypto(); for crypto_kind in crypto_kinds.iter().copied() { - let vcrypto = self - .unlocked_inner - .routing_table - .crypto() - .get(crypto_kind) - .unwrap(); + let vcrypto = crypto.get(crypto_kind).unwrap(); let keypair = vcrypto.generate_keypair(); let hops: Vec = route_nodes .iter() @@ -734,7 +715,7 @@ impl RouteSpecStore { R: fmt::Debug, { let inner = &*self.inner.lock(); - let crypto = self.unlocked_inner.routing_table.crypto(); + let crypto = self.crypto(); let Some(vcrypto) = crypto.get(public_key.kind) else { log_rpc!(debug "can't handle route with public key: {:?}", public_key); return None; @@ -852,7 +833,7 @@ impl RouteSpecStore { }; // Test with double-round trip ping to self - let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); + let rpc_processor = self.rpc_processor(); let _res = match rpc_processor.rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { @@ -886,7 +867,7 @@ impl RouteSpecStore { // Get a safety route that is good enough let safety_spec = SafetySpec { preferred_route: None, - hop_count: self.unlocked_inner.default_route_hop_count, + hop_count: self.default_route_hop_count, stability, sequencing, }; @@ -900,8 +881,7 @@ impl RouteSpecStore { }; // Test with double-round trip ping to self - let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); - let _res = match rpc_processor.rpc_call_status(dest).await? { + let _res = match self.rpc_processor().rpc_call_status(dest).await? { NetworkResult::Value(v) => v, _ => { // Did not error, but did not come back, just return false @@ -1097,7 +1077,7 @@ impl RouteSpecStore { ) -> VeilidAPIResult { // let profile_start_ts = get_timestamp(); let inner = &mut *self.inner.lock(); - let routing_table = self.unlocked_inner.routing_table.clone(); + let routing_table = self.routing_table(); let rti = &mut *routing_table.inner.write(); // Get useful private route properties @@ -1108,7 +1088,7 @@ impl RouteSpecStore { }; let pr_pubkey = private_route.public_key.value; let pr_hopcount = private_route.hop_count as usize; - let max_route_hop_count = self.unlocked_inner.max_route_hop_count; + let max_route_hop_count = self.max_route_hop_count; // Check private route hop count isn't larger than the max route hop count plus one for the 'first hop' header if pr_hopcount > (max_route_hop_count + 1) { @@ -1130,10 +1110,10 @@ impl RouteSpecStore { let opt_first_hop = match pr_first_hop_node { RouteNode::NodeId(id) => rti - .lookup_node_ref(routing_table.clone(), TypedKey::new(crypto_kind, id)) + .lookup_node_ref(TypedKey::new(crypto_kind, id)) .map_err(VeilidAPIError::internal)?, RouteNode::PeerInfo(pi) => Some( - rti.register_node_with_peer_info(routing_table.clone(), pi, false) + rti.register_node_with_peer_info(pi, false) .map_err(VeilidAPIError::internal)? .unfiltered(), ), @@ -1362,7 +1342,7 @@ impl RouteSpecStore { avoid_nodes: &[TypedKey], ) -> VeilidAPIResult { // Ensure the total hop count isn't too long for our config - let max_route_hop_count = self.unlocked_inner.max_route_hop_count; + let max_route_hop_count = self.max_route_hop_count; if safety_spec.hop_count == 0 { apibail_invalid_argument!( "safety route hop count is zero", @@ -1438,7 +1418,7 @@ impl RouteSpecStore { avoid_nodes: &[TypedKey], ) -> VeilidAPIResult { let inner = &mut *self.inner.lock(); - let routing_table = self.unlocked_inner.routing_table.clone(); + let routing_table = self.routing_table(); let rti = &mut *routing_table.inner.write(); self.get_route_for_safety_spec_inner( @@ -1457,7 +1437,7 @@ impl RouteSpecStore { rsd: &RouteSpecDetail, optimized: bool, ) -> VeilidAPIResult { - let routing_table = self.unlocked_inner.routing_table.clone(); + let routing_table = self.routing_table(); let rti = &*routing_table.inner.read(); // Ensure we get the crypto for it @@ -1732,8 +1712,7 @@ impl RouteSpecStore { cur_ts: Timestamp, ) -> VeilidAPIResult<()> { let Some(our_node_info_ts) = self - .unlocked_inner - .routing_table + .routing_table() .get_published_peer_info(RoutingDomain::PublicInternet) .map(|pi| pi.signed_node_info().timestamp()) else { @@ -1767,11 +1746,7 @@ impl RouteSpecStore { let inner = &mut *self.inner.lock(); // Check for stub route - if self - .unlocked_inner - .routing_table - .matches_own_node_id_key(key) - { + if self.routing_table().matches_own_node_id_key(key) { return None; } @@ -1869,7 +1844,7 @@ impl RouteSpecStore { /// Convert binary blob to private route vector pub fn blob_to_private_routes(&self, blob: Vec) -> VeilidAPIResult> { // Get crypto - let crypto = self.unlocked_inner.routing_table.crypto(); + let crypto = self.crypto(); // Deserialize count if blob.is_empty() { @@ -1904,7 +1879,7 @@ impl RouteSpecStore { let private_route = decode_private_route(&decode_context, &pr_reader).map_err(|e| { VeilidAPIError::invalid_argument("failed to decode private route", "e", e) })?; - private_route.validate(crypto.clone()).map_err(|e| { + private_route.validate(&crypto).map_err(|e| { VeilidAPIError::invalid_argument("failed to validate private route", "e", e) })?; @@ -1920,7 +1895,7 @@ impl RouteSpecStore { /// Generate RouteId from typed key set of route public keys fn generate_allocated_route_id(&self, rssd: &RouteSetSpecDetail) -> VeilidAPIResult { let route_set_keys = rssd.get_route_set_keys(); - let crypto = self.unlocked_inner.routing_table.crypto(); + let crypto = self.crypto(); let mut idbytes = Vec::with_capacity(PUBLIC_KEY_LENGTH * route_set_keys.len()); let mut best_kind: Option = None; @@ -1945,7 +1920,7 @@ impl RouteSpecStore { &self, private_routes: &[PrivateRoute], ) -> VeilidAPIResult { - let crypto = self.unlocked_inner.routing_table.crypto(); + let crypto = self.crypto(); let mut idbytes = Vec::with_capacity(PUBLIC_KEY_LENGTH * private_routes.len()); let mut best_kind: Option = None; diff --git a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs index f46b54dd..0ec61aa2 100644 --- a/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs +++ b/veilid-core/src/routing_table/route_spec_store/route_spec_store_content.rs @@ -17,9 +17,11 @@ impl RouteSpecStoreContent { } } - pub async fn load(routing_table: RoutingTable) -> EyreResult { + pub async fn load( + table_store: &TableStore, + routing_table: &RoutingTable, + ) -> EyreResult { // Deserialize what we can - let table_store = routing_table.network_manager().table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; let mut content: RouteSpecStoreContent = rsstdb.load_json(0, b"content").await?.unwrap_or_default(); @@ -59,10 +61,9 @@ impl RouteSpecStoreContent { Ok(content) } - pub async fn save(&self, routing_table: RoutingTable) -> EyreResult<()> { + pub async fn save(&self, table_store: &TableStore) -> EyreResult<()> { // Save all the fields we care about to the frozen blob in table storage // This skips #[with(Skip)] saving the secret keys, we save them in the protected store instead - let table_store = routing_table.network_manager().table_store(); let rsstdb = table_store.open("RouteSpecStore", 1).await?; rsstdb.store_json(0, b"content", self).await?; diff --git a/veilid-core/src/routing_table/routing_table_inner/mod.rs b/veilid-core/src/routing_table/routing_table_inner/mod.rs index 9e3dae90..e399d17e 100644 --- a/veilid-core/src/routing_table/routing_table_inner/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/mod.rs @@ -15,8 +15,8 @@ pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>; /// RoutingTable rwlock-internal data pub struct RoutingTableInner { - /// Extra pointer to unlocked members to simplify access - pub(super) unlocked_inner: Arc, + /// Convenience accessor for the global component registry + pub(super) registry: VeilidComponentRegistry, /// Routing table buckets that hold references to entries, per crypto kind pub(super) buckets: BTreeMap>, /// A weak set of all the entries we have in the buckets for faster iteration @@ -44,10 +44,12 @@ pub struct RoutingTableInner { pub(super) opt_active_watch_keepalive_ts: Option, } +impl_veilid_component_registry_accessor!(RoutingTableInner); + impl RoutingTableInner { - pub(super) fn new(unlocked_inner: Arc) -> RoutingTableInner { + pub(super) fn new(registry: VeilidComponentRegistry) -> RoutingTableInner { RoutingTableInner { - unlocked_inner, + registry, buckets: BTreeMap::new(), public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(), local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(), @@ -458,7 +460,7 @@ impl RoutingTableInner { // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow pub(super) fn get_nodes_needing_ping( &self, - outer_self: RoutingTable, + registry: VeilidComponentRegistry, routing_domain: RoutingDomain, cur_ts: Timestamp, ) -> Vec { @@ -559,7 +561,7 @@ impl RoutingTableInner { let transform = |_rti: &RoutingTableInner, v: Option>| { FilteredNodeRef::new( - outer_self.clone(), + self.registry.clone(), v.unwrap().clone(), NodeRefFilter::new().with_routing_domain(routing_domain), Sequencing::default(), @@ -570,10 +572,10 @@ impl RoutingTableInner { } #[expect(dead_code)] - pub fn get_all_alive_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec { + pub fn get_all_alive_nodes(&self, cur_ts: Timestamp) -> Vec { let mut node_refs = Vec::::with_capacity(self.bucket_entry_count()); self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| { - node_refs.push(NodeRef::new(outer_self.clone(), entry)); + node_refs.push(NodeRef::new(self.registry(), entry)); Option::<()>::None }); node_refs @@ -687,15 +689,16 @@ impl RoutingTableInner { #[instrument(level = "trace", skip_all, err)] fn create_node_ref( &mut self, - outer_self: RoutingTable, node_ids: &TypedKeyGroup, update_func: F, ) -> EyreResult where F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner), { + let routing_table = self.routing_table(); + // Ensure someone isn't trying register this node itself - if self.unlocked_inner.matches_own_node_id(node_ids) { + if routing_table.matches_own_node_id(node_ids) { bail!("can't register own node"); } @@ -708,7 +711,7 @@ impl RoutingTableInner { continue; } // Find the first in crypto sort order - let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id); + let bucket_index = routing_table.calculate_bucket_index(node_id); let bucket = self.get_bucket(bucket_index); if let Some(entry) = bucket.entry(&node_id.value) { // Best entry is the first one in sorted order that exists from the node id list @@ -730,7 +733,7 @@ impl RoutingTableInner { } // Make a noderef to return - let nr = NodeRef::new(outer_self.clone(), best_entry.clone()); + let nr = NodeRef::new(self.registry(), best_entry.clone()); // Update the entry with the update func best_entry.with_mut_inner(|e| update_func(self, e)); @@ -741,11 +744,11 @@ impl RoutingTableInner { // If no entry exists yet, add the first entry to a bucket, possibly evicting a bucket member let first_node_id = node_ids[0]; - let bucket_entry = self.unlocked_inner.calculate_bucket_index(&first_node_id); + let bucket_entry = routing_table.calculate_bucket_index(&first_node_id); let bucket = self.get_bucket_mut(bucket_entry); let new_entry = bucket.add_new_entry(first_node_id.value); self.all_entries.insert(new_entry.clone()); - self.unlocked_inner.kick_queue.lock().insert(bucket_entry); + routing_table.kick_queue.lock().insert(bucket_entry); // Update the other bucket entries with the remaining node ids if let Err(e) = self.update_bucket_entry_node_ids(new_entry.clone(), node_ids) { @@ -753,7 +756,7 @@ impl RoutingTableInner { } // Make node ref to return - let nr = NodeRef::new(outer_self.clone(), new_entry.clone()); + let nr = NodeRef::new(self.registry(), new_entry.clone()); // Update the entry with the update func new_entry.with_mut_inner(|e| update_func(self, e)); @@ -766,15 +769,9 @@ impl RoutingTableInner { /// Resolve an existing routing table entry using any crypto kind and return a reference to it #[instrument(level = "trace", skip_all, err)] - pub fn lookup_any_node_ref( - &self, - outer_self: RoutingTable, - node_id_key: PublicKey, - ) -> EyreResult> { + pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult> { for ck in VALID_CRYPTO_KINDS { - if let Some(nr) = - self.lookup_node_ref(outer_self.clone(), TypedKey::new(ck, node_id_key))? - { + if let Some(nr) = self.lookup_node_ref(TypedKey::new(ck, node_id_key))? { return Ok(Some(nr)); } } @@ -783,35 +780,30 @@ impl RoutingTableInner { /// Resolve an existing routing table entry and return a reference to it #[instrument(level = "trace", skip_all, err)] - pub fn lookup_node_ref( - &self, - outer_self: RoutingTable, - node_id: TypedKey, - ) -> EyreResult> { - if self.unlocked_inner.matches_own_node_id(&[node_id]) { + pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult> { + if self.routing_table().matches_own_node_id(&[node_id]) { bail!("can't look up own node id in routing table"); } if !VALID_CRYPTO_KINDS.contains(&node_id.kind) { bail!("can't look up node id with invalid crypto kind"); } - let bucket_index = self.unlocked_inner.calculate_bucket_index(&node_id); + let bucket_index = self.routing_table().calculate_bucket_index(&node_id); let bucket = self.get_bucket(bucket_index); Ok(bucket .entry(&node_id.value) - .map(|e| NodeRef::new(outer_self, e))) + .map(|e| NodeRef::new(self.registry(), e))) } /// Resolve an existing routing table entry and return a filtered reference to it #[instrument(level = "trace", skip_all, err)] pub fn lookup_and_filter_noderef( &self, - outer_self: RoutingTable, node_id: TypedKey, routing_domain_set: RoutingDomainSet, dial_info_filter: DialInfoFilter, ) -> EyreResult> { - let nr = self.lookup_node_ref(outer_self, node_id)?; + let nr = self.lookup_node_ref(node_id)?; Ok(nr.map(|nr| { nr.custom_filtered( NodeRefFilter::new() @@ -826,7 +818,7 @@ impl RoutingTableInner { where F: FnOnce(Arc) -> R, { - if self.unlocked_inner.matches_own_node_id(&[node_id]) { + if self.routing_table().matches_own_node_id(&[node_id]) { log_rtab!(error "can't look up own node id in routing table"); return None; } @@ -834,7 +826,7 @@ impl RoutingTableInner { log_rtab!(error "can't look up node id with invalid crypto kind"); return None; } - let bucket_entry = self.unlocked_inner.calculate_bucket_index(&node_id); + let bucket_entry = self.routing_table().calculate_bucket_index(&node_id); let bucket = self.get_bucket(bucket_entry); bucket.entry(&node_id.value).map(f) } @@ -845,7 +837,6 @@ impl RoutingTableInner { #[instrument(level = "trace", skip_all, err)] pub fn register_node_with_peer_info( &mut self, - outer_self: RoutingTable, peer_info: Arc, allow_invalid: bool, ) -> EyreResult { @@ -853,7 +844,7 @@ impl RoutingTableInner { // if our own node is in the list, then ignore it as we don't add ourselves to our own routing table if self - .unlocked_inner + .routing_table() .matches_own_node_id(peer_info.node_ids()) { bail!("can't register own node id in routing table"); @@ -891,10 +882,10 @@ impl RoutingTableInner { if let Some(relay_peer_info) = peer_info.signed_node_info().relay_peer_info(routing_domain) { if !self - .unlocked_inner + .routing_table() .matches_own_node_id(relay_peer_info.node_ids()) { - self.register_node_with_peer_info(outer_self.clone(), relay_peer_info, false)?; + self.register_node_with_peer_info(relay_peer_info, false)?; } } @@ -902,7 +893,7 @@ impl RoutingTableInner { Arc::unwrap_or_clone(peer_info).destructure(); let mut updated = false; let mut old_peer_info = None; - let nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| { + let nr = self.create_node_ref(&node_ids, |_rti, e| { old_peer_info = e.make_peer_info(routing_domain); updated = e.update_signed_node_info(routing_domain, &signed_node_info); })?; @@ -922,12 +913,11 @@ impl RoutingTableInner { #[instrument(level = "trace", skip_all, err)] pub fn register_node_with_id( &mut self, - outer_self: RoutingTable, routing_domain: RoutingDomain, node_id: TypedKey, timestamp: Timestamp, ) -> EyreResult { - let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| { + let nr = self.create_node_ref(&TypedKeyGroup::from(node_id), |_rti, e| { //e.make_not_dead(timestamp); e.touch_last_seen(timestamp); })?; @@ -1057,7 +1047,7 @@ impl RoutingTableInner { #[instrument(level = "trace", skip_all)] pub fn find_fast_non_local_nodes_filtered( &self, - outer_self: RoutingTable, + registry: VeilidComponentRegistry, routing_domain: RoutingDomain, node_count: usize, mut filters: VecDeque, @@ -1089,7 +1079,7 @@ impl RoutingTableInner { node_count, filters, |_rti: &RoutingTableInner, v: Option>| { - NodeRef::new(outer_self.clone(), v.unwrap().clone()) + NodeRef::new(registry.clone(), v.unwrap().clone()) }, ) } @@ -1283,10 +1273,12 @@ impl RoutingTableInner { T: for<'r> FnMut(&'r RoutingTableInner, Option>) -> O, { let cur_ts = Timestamp::now(); + let routing_table = self.routing_table(); // Get the crypto kind let crypto_kind = node_id.kind; - let Some(vcrypto) = self.unlocked_inner.crypto().get(crypto_kind) else { + let crypto = self.crypto(); + let Some(vcrypto) = crypto.get(crypto_kind) else { apibail_generic!("invalid crypto kind"); }; @@ -1338,12 +1330,12 @@ impl RoutingTableInner { let a_key = if let Some(a_entry) = a_entry { a_entry.with_inner(|e| e.node_ids().get(crypto_kind).unwrap()) } else { - self.unlocked_inner.node_id(crypto_kind) + routing_table.node_id(crypto_kind) }; let b_key = if let Some(b_entry) = b_entry { b_entry.with_inner(|e| e.node_ids().get(crypto_kind).unwrap()) } else { - self.unlocked_inner.node_id(crypto_kind) + routing_table.node_id(crypto_kind) }; // distance is the next metric, closer nodes first @@ -1379,7 +1371,8 @@ impl RoutingTableInner { .collect(); // Sort closest - let sort = make_closest_noderef_sort(self.unlocked_inner.crypto(), node_id); + let crypto = self.crypto(); + let sort = make_closest_noderef_sort(&crypto, node_id); closest_nodes_locked.sort_by(sort); // Unlock noderefs @@ -1388,10 +1381,10 @@ impl RoutingTableInner { } #[instrument(level = "trace", skip_all)] -pub fn make_closest_noderef_sort( - crypto: Crypto, +pub fn make_closest_noderef_sort<'a>( + crypto: &'a Crypto, node_id: TypedKey, -) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering { +) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering + 'a { let kind = node_id.kind; // Get cryptoversion to check distance with let vcrypto = crypto.get(kind).unwrap(); @@ -1417,10 +1410,10 @@ pub fn make_closest_noderef_sort( } } -pub fn make_closest_node_id_sort( - crypto: Crypto, +pub fn make_closest_node_id_sort<'a>( + crypto: &'a Crypto, node_id: TypedKey, -) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering { +) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering + 'a { let kind = node_id.kind; // Get cryptoversion to check distance with let vcrypto = crypto.get(kind).unwrap(); diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/editor.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/editor.rs index f752df8f..7cfc4f76 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/editor.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/editor.rs @@ -10,15 +10,15 @@ enum RoutingDomainChangeLocalNetwork { Common(RoutingDomainChangeCommon), } -pub struct RoutingDomainEditorLocalNetwork { - routing_table: RoutingTable, +pub struct RoutingDomainEditorLocalNetwork<'a> { + routing_table: &'a RoutingTable, changes: Vec, } -impl RoutingDomainEditorLocalNetwork { - pub(in crate::routing_table) fn new(routing_table: RoutingTable) -> Self { +impl<'a> RoutingDomainEditorLocalNetwork<'a> { + pub(in crate::routing_table) fn new(routing_table: &'a RoutingTable) -> Self { Self { - routing_table: routing_table.clone(), + routing_table, changes: Vec::new(), } } @@ -30,7 +30,7 @@ impl RoutingDomainEditorLocalNetwork { } } -impl RoutingDomainEditorCommonTrait for RoutingDomainEditorLocalNetwork { +impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorLocalNetwork<'a> { #[instrument(level = "debug", skip(self))] fn clear_dial_info_details( &mut self, diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs index dd644395..3e9db0a2 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs @@ -5,13 +5,13 @@ enum RoutingDomainChangePublicInternet { Common(RoutingDomainChangeCommon), } -pub struct RoutingDomainEditorPublicInternet { - routing_table: RoutingTable, +pub struct RoutingDomainEditorPublicInternet<'a> { + routing_table: &'a RoutingTable, changes: Vec, } -impl RoutingDomainEditorPublicInternet { - pub(in crate::routing_table) fn new(routing_table: RoutingTable) -> Self { +impl<'a> RoutingDomainEditorPublicInternet<'a> { + pub(in crate::routing_table) fn new(routing_table: &'a RoutingTable) -> Self { Self { routing_table, changes: Vec::new(), @@ -41,7 +41,7 @@ impl RoutingDomainEditorPublicInternet { } } -impl RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet { +impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet<'a> { #[instrument(level = "debug", skip(self))] fn clear_dial_info_details( &mut self, @@ -263,8 +263,9 @@ impl RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet { if changed { // Clear the routespecstore cache if our PublicInternet dial info has changed - let rss = self.routing_table.route_spec_store(); - rss.reset_cache(); + if let Some(rss) = self.routing_table.route_spec_store() { + rss.reset_cache(); + } } } diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index aa7a4c1b..d5cb6621 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -81,7 +81,7 @@ impl RoutingTable { } // If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node - if self.unlocked_inner.matches_own_node_id(&node_ids) { + if self.matches_own_node_id(&node_ids) { return Ok(None); } @@ -255,7 +255,7 @@ impl RoutingTable { //#[instrument(level = "trace", skip(self), err)] pub fn bootstrap_with_peer( - self, + &self, crypto_kinds: Vec, pi: Arc, unord: &FuturesUnordered>, @@ -325,7 +325,7 @@ impl RoutingTable { #[instrument(level = "trace", skip(self), err)] pub async fn bootstrap_with_peer_list( - self, + &self, peers: Vec>, stop_token: StopToken, ) -> EyreResult<()> { @@ -364,10 +364,15 @@ impl RoutingTable { } #[instrument(level = "trace", skip(self), err)] - pub async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { + pub async fn bootstrap_task_routine( + &self, + stop_token: StopToken, + _last_ts: Timestamp, + _cur_ts: Timestamp, + ) -> EyreResult<()> { let bootstrap = self - .unlocked_inner - .with_config(|c| c.network.routing_table.bootstrap.clone()); + .config() + .with(|c| c.network.routing_table.bootstrap.clone()); // Don't bother if bootstraps aren't configured if bootstrap.is_empty() { diff --git a/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs b/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs index 3da4e3cf..3769085f 100644 --- a/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs +++ b/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs @@ -10,14 +10,18 @@ impl RoutingTable { /// Ask our closest peers to give us more peers close to ourselves. This will /// assist with the DHT and other algorithms that utilize the distance metric. #[instrument(level = "trace", skip(self), err)] - pub async fn closest_peers_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> { + pub async fn closest_peers_refresh_task_routine( + &self, + stop_token: StopToken, + _last_ts: Timestamp, + _cur_ts: Timestamp, + ) -> EyreResult<()> { let mut unord = FuturesUnordered::new(); for crypto_kind in VALID_CRYPTO_KINDS { // Get our node id for this cryptokind let self_node_id = self.node_id(crypto_kind); - let routing_table = self.clone(); let mut filters = VecDeque::new(); let filter = Box::new( move |rti: &RoutingTableInner, opt_entry: Option>| { @@ -47,13 +51,13 @@ impl RoutingTable { ) as RoutingTableEntryFilter; filters.push_front(filter); - let noderefs = routing_table + let noderefs = self .find_preferred_closest_nodes( CLOSEST_PEERS_REQUEST_COUNT, self_node_id, filters, |_rti, entry: Option>| { - NodeRef::new(routing_table.clone(), entry.unwrap().clone()) + NodeRef::new(self.registry(), entry.unwrap().clone()) }, ) .unwrap(); diff --git a/veilid-core/src/routing_table/tasks/kick_buckets.rs b/veilid-core/src/routing_table/tasks/kick_buckets.rs index e46a12e6..72f3b644 100644 --- a/veilid-core/src/routing_table/tasks/kick_buckets.rs +++ b/veilid-core/src/routing_table/tasks/kick_buckets.rs @@ -11,15 +11,15 @@ impl RoutingTable { // Attempts to keep the size of the routing table down to the bucket depth #[instrument(level = "trace", skip(self), err)] pub async fn kick_buckets_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, ) -> EyreResult<()> { - let kick_queue: Vec = - core::mem::take(&mut *self.unlocked_inner.kick_queue.lock()) - .into_iter() - .collect(); + let crypto = self.crypto(); + let kick_queue: Vec = core::mem::take(&mut *self.kick_queue.lock()) + .into_iter() + .collect(); let mut inner = self.inner.write(); // Get our exempt nodes for each crypto kind @@ -30,7 +30,7 @@ impl RoutingTable { let Some(buckets) = inner.buckets.get(&kind) else { continue; }; - let sort = make_closest_node_id_sort(self.crypto(), our_node_id); + let sort = make_closest_node_id_sort(&crypto, our_node_id); let mut closest_peers = BTreeSet::::new(); let mut closest_unreliable_count = 0usize; diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index 268c8b76..936497ae 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -12,175 +12,98 @@ use super::*; impl RoutingTable { pub fn setup_tasks(&self) { // Set rolling transfers tick task - { - let this = self.clone(); - self.unlocked_inner - .rolling_transfers_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().rolling_transfers_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + rolling_transfers_task, + rolling_transfers_task_routine + ); // Set update state stats tick task - { - let this = self.clone(); - self.unlocked_inner - .update_state_stats_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().update_state_stats_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + update_state_stats_task, + update_state_stats_task_routine + ); // Set rolling answers tick task - { - let this = self.clone(); - self.unlocked_inner - .rolling_answers_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().rolling_answers_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + rolling_answers_task, + rolling_answers_task_routine + ); // Set kick buckets tick task - { - let this = self.clone(); - self.unlocked_inner - .kick_buckets_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().kick_buckets_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!(self, Self, kick_buckets_task, kick_buckets_task_routine); // Set bootstrap tick task - { - let this = self.clone(); - self.unlocked_inner - .bootstrap_task - .set_routine(move |s, _l, _t| Box::pin(this.clone().bootstrap_task_routine(s))); - } + impl_setup_task!(self, Self, bootstrap_task, bootstrap_task_routine); // Set peer minimum refresh tick task - { - let this = self.clone(); - self.unlocked_inner - .peer_minimum_refresh_task - .set_routine(move |s, _l, _t| { - Box::pin(this.clone().peer_minimum_refresh_task_routine(s)) - }); - } + impl_setup_task!( + self, + Self, + peer_minimum_refresh_task, + peer_minimum_refresh_task_routine + ); // Set closest peers refresh tick task - { - let this = self.clone(); - self.unlocked_inner - .closest_peers_refresh_task - .set_routine(move |s, _l, _t| { - Box::pin(this.clone().closest_peers_refresh_task_routine(s)) - }); - } + impl_setup_task!( + self, + Self, + closest_peers_refresh_task, + closest_peers_refresh_task_routine + ); // Set ping validator PublicInternet tick task - { - let this = self.clone(); - self.unlocked_inner - .ping_validator_public_internet_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().ping_validator_public_internet_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + ping_validator_public_internet_task, + ping_validator_public_internet_task_routine + ); // Set ping validator LocalNetwork tick task - { - let this = self.clone(); - self.unlocked_inner - .ping_validator_local_network_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().ping_validator_local_network_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + ping_validator_local_network_task, + ping_validator_local_network_task_routine + ); // Set ping validator PublicInternet Relay tick task - { - let this = self.clone(); - self.unlocked_inner - .ping_validator_public_internet_relay_task - .set_routine(move |s, l, t| { - Box::pin( - this.clone() - .ping_validator_public_internet_relay_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - ), - ) - }); - } + impl_setup_task!( + self, + Self, + ping_validator_public_internet_relay_task, + ping_validator_public_internet_relay_task_routine + ); // Set ping validator Active Watch tick task - { - let this = self.clone(); - self.unlocked_inner - .ping_validator_active_watch_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().ping_validator_active_watch_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + ping_validator_active_watch_task, + ping_validator_active_watch_task_routine + ); // Set relay management tick task - { - let this = self.clone(); - self.unlocked_inner - .relay_management_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().relay_management_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + relay_management_task, + relay_management_task_routine + ); // Set private route management tick task - { - let this = self.clone(); - self.unlocked_inner - .private_route_management_task - .set_routine(move |s, l, t| { - Box::pin(this.clone().private_route_management_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) - }); - } + impl_setup_task!( + self, + Self, + private_route_management_task, + private_route_management_task_routine + ); } /// Ticks about once per second @@ -197,18 +120,18 @@ impl RoutingTable { }; // Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs - self.unlocked_inner.rolling_transfers_task.tick().await?; + self.rolling_transfers_task.tick().await?; // Do state stats update every UPDATE_STATE_STATS_INTERVAL_SECS secs - self.unlocked_inner.update_state_stats_task.tick().await?; + self.update_state_stats_task.tick().await?; // Do rolling answers every ROLLING_ANSWER_INTERVAL_SECS secs - self.unlocked_inner.rolling_answers_task.tick().await?; + self.rolling_answers_task.tick().await?; // Kick buckets task - let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len(); + let kick_bucket_queue_count = self.kick_queue.lock().len(); if kick_bucket_queue_count > 0 { - self.unlocked_inner.kick_buckets_task.tick().await?; + self.kick_buckets_task.tick().await?; } // Refresh entry counts @@ -222,7 +145,9 @@ impl RoutingTable { return Ok(()); } - let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); + let min_peer_count = self + .config() + .with(|c| c.network.dht.min_peer_count as usize); // Figure out which tables need bootstrap or peer minimum refresh let mut needs_bootstrap = false; @@ -237,40 +162,27 @@ impl RoutingTable { } } if needs_bootstrap { - self.unlocked_inner.bootstrap_task.tick().await?; + self.bootstrap_task.tick().await?; } if needs_peer_minimum_refresh { - self.unlocked_inner.peer_minimum_refresh_task.tick().await?; + self.peer_minimum_refresh_task.tick().await?; } // Ping validate some nodes to groom the table - self.unlocked_inner - .ping_validator_public_internet_task - .tick() - .await?; - self.unlocked_inner - .ping_validator_local_network_task - .tick() - .await?; - self.unlocked_inner - .ping_validator_public_internet_relay_task - .tick() - .await?; - self.unlocked_inner - .ping_validator_active_watch_task + self.ping_validator_public_internet_task.tick().await?; + self.ping_validator_local_network_task.tick().await?; + self.ping_validator_public_internet_relay_task .tick() .await?; + self.ping_validator_active_watch_task.tick().await?; // Run the relay management task - self.unlocked_inner.relay_management_task.tick().await?; + self.relay_management_task.tick().await?; // Get more nodes if we need to if !needs_bootstrap && !needs_peer_minimum_refresh { // Run closest peers refresh task - self.unlocked_inner - .closest_peers_refresh_task - .tick() - .await?; + self.closest_peers_refresh_task.tick().await?; } // Only perform these operations if we already have a published peer info @@ -279,10 +191,7 @@ impl RoutingTable { .is_some() { // Run the private route management task - self.unlocked_inner - .private_route_management_task - .tick() - .await?; + self.private_route_management_task.tick().await?; } Ok(()) @@ -295,82 +204,57 @@ impl RoutingTable { pub async fn cancel_tasks(&self) { // Cancel all tasks being ticked log_rtab!(debug "stopping rolling transfers task"); - if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { + if let Err(e) = self.rolling_transfers_task.stop().await { error!("rolling_transfers_task not stopped: {}", e); } log_rtab!(debug "stopping update state stats task"); - if let Err(e) = self.unlocked_inner.update_state_stats_task.stop().await { + if let Err(e) = self.update_state_stats_task.stop().await { error!("update_state_stats_task not stopped: {}", e); } log_rtab!(debug "stopping rolling answers task"); - if let Err(e) = self.unlocked_inner.rolling_answers_task.stop().await { + if let Err(e) = self.rolling_answers_task.stop().await { error!("rolling_answers_task not stopped: {}", e); } log_rtab!(debug "stopping kick buckets task"); - if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await { + if let Err(e) = self.kick_buckets_task.stop().await { error!("kick_buckets_task not stopped: {}", e); } log_rtab!(debug "stopping bootstrap task"); - if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await { + if let Err(e) = self.bootstrap_task.stop().await { error!("bootstrap_task not stopped: {}", e); } log_rtab!(debug "stopping peer minimum refresh task"); - if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await { + if let Err(e) = self.peer_minimum_refresh_task.stop().await { error!("peer_minimum_refresh_task not stopped: {}", e); } log_rtab!(debug "stopping ping_validator tasks"); - if let Err(e) = self - .unlocked_inner - .ping_validator_public_internet_task - .stop() - .await - { + if let Err(e) = self.ping_validator_public_internet_task.stop().await { error!("ping_validator_public_internet_task not stopped: {}", e); } - if let Err(e) = self - .unlocked_inner - .ping_validator_local_network_task - .stop() - .await - { + if let Err(e) = self.ping_validator_local_network_task.stop().await { error!("ping_validator_local_network_task not stopped: {}", e); } - if let Err(e) = self - .unlocked_inner - .ping_validator_public_internet_relay_task - .stop() - .await - { + if let Err(e) = self.ping_validator_public_internet_relay_task.stop().await { error!( "ping_validator_public_internet_relay_task not stopped: {}", e ); } - if let Err(e) = self - .unlocked_inner - .ping_validator_active_watch_task - .stop() - .await - { + if let Err(e) = self.ping_validator_active_watch_task.stop().await { error!("ping_validator_active_watch_task not stopped: {}", e); } log_rtab!(debug "stopping relay management task"); - if let Err(e) = self.unlocked_inner.relay_management_task.stop().await { + if let Err(e) = self.relay_management_task.stop().await { warn!("relay_management_task not stopped: {}", e); } log_rtab!(debug "stopping private route management task"); - if let Err(e) = self - .unlocked_inner - .private_route_management_task - .stop() - .await - { + if let Err(e) = self.private_route_management_task.stop().await { warn!("private_route_management_task not stopped: {}", e); } log_rtab!(debug "stopping closest peers refresh task"); - if let Err(e) = self.unlocked_inner.closest_peers_refresh_task.stop().await { + if let Err(e) = self.closest_peers_refresh_task.stop().await { warn!("closest_peers_refresh_task not stopped: {}", e); } } diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index f4176e75..febd66ce 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -12,11 +12,16 @@ impl RoutingTable { // nodes for their PublicInternet peers, which is a very fast way to get // a new node online. #[instrument(level = "trace", skip(self), err)] - pub async fn peer_minimum_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> { + pub async fn peer_minimum_refresh_task_routine( + &self, + stop_token: StopToken, + _last_ts: Timestamp, + _cur_ts: Timestamp, + ) -> EyreResult<()> { // Get counts by crypto kind let entry_count = self.inner.read().cached_entry_counts(); - let (min_peer_count, min_peer_refresh_time_ms) = self.with_config(|c| { + let (min_peer_count, min_peer_refresh_time_ms) = self.config().with(|c| { ( c.network.dht.min_peer_count as usize, c.network.dht.min_peer_refresh_time_ms, @@ -39,7 +44,6 @@ impl RoutingTable { continue; } - let routing_table = self.clone(); let mut filters = VecDeque::new(); let filter = Box::new( move |rti: &RoutingTableInner, opt_entry: Option>| { @@ -64,23 +68,18 @@ impl RoutingTable { ) as RoutingTableEntryFilter; filters.push_front(filter); - let noderefs = routing_table.find_preferred_fastest_nodes( + let noderefs = self.find_preferred_fastest_nodes( min_peer_count, filters, |_rti, entry: Option>| { - NodeRef::new(routing_table.clone(), entry.unwrap().clone()) + NodeRef::new(self.registry(), entry.unwrap().clone()) }, ); for nr in noderefs { - let routing_table = self.clone(); ord.push_back( - async move { - routing_table - .reverse_find_node(crypto_kind, nr, false, vec![]) - .await - } - .instrument(Span::current()), + async move { self.reverse_find_node(crypto_kind, nr, false, vec![]).await } + .instrument(Span::current()), ); } } diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index fcceca69..d932a725 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -18,7 +18,7 @@ impl RoutingTable { // Task routine for PublicInternet status pings #[instrument(level = "trace", skip(self), err)] pub async fn ping_validator_public_internet_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, @@ -37,7 +37,7 @@ impl RoutingTable { // Task routine for LocalNetwork status pings #[instrument(level = "trace", skip(self), err)] pub async fn ping_validator_local_network_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, @@ -56,7 +56,7 @@ impl RoutingTable { // Task routine for PublicInternet relay keepalive pings #[instrument(level = "trace", skip(self), err)] pub async fn ping_validator_public_internet_relay_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, @@ -75,7 +75,7 @@ impl RoutingTable { // Task routine for active watch keepalive pings #[instrument(level = "trace", skip(self), err)] pub async fn ping_validator_active_watch_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, @@ -180,11 +180,11 @@ impl RoutingTable { } for relay_nr_filtered in relay_noderefs { - let rpc = rpc.clone(); futurequeue.push_back( async move { log_rtab!("--> PublicInternet Relay ping to {:?}", relay_nr_filtered); - let _ = rpc + let _ = self + .rpc_processor() .rpc_call_status(Destination::direct(relay_nr_filtered)) .await?; Ok(()) @@ -202,8 +202,6 @@ impl RoutingTable { cur_ts: Timestamp, futurequeue: &mut VecDeque, ) -> EyreResult<()> { - let rpc = self.rpc_processor(); - let watches_need_keepalive = { let mut inner = self.inner.write(); let need = inner @@ -224,15 +222,16 @@ impl RoutingTable { } // Get all the active watches from the storage manager - let storage_manager = self.unlocked_inner.network_manager.storage_manager(); - let watch_destinations = storage_manager.get_active_watch_nodes().await; + let watch_destinations = self.storage_manager().get_active_watch_nodes().await; for watch_destination in watch_destinations { - let rpc = rpc.clone(); futurequeue.push_back( async move { log_rtab!("--> Watch Keepalive ping to {:?}", watch_destination); - let _ = rpc.rpc_call_status(watch_destination).await?; + let _ = self + .rpc_processor() + .rpc_call_status(watch_destination) + .await?; Ok(()) } .boxed(), @@ -249,8 +248,6 @@ impl RoutingTable { cur_ts: Timestamp, futurequeue: &mut VecDeque, ) -> EyreResult<()> { - let rpc = self.rpc_processor(); - // Get all nodes needing pings in the PublicInternet routing domain let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts); @@ -258,12 +255,14 @@ impl RoutingTable { for nr in node_refs { let nr = nr.sequencing_clone(Sequencing::PreferOrdered); - let rpc = rpc.clone(); futurequeue.push_back( async move { #[cfg(feature = "verbose-tracing")] log_rtab!(debug "--> PublicInternet Validator ping to {:?}", nr); - let _ = rpc.rpc_call_status(Destination::direct(nr)).await?; + let _ = self + .rpc_processor() + .rpc_call_status(Destination::direct(nr)) + .await?; Ok(()) } .boxed(), diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index 4b708b80..d485b081 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -8,12 +8,13 @@ const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2; impl RoutingTable { fn get_background_safety_route_count(&self) -> usize { - let c = self.config.get(); - if c.capabilities.disable.contains(&CAP_ROUTE) { - 0 - } else { - BACKGROUND_SAFETY_ROUTE_COUNT - } + self.config().with(|c| { + if c.capabilities.disable.contains(&CAP_ROUTE) { + 0 + } else { + BACKGROUND_SAFETY_ROUTE_COUNT + } + }) } /// Fastest routes sort fn route_sort_latency_fn(a: &(RouteId, u64), b: &(RouteId, u64)) -> cmp::Ordering { @@ -44,10 +45,13 @@ impl RoutingTable { /// If a route doesn't 'need_testing', then we neither test nor drop it #[instrument(level = "trace", skip(self))] fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec { - let default_route_hop_count = - self.with_config(|c| c.network.rpc.default_route_hop_count as usize); + let default_route_hop_count = self + .config() + .with(|c| c.network.rpc.default_route_hop_count as usize); - let rss = self.route_spec_store(); + let Some(rss) = self.route_spec_store() else { + return vec![]; + }; let mut must_test_routes = Vec::::new(); let mut unpublished_routes = Vec::<(RouteId, u64)>::new(); let mut expired_routes = Vec::::new(); @@ -115,7 +119,10 @@ impl RoutingTable { log_rtab!("Testing routes: {:?}", routes_needing_testing); // Test all the routes that need testing at the same time - let rss = self.route_spec_store(); + let Some(rss) = self.route_spec_store() else { + return Ok(()); + }; + #[derive(Default, Debug)] struct TestRouteContext { dead_routes: Vec, @@ -125,10 +132,11 @@ impl RoutingTable { { let mut unord = FuturesUnordered::new(); for r in routes_needing_testing { - let rss = rss.clone(); let ctx = ctx.clone(); unord.push( async move { + let rss = self.route_spec_store().unwrap(); + let success = match rss.test_route(r).await { // Test had result Ok(Some(v)) => v, @@ -169,7 +177,7 @@ impl RoutingTable { /// Keep private routes assigned and accessible #[instrument(level = "trace", skip(self, stop_token), err)] pub async fn private_route_management_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, @@ -183,10 +191,13 @@ impl RoutingTable { } // Ensure we have a minimum of N allocated local, unpublished routes with the default number of hops and all our supported crypto kinds - let default_route_hop_count = - self.with_config(|c| c.network.rpc.default_route_hop_count as usize); + let default_route_hop_count = self + .config() + .with(|c| c.network.rpc.default_route_hop_count as usize); let mut local_unpublished_route_count = 0usize; - let rss = self.route_spec_store(); + let Some(rss) = self.route_spec_store() else { + return Ok(()); + }; rss.list_allocated_routes(|_k, v| { if !v.is_published() && v.hop_count() == default_route_hop_count diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 4d9a5f30..6c283a60 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -54,7 +54,7 @@ impl RoutingTable { // Keep relays assigned and accessible #[instrument(level = "trace", skip_all, err)] pub async fn relay_management_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, diff --git a/veilid-core/src/routing_table/tasks/update_statistics.rs b/veilid-core/src/routing_table/tasks/update_statistics.rs index c165b630..92cf6681 100644 --- a/veilid-core/src/routing_table/tasks/update_statistics.rs +++ b/veilid-core/src/routing_table/tasks/update_statistics.rs @@ -4,7 +4,7 @@ impl RoutingTable { // Compute transfer statistics to determine how 'fast' a node is #[instrument(level = "trace", skip(self), err)] pub async fn rolling_transfers_task_routine( - self, + &self, _stop_token: StopToken, last_ts: Timestamp, cur_ts: Timestamp, @@ -27,8 +27,9 @@ impl RoutingTable { } // Roll all route transfers - let rss = self.route_spec_store(); - rss.roll_transfers(last_ts, cur_ts); + if let Some(rss) = self.route_spec_store() { + rss.roll_transfers(last_ts, cur_ts); + } Ok(()) } @@ -36,7 +37,7 @@ impl RoutingTable { // Update state statistics in PeerStats #[instrument(level = "trace", skip(self), err)] pub async fn update_state_stats_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, @@ -57,7 +58,7 @@ impl RoutingTable { // Update rolling answers in PeerStats #[instrument(level = "trace", skip(self), err)] pub async fn rolling_answers_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, diff --git a/veilid-core/src/routing_table/tests/mod.rs b/veilid-core/src/routing_table/tests/mod.rs index 97b641d5..467694ba 100644 --- a/veilid-core/src/routing_table/tests/mod.rs +++ b/veilid-core/src/routing_table/tests/mod.rs @@ -1,30 +1,18 @@ use super::*; +use crate::storage_manager::StorageManager; pub mod test_serialize_routing_table; -pub(crate) fn mock_routing_table() -> routing_table::RoutingTable { +pub(crate) async fn mock_routing_table<'a>() -> VeilidComponentGuard<'a, RoutingTable> { let veilid_config = VeilidConfig::new_from_config(VeilidConfigInner::default(), Arc::new(|_| {})); let registry = VeilidComponentRegistry::new(veilid_config); + registry.enable_mock(); registry.register(ProtectedStore::new); registry.register(TableStore::new); registry.register(Crypto::new); - let storage_manager = storage_manager::StorageManager::new( - event_bus.clone(), - veilid_config.clone(), - crypto.clone(), - table_store.clone(), - #[cfg(feature = "unstable-blockstore")] - block_store.clone(), - ); - let network_manager = network_manager::NetworkManager::new( - event_bus.clone(), - veilid_config.clone(), - storage_manager, - table_store.clone(), - #[cfg(feature = "unstable-blockstore")] - block_store.clone(), - crypto.clone(), - ); - RoutingTable::new(network_manager) + registry.register(StorageManager::new); + registry.register(RoutingTable::new); + registry.init().await.unwrap(); + registry.lookup::().unwrap() } diff --git a/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs b/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs index 91576a4e..41f1618f 100644 --- a/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs +++ b/veilid-core/src/routing_table/tests/test_serialize_routing_table.rs @@ -1,16 +1,14 @@ use super::*; pub async fn test_routingtable_buckets_round_trip() { - let original = mock_routing_table(); - let copy = mock_routing_table(); - original.init().await.unwrap(); - copy.init().await.unwrap(); + let original = mock_routing_table().await; + let copy = mock_routing_table().await; // Add lots of routes to `original` here to exercise all various types. let (serialized_bucket_map, all_entry_bytes) = original.serialized_buckets(); - copy.populate_routing_table( + RoutingTable::populate_routing_table_inner( &mut copy.inner.write(), serialized_bucket_map, all_entry_bytes, @@ -51,8 +49,8 @@ pub async fn test_routingtable_buckets_round_trip() { } // Even if these are mocks, we should still practice good hygiene. - original.terminate().await; - copy.terminate().await; + original.registry().terminate().await; + copy.registry().terminate().await; } pub async fn test_round_trip_peerinfo() { diff --git a/veilid-core/src/routing_table/types/peer_info.rs b/veilid-core/src/routing_table/types/peer_info.rs index 1199d14c..2d9e319f 100644 --- a/veilid-core/src/routing_table/types/peer_info.rs +++ b/veilid-core/src/routing_table/types/peer_info.rs @@ -43,7 +43,7 @@ impl PeerInfo { } } - pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> { + pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> { let validated_node_ids = self.signed_node_info.validate(&self.node_ids, crypto)?; if validated_node_ids.is_empty() { // Shouldn't get here because signed node info validation also checks this @@ -65,11 +65,11 @@ impl PeerInfo { (self.routing_domain, self.node_ids, self.signed_node_info) } - pub fn validate_vec(peer_info_vec: &mut Vec>, crypto: Crypto) { + pub fn validate_vec(peer_info_vec: &mut Vec>, crypto: &Crypto) { let mut n = 0usize; while n < peer_info_vec.len() { let pi = peer_info_vec.get(n).unwrap(); - if pi.validate(crypto.clone()).is_err() { + if pi.validate(crypto).is_err() { peer_info_vec.remove(n); } else { n += 1; diff --git a/veilid-core/src/routing_table/types/signed_direct_node_info.rs b/veilid-core/src/routing_table/types/signed_direct_node_info.rs index e4587262..011aab45 100644 --- a/veilid-core/src/routing_table/types/signed_direct_node_info.rs +++ b/veilid-core/src/routing_table/types/signed_direct_node_info.rs @@ -36,7 +36,7 @@ impl SignedDirectNodeInfo { pub fn validate( &self, node_ids: &TypedKeyGroup, - crypto: Crypto, + crypto: &Crypto, ) -> VeilidAPIResult { let node_info_bytes = Self::make_signature_bytes(&self.node_info, self.timestamp)?; @@ -54,7 +54,7 @@ impl SignedDirectNodeInfo { } pub fn make_signatures( - crypto: Crypto, + crypto: &Crypto, typed_key_pairs: Vec, node_info: NodeInfo, ) -> VeilidAPIResult { diff --git a/veilid-core/src/routing_table/types/signed_node_info.rs b/veilid-core/src/routing_table/types/signed_node_info.rs index e4adfc1e..b59e5e67 100644 --- a/veilid-core/src/routing_table/types/signed_node_info.rs +++ b/veilid-core/src/routing_table/types/signed_node_info.rs @@ -27,7 +27,7 @@ impl SignedNodeInfo { pub fn validate( &self, node_ids: &TypedKeyGroup, - crypto: Crypto, + crypto: &Crypto, ) -> VeilidAPIResult { match self { SignedNodeInfo::Direct(d) => d.validate(node_ids, crypto), diff --git a/veilid-core/src/routing_table/types/signed_relayed_node_info.rs b/veilid-core/src/routing_table/types/signed_relayed_node_info.rs index 275cb0d0..a0950912 100644 --- a/veilid-core/src/routing_table/types/signed_relayed_node_info.rs +++ b/veilid-core/src/routing_table/types/signed_relayed_node_info.rs @@ -49,7 +49,7 @@ impl SignedRelayedNodeInfo { pub fn validate( &self, node_ids: &TypedKeyGroup, - crypto: Crypto, + crypto: &Crypto, ) -> VeilidAPIResult { // Ensure the relay info for the node has a superset of the crypto kinds of the node it is relaying if common_crypto_kinds( @@ -81,7 +81,7 @@ impl SignedRelayedNodeInfo { } pub fn make_signatures( - crypto: Crypto, + crypto: &Crypto, typed_key_pairs: Vec, node_info: NodeInfo, relay_ids: TypedKeyGroup, diff --git a/veilid-core/src/rpc_processor/fanout/fanout_call.rs b/veilid-core/src/rpc_processor/fanout/fanout_call.rs index 3daaca87..86b3d8e4 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_call.rs @@ -91,14 +91,14 @@ pub fn capability_fanout_node_info_filter(caps: Vec) -> FanoutNodeIn /// If the algorithm times out, a Timeout result is returned, however operations will still have been performed and a /// timeout is not necessarily indicative of an algorithmic 'failure', just that no definitive stopping condition was found /// in the given time -pub(crate) struct FanoutCall +pub(crate) struct FanoutCall<'a, R, F, C, D> where R: Unpin, F: Future, C: Fn(NodeRef) -> F, D: Fn(&[NodeRef]) -> Option, { - routing_table: RoutingTable, + routing_table: &'a RoutingTable, node_id: TypedKey, context: Mutex>, node_count: usize, @@ -109,7 +109,7 @@ where check_done: D, } -impl FanoutCall +impl<'a, R, F, C, D> FanoutCall<'a, R, F, C, D> where R: Unpin, F: Future, @@ -118,7 +118,7 @@ where { #[allow(clippy::too_many_arguments)] pub fn new( - routing_table: RoutingTable, + routing_table: &'a RoutingTable, node_id: TypedKey, node_count: usize, fanout: usize, @@ -126,13 +126,13 @@ where node_info_filter: FanoutNodeInfoFilter, call_routine: C, check_done: D, - ) -> Arc { + ) -> Self { let context = Mutex::new(FanoutContext { fanout_queue: FanoutQueue::new(node_id.kind), result: None, }); - Arc::new(Self { + Self { routing_table, node_id, context, @@ -142,11 +142,11 @@ where node_info_filter, call_routine, check_done, - }) + } } #[instrument(level = "trace", target = "fanout", skip_all)] - fn evaluate_done(self: Arc, ctx: &mut FanoutContext) -> bool { + fn evaluate_done(&self, ctx: &mut FanoutContext) -> bool { // If we have a result, then we're done if ctx.result.is_some() { return true; @@ -158,7 +158,7 @@ where } #[instrument(level = "trace", target = "fanout", skip_all)] - fn add_to_fanout_queue(self: Arc, new_nodes: &[NodeRef]) { + fn add_to_fanout_queue(&self, new_nodes: &[NodeRef]) { event!(target: "fanout", Level::DEBUG, "FanoutCall::add_to_fanout_queue:\n new_nodes={{\n{}}}\n", new_nodes @@ -169,24 +169,23 @@ where ); let ctx = &mut *self.context.lock(); - let this = self.clone(); ctx.fanout_queue.add(new_nodes, |current_nodes| { - let mut current_nodes_vec = this + let mut current_nodes_vec = self .routing_table - .sort_and_clean_closest_noderefs(this.node_id, current_nodes); + .sort_and_clean_closest_noderefs(self.node_id, current_nodes); current_nodes_vec.truncate(self.node_count); current_nodes_vec }); } #[instrument(level = "trace", target = "fanout", skip_all)] - async fn fanout_processor(self: Arc) -> bool { + async fn fanout_processor(&self) -> bool { // Loop until we have a result or are done loop { // Get the closest node we haven't processed yet if we're not done yet let next_node = { let mut ctx = self.context.lock(); - if self.clone().evaluate_done(&mut ctx) { + if self.evaluate_done(&mut ctx) { break true; } ctx.fanout_queue.next() @@ -221,7 +220,7 @@ where let new_nodes = self .routing_table .register_nodes_with_peer_info_list(filtered_v); - self.clone().add_to_fanout_queue(&new_nodes); + self.add_to_fanout_queue(&new_nodes); } #[allow(unused_variables)] Ok(x) => { @@ -239,10 +238,10 @@ where } #[instrument(level = "trace", target = "fanout", skip_all)] - fn init_closest_nodes(self: Arc) -> Result<(), RPCError> { + fn init_closest_nodes(&self) -> Result<(), RPCError> { // Get the 'node_count' closest nodes to the key out of our routing table let closest_nodes = { - let routing_table = self.routing_table.clone(); + let routing_table = self.routing_table; let node_info_filter = self.node_info_filter.clone(); let filter = Box::new( move |rti: &RoutingTableInner, opt_entry: Option>| { @@ -253,7 +252,7 @@ where let entry = opt_entry.unwrap(); // Filter entries - entry.with(rti, |_rti, e| { + entry.with(routing_table, rti, |_rt, _rti, e| { let Some(signed_node_info) = e.signed_node_info(RoutingDomain::PublicInternet) else { @@ -277,20 +276,20 @@ where let filters = VecDeque::from([filter]); let transform = |_rti: &RoutingTableInner, v: Option>| { - NodeRef::new(routing_table.clone(), v.unwrap().clone()) + NodeRef::new(routing_table.registry(), v.unwrap().clone()) }; routing_table .find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform) .map_err(RPCError::invalid_format)? }; - self.clone().add_to_fanout_queue(&closest_nodes); + self.add_to_fanout_queue(&closest_nodes); Ok(()) } #[instrument(level = "trace", target = "fanout", skip_all)] pub async fn run( - self: Arc, + &self, init_fanout_queue: Vec, ) -> TimeoutOr, RPCError>> { // Get timeout in milliseconds @@ -302,17 +301,17 @@ where }; // Initialize closest nodes list - if let Err(e) = self.clone().init_closest_nodes() { + if let Err(e) = self.init_closest_nodes() { return TimeoutOr::value(Err(e)); } // Ensure we include the most recent nodes - self.clone().add_to_fanout_queue(&init_fanout_queue); + self.add_to_fanout_queue(&init_fanout_queue); // Do a quick check to see if we're already done { let mut ctx = self.context.lock(); - if self.clone().evaluate_done(&mut ctx) { + if self.evaluate_done(&mut ctx) { return TimeoutOr::value(ctx.result.take().transpose()); } } @@ -322,7 +321,7 @@ where { // Spin up 'fanout' tasks to process the fanout for _ in 0..self.fanout { - let h = self.clone().fanout_processor(); + let h = self.fanout_processor(); unord.push(h); } } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 5259ba47..14e65344 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -88,29 +88,27 @@ enum RPCKind { ///////////////////////////////////////////////////////////////////// +#[derive(Debug)] struct RPCProcessorInner { send_channel: Option>, stop_source: Option, worker_join_handles: Vec>, } -struct RPCProcessorUnlockedInner { - network_manager: NetworkManager, +#[derive(Debug)] +pub(crate) struct RPCProcessor { + registry: VeilidComponentRegistry, + inner: Arc>, timeout_us: TimestampDuration, queue_size: u32, concurrency: u32, max_route_hop_count: usize, - update_callback: UpdateCallback, waiting_rpc_table: OperationWaiter>, waiting_app_call_table: OperationWaiter, ()>, startup_lock: StartupLock, } -#[derive(Clone)] -pub(crate) struct RPCProcessor { - inner: Arc>, - unlocked_inner: Arc, -} +impl_veilid_component!(RPCProcessor); impl RPCProcessor { fn new_inner() -> RPCProcessorInner { @@ -120,13 +118,11 @@ impl RPCProcessor { worker_join_handles: Vec::new(), } } - fn new_unlocked_inner( - network_manager: NetworkManager, - update_callback: UpdateCallback, - ) -> RPCProcessorUnlockedInner { + + pub fn new(registry: VeilidComponentRegistry) -> Self { // make local copy of node id for easy access let (concurrency, queue_size, max_route_hop_count, timeout_us) = { - let config = network_manager.config(); + let config = registry.config(); let c = config.get(); // set up channel @@ -146,50 +142,20 @@ impl RPCProcessor { (concurrency, queue_size, max_route_hop_count, timeout_us) }; - RPCProcessorUnlockedInner { - network_manager, + Self { + registry, + inner: Arc::new(Mutex::new(Self::new_inner())), timeout_us, queue_size, concurrency, max_route_hop_count, - update_callback, waiting_rpc_table: OperationWaiter::new(), waiting_app_call_table: OperationWaiter::new(), startup_lock: StartupLock::new(), } } - pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self { - Self { - inner: Arc::new(Mutex::new(Self::new_inner())), - unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager, update_callback)), - } - } - pub fn network_manager(&self) -> NetworkManager { - self.unlocked_inner.network_manager.clone() - } - - pub fn crypto(&self) -> Crypto { - self.unlocked_inner.network_manager.crypto() - } - - pub fn event_bus(&self) -> EventBus { - self.unlocked_inner.network_manager.event_bus() - } - - pub fn routing_table(&self) -> RoutingTable { - self.unlocked_inner.network_manager.routing_table() - } - - pub fn storage_manager(&self) -> StorageManager { - self.unlocked_inner.network_manager.storage_manager() - } - - pub fn with_config R>(&self, func: F) -> R { - let config = self.unlocked_inner.network_manager.config(); - let c = config.get(); - func(&c) - } +xxx continue here ////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 7e854c4e..f80da4a2 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -28,13 +28,13 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_get_value( &self, - rpc_processor: RPCProcessor, + rpc_processor: &RPCProcessor, key: TypedKey, subkey: ValueSubkey, safety_selection: SafetySelection, last_get_result: GetResult, ) -> VeilidAPIResult>> { - let routing_table = rpc_processor.routing_table(); + let routing_table = self.routing_table(); let routing_domain = RoutingDomain::PublicInternet; // Get the DHT parameters for 'GetValue' @@ -84,12 +84,13 @@ impl StorageManager { // Routine to call to generate fanout let call_routine = { let context = context.clone(); - let rpc_processor = rpc_processor.clone(); + let registry = self.registry(); move |next_node: NodeRef| { let context = context.clone(); - let rpc_processor = rpc_processor.clone(); + let registry = registry.clone(); let last_descriptor = last_get_result.opt_descriptor.clone(); async move { + let rpc_processor = registry.lookup::().unwrap(); let gva = network_result_try!( rpc_processor .clone() @@ -300,13 +301,14 @@ impl StorageManager { subkey: ValueSubkey, last_seq: ValueSeqNum, ) { - let this = self.clone(); - inner.process_deferred_results( + let registry = self.registry(); + Self::process_deferred_results_inner(inner, res_rx, Box::new( move |result: VeilidAPIResult| -> SendPinBoxFuture { - let this = this.clone(); + let registry=registry.clone(); Box::pin(async move { + let this = registry.lookup::().unwrap(); let result = match result { Ok(v) => v, Err(e) => { @@ -361,24 +363,27 @@ impl StorageManager { }; // Keep the list of nodes that returned a value for later reference - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; - inner.process_fanout_results( + Self::process_fanout_results_inner( + &mut *inner, key, core::iter::once((subkey, &result.fanout_result)), false, + self.config() + .with(|c| c.network.dht.set_value_count as usize), ); // If we got a new value back then write it to the opened record if Some(get_result_value.value_data().seq()) != opt_last_seq { - inner - .handle_set_local_value( - key, - subkey, - get_result_value.clone(), - WatchUpdateMode::UpdateAll, - ) - .await?; + Self::handle_set_local_value( + &mut *inner, + key, + subkey, + get_result_value.clone(), + WatchUpdateMode::UpdateAll, + ) + .await?; } Ok(Some(get_result_value.value_data().clone())) } @@ -391,12 +396,13 @@ impl StorageManager { subkey: ValueSubkey, want_descriptor: bool, ) -> VeilidAPIResult> { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; // See if this is a remote or local value let (_is_local, last_get_result) = { // See if the subkey we are getting has a last known local value - let mut last_get_result = inner.handle_get_local_value(key, subkey, true).await?; + let mut last_get_result = + Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; // If this is local, it must have a descriptor already if last_get_result.opt_descriptor.is_some() { if !want_descriptor { @@ -405,9 +411,9 @@ impl StorageManager { (true, last_get_result) } else { // See if the subkey we are getting has a last known remote value - let last_get_result = inner - .handle_get_remote_value(key, subkey, want_descriptor) - .await?; + let last_get_result = + Self::handle_get_remote_value_inner(&mut *inner, key, subkey, want_descriptor) + .await?; (false, last_get_result) } }; diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index d92f9868..fc28cdc3 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -52,14 +52,14 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_inspect_value( &self, - rpc_processor: RPCProcessor, + rpc_processor: &RPCProcessor, key: TypedKey, subkeys: ValueSubkeyRangeSet, safety_selection: SafetySelection, local_inspect_result: InspectResult, use_set_scope: bool, ) -> VeilidAPIResult { - let routing_table = rpc_processor.routing_table(); + let routing_table = self.routing_table(); let routing_domain = RoutingDomain::PublicInternet; // Get the DHT parameters for 'InspectValue' diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 7a94dd6d..e380bfa6 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -241,8 +241,9 @@ impl StorageManager { let mut inner = self.inner.lock().await; // Schedule tick + let registry = self.registry(); let tick_future = interval("storage manager tick", 1000, move || { - let registry = self.registry(); + let registry = registry.clone(); async move { let this = registry.lookup::().unwrap(); if let Err(e) = this.tick().await { @@ -306,7 +307,7 @@ impl StorageManager { log_stor!(debug "finished storage manager shutdown"); } - async fn save_metadata(inner: &StorageManagerInner) -> EyreResult<()> { + async fn save_metadata(inner: &mut StorageManagerInner) -> EyreResult<()> { if let Some(metadata_db) = &inner.metadata_db { let tx = metadata_db.transact(); tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes)?; @@ -417,8 +418,7 @@ impl StorageManager { // Now that the record is made we should always succeed to open the existing record // The initial writer is the owner of the record - inner - .open_existing_record(key, Some(owner), safety_selection) + Self::open_existing_record_inner(&mut *inner, key, Some(owner), safety_selection) .await .map(|r| r.unwrap()) } @@ -431,12 +431,11 @@ impl StorageManager { writer: Option, safety_selection: SafetySelection, ) -> VeilidAPIResult { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; // See if we have a local record already or not - if let Some(res) = inner - .open_existing_record(key, writer, safety_selection) - .await? + if let Some(res) = + Self::open_existing_record_inner(&mut *inner, key, writer, safety_selection).await? { return Ok(res); } @@ -444,7 +443,7 @@ impl StorageManager { // No record yet, try to get it from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { + let Some(rpc_processor) = self.get_ready_rpc_processor() else { apibail_try_again!("offline, try again later"); }; @@ -456,7 +455,7 @@ impl StorageManager { let subkey: ValueSubkey = 0; let res_rx = self .outbound_get_value( - rpc_processor, + &rpc_processor, key, subkey, safety_selection, @@ -513,10 +512,10 @@ impl StorageManager { #[instrument(level = "trace", target = "stor", skip_all)] pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { let (opt_opened_record, opt_rpc_processor) = { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; ( - inner.close_record(key)?, - Self::get_ready_rpc_processor(&inner), + Self::close_record_inner(&mut *inner, key)?, + self.get_ready_rpc_processor(), ) }; @@ -571,7 +570,7 @@ impl StorageManager { self.close_record(key).await?; // Get record from the local store - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; let Some(local_record_store) = inner.local_record_store.as_mut() else { apibail_not_initialized!(); }; @@ -588,7 +587,7 @@ impl StorageManager { subkey: ValueSubkey, force_refresh: bool, ) -> VeilidAPIResult> { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; let safety_selection = { let Some(opened_record) = inner.opened_records.get(&key) else { apibail_generic!("record not open"); @@ -597,7 +596,8 @@ impl StorageManager { }; // See if the requested subkey is our local record store - let last_get_result = inner.handle_get_local_value(key, subkey, true).await?; + let last_get_result = + Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; // Return the existing value if we have one unless we are forcing a refresh if !force_refresh { @@ -609,7 +609,7 @@ impl StorageManager { // Refresh if we can // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { + let Some(rpc_processor) = self.get_ready_rpc_processor() else { // Return the existing value if we have one if we aren't online if let Some(last_get_result_value) = last_get_result.opt_value { return Ok(Some(last_get_result_value.value_data().clone())); @@ -628,7 +628,7 @@ impl StorageManager { .map(|v| v.value_data().seq()); let res_rx = self .outbound_get_value( - rpc_processor, + &rpc_processor, key, subkey, safety_selection, @@ -651,7 +651,7 @@ impl StorageManager { if let Some(out) = &out { // If there's more to process, do it in the background if partial { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; self.process_deferred_outbound_get_value_result_inner( &mut inner, res_rx, @@ -1514,6 +1514,7 @@ impl StorageManager { key: TypedKey, subkey_results_iter: I, is_set: bool, + consensus_count: usize, ) { // Get local record store let local_record_store = inner.local_record_store.as_mut().unwrap(); @@ -1545,7 +1546,7 @@ impl StorageManager { .collect::>(); nodes_ts.sort_by(|a, b| b.1.cmp(&a.1)); - for dead_node_key in nodes_ts.iter().skip(self.set_consensus_count) { + for dead_node_key in nodes_ts.iter().skip(consensus_count) { d.nodes.remove(&dead_node_key.0); } }); diff --git a/veilid-core/src/storage_manager/tasks/check_active_watches.rs b/veilid-core/src/storage_manager/tasks/check_active_watches.rs index 4d6727cd..5b6acb45 100644 --- a/veilid-core/src/storage_manager/tasks/check_active_watches.rs +++ b/veilid-core/src/storage_manager/tasks/check_active_watches.rs @@ -13,7 +13,9 @@ impl StorageManager { let mut inner = self.inner.lock().await; let routing_table = self.routing_table(); - let rss = routing_table.route_spec_store(); + let Some(rss) = routing_table.route_spec_store() else { + return Ok(()); + }; let update_callback = self.update_callback(); diff --git a/veilid-core/src/storage_manager/tasks/check_watched_records.rs b/veilid-core/src/storage_manager/tasks/check_watched_records.rs index 792e5df5..c784555f 100644 --- a/veilid-core/src/storage_manager/tasks/check_watched_records.rs +++ b/veilid-core/src/storage_manager/tasks/check_watched_records.rs @@ -4,7 +4,7 @@ impl StorageManager { // Check if server-side watches have expired #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn check_watched_records_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index 928184c6..0e4f8020 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -10,64 +10,48 @@ impl StorageManager { pub(super) fn setup_tasks(&self) { // Set flush records tick task log_stor!(debug "starting flush record stores task"); - { - let registry = self.registry(); - self.flush_record_stores_task.set_routine(move |s, l, t| { - Box::pin(async move { - let this = registry.lookup::().unwrap(); - this.flush_record_stores_task_routine(s, Timestamp::new(l), Timestamp::new(t)) - .await - }) - }); - } + impl_setup_task!( + self, + Self, + flush_record_stores_task, + flush_record_stores_task_routine + ); + // Set offline subkey writes tick task log_stor!(debug "starting offline subkey writes task"); - { - let registry = self.registry(); - self.offline_subkey_writes_task.set_routine(move |s, l, t| { - Box::pin(async move { - let this = registry.lookup::().unwrap(); - this.offline_subkey_writes_task_routine(s, Timestamp::new(l), Timestamp::new(t)) - .await - }) - }); - } + impl_setup_task!( + self, + Self, + offline_subkey_writes_task, + offline_subkey_writes_task_routine + ); + // Set send value changes tick task log_stor!(debug "starting send value changes task"); - { - let registry = self.registry(); - self.send_value_changes_task.set_routine(move |s, l, t| { - Box::pin(async move { - let this = registry.lookup::().unwrap(); - this.send_value_changes_task_routine(s, Timestamp::new(l), Timestamp::new(t)) - .await - }) - }); - } + impl_setup_task!( + self, + Self, + send_value_changes_task, + send_value_changes_task_routine + ); + // Set check active watches tick task log_stor!(debug "starting check active watches task"); - { - let registry = self.registry(); - self.check_active_watches_task.set_routine(move |s, l, t| { - Box::pin(async move { - let this = registry.lookup::().unwrap(); - this.check_active_watches_task_routine(s, Timestamp::new(l), Timestamp::new(t)) - .await - }) - }); - } + impl_setup_task!( + self, + Self, + check_active_watches_task, + check_active_watches_task_routine + ); + // Set check watched records tick task log_stor!(debug "starting checked watched records task"); - { - let registry = self.registry(); - self.check_watched_records_task.set_routine(move |s, l, t| { - Box::pin(async move { - let this = registry.lookup::().unwrap(); - this.check_watched_records_task_routine(s, Timestamp::new(l), Timestamp::new(t)) - .await - }) - }); - } + impl_setup_task!( + self, + Self, + check_watched_records_task, + check_watched_records_task_routine + ); } #[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)] diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index abb2722f..c6b91c11 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -186,7 +186,13 @@ impl StorageManager { // Process all results #[instrument(level = "trace", target = "stor", skip_all)] - fn process_single_result_inner(inner: &mut StorageManagerInner, result: WorkItemResult) { + async fn process_single_result(&self, result: WorkItemResult) { + let consensus_count = self + .config() + .with(|c| c.network.dht.set_value_count as usize); + + let mut inner = self.inner.lock().await; + // Debug print the result log_stor!(debug "Offline write result: {:?}", result); @@ -218,10 +224,11 @@ impl StorageManager { // Keep the list of nodes that returned a value for later reference Self::process_fanout_results_inner( - inner, + &mut *inner, result.key, result.fanout_results.iter().map(|x| (x.0, &x.1)), true, + consensus_count, ); } @@ -240,8 +247,7 @@ impl StorageManager { .process_work_item(stop_token.clone(), work_item) .await?; { - let mut inner = self.inner.lock().await; - Self::process_single_result_inner(&mut inner, result); + self.process_single_result(result).await; } } @@ -258,7 +264,7 @@ impl StorageManager { ) -> EyreResult<()> { // Operate on a copy of the offline subkey writes map let work_items = { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; // Move the current set of writes to 'in flight' for osw in &mut inner.offline_subkey_writes { osw.1.subkeys_in_flight = mem::take(&mut osw.1.subkeys); @@ -277,7 +283,7 @@ impl StorageManager { // Ensure nothing is left in-flight when returning even due to an error { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; for osw in &mut inner.offline_subkey_writes { osw.1.subkeys = osw .1 diff --git a/veilid-core/src/storage_manager/tasks/send_value_changes.rs b/veilid-core/src/storage_manager/tasks/send_value_changes.rs index 98a43c24..08e13486 100644 --- a/veilid-core/src/storage_manager/tasks/send_value_changes.rs +++ b/veilid-core/src/storage_manager/tasks/send_value_changes.rs @@ -6,7 +6,7 @@ impl StorageManager { // Send value change notifications across the network #[instrument(level = "trace", target = "stor", skip_all, err)] pub(super) async fn send_value_changes_task_routine( - self, + &self, stop_token: StopToken, _last_ts: Timestamp, _cur_ts: Timestamp, @@ -31,10 +31,9 @@ impl StorageManager { // Add a future for each value change for vc in value_changes { - let this = self.clone(); unord.push( async move { - if let Err(e) = this.send_value_change(vc).await { + if let Err(e) = self.send_value_change(vc).await { log_stor!(debug "Failed to send value change: {}", e); } } diff --git a/veilid-core/src/tests/common/test_dht.rs b/veilid-core/src/tests/common/test_dht.rs index f2aeb20b..4143dc82 100644 --- a/veilid-core/src/tests/common/test_dht.rs +++ b/veilid-core/src/tests/common/test_dht.rs @@ -104,7 +104,8 @@ pub async fn test_get_dht_record_key(api: VeilidAPI) { .with_safety(SafetySelection::Unsafe(Sequencing::EnsureOrdered)) .unwrap(); - let cs = api.crypto().unwrap().get(CRYPTO_KIND_VLD0).unwrap(); + let crypto = api.crypto(); + let cs = crypto.get(CRYPTO_KIND_VLD0).unwrap(); let owner_keypair = cs.generate_keypair(); let schema = DHTSchema::dflt(1).unwrap(); @@ -117,7 +118,6 @@ pub async fn test_get_dht_record_key(api: VeilidAPI) { // recreate the record key from the metadata alone let key = rc .get_dht_record_key(schema.clone(), &owner_keypair.key, Some(CRYPTO_KIND_VLD0)) - .await .unwrap(); // keys should be the same diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index c6ea7044..bc339578 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -616,7 +616,7 @@ impl VeilidAPI { 0, "debug_keypair", "kind", - get_crypto_system_version(crypto.clone()), + get_crypto_system_version(&crypto), ) .unwrap_or_else(|_| crypto.best()); diff --git a/veilid-core/src/veilid_api/routing_context.rs b/veilid-core/src/veilid_api/routing_context.rs index ec00593f..c13211a3 100644 --- a/veilid-core/src/veilid_api/routing_context.rs +++ b/veilid-core/src/veilid_api/routing_context.rs @@ -227,7 +227,7 @@ impl RoutingContext { /// Builds the record key for a given schema and owner public key #[instrument(target = "veilid_api", level = "debug", ret, err)] - pub async fn get_dht_record_key( + pub fn get_dht_record_key( &self, schema: DHTSchema, owner_key: &PublicKey, @@ -240,9 +240,7 @@ impl RoutingContext { let kind = kind.unwrap_or(best_crypto_kind()); Crypto::validate_crypto_kind(kind)?; let storage_manager = self.api.storage_manager()?; - storage_manager - .get_record_key(kind, schema, owner_key) - .await + storage_manager.get_record_key(kind, schema, owner_key) } /// Creates a new DHT record