diff --git a/veilid-core/src/core_context.rs b/veilid-core/src/core_context.rs index 4824ce12..aaa09162 100644 --- a/veilid-core/src/core_context.rs +++ b/veilid-core/src/core_context.rs @@ -136,39 +136,39 @@ impl VeilidCoreContext { ///////////////////////////////////////////////////////////////////////////// pub trait RegisteredComponents { - fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore>; - fn crypto(&self) -> VeilidComponentGuard<'_, Crypto>; - fn table_store(&self) -> VeilidComponentGuard<'_, TableStore>; - fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager>; - fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable>; - fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager>; - fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor>; - fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager>; + fn protected_store<'a>(&self) -> VeilidComponentGuard<'a, ProtectedStore>; + fn crypto<'a>(&self) -> VeilidComponentGuard<'a, Crypto>; + fn table_store<'a>(&self) -> VeilidComponentGuard<'a, TableStore>; + fn storage_manager<'a>(&self) -> VeilidComponentGuard<'a, StorageManager>; + fn routing_table<'a>(&self) -> VeilidComponentGuard<'a, RoutingTable>; + fn network_manager<'a>(&self) -> VeilidComponentGuard<'a, NetworkManager>; + fn rpc_processor<'a>(&self) -> VeilidComponentGuard<'a, RPCProcessor>; + fn attachment_manager<'a>(&self) -> VeilidComponentGuard<'a, AttachmentManager>; } impl RegisteredComponents for T { - fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> { + fn protected_store<'a>(&self) -> VeilidComponentGuard<'a, ProtectedStore> { self.registry().lookup::().unwrap() } - fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> { + fn crypto<'a>(&self) -> VeilidComponentGuard<'a, Crypto> { self.registry().lookup::().unwrap() } - fn table_store(&self) -> VeilidComponentGuard<'_, TableStore> { + fn table_store<'a>(&self) -> VeilidComponentGuard<'a, TableStore> { self.registry().lookup::().unwrap() } - fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager> { + fn storage_manager<'a>(&self) -> VeilidComponentGuard<'a, StorageManager> { self.registry().lookup::().unwrap() } - fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable> { + fn routing_table<'a>(&self) -> VeilidComponentGuard<'a, RoutingTable> { self.registry().lookup::().unwrap() } - fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager> { + fn network_manager<'a>(&self) -> VeilidComponentGuard<'a, NetworkManager> { self.registry().lookup::().unwrap() } - fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor> { + fn rpc_processor<'a>(&self) -> VeilidComponentGuard<'a, RPCProcessor> { self.registry().lookup::().unwrap() } - fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager> { + fn attachment_manager<'a>(&self) -> VeilidComponentGuard<'a, AttachmentManager> { self.registry().lookup::().unwrap() } } diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs index 2d3c0d7c..b3655adb 100644 --- a/veilid-core/src/network_manager/address_filter.rs +++ b/veilid-core/src/network_manager/address_filter.rs @@ -80,7 +80,7 @@ impl AddressFilter { inner.dial_info_failures.clear(); } - fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { + fn purge_old_timestamps_inner(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) { // v4 { let mut dead_keys = Vec::::new(); @@ -304,14 +304,14 @@ impl AddressFilter { #[instrument(parent = None, level = "trace", skip_all, err)] pub async fn address_filter_task_routine( - self, + &self, _stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, ) -> EyreResult<()> { // let mut inner = self.inner.lock(); - self.purge_old_timestamps(&mut inner, cur_ts); + self.purge_old_timestamps_inner(&mut inner, cur_ts); self.purge_old_punishments(&mut inner, cur_ts); Ok(()) @@ -326,7 +326,7 @@ impl AddressFilter { } let ts = Timestamp::now(); - self.purge_old_timestamps(inner, ts); + self.purge_old_timestamps_inner(inner, ts); match ipblock { IpAddr::V4(v4) => { @@ -377,13 +377,13 @@ impl AddressFilter { Ok(()) } - pub fn remove_connection(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> { + pub fn remove_connection(&self, addr: IpAddr) -> Result<(), AddressNotInTableError> { let mut inner = self.inner.lock(); let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr); let ts = Timestamp::now(); - self.purge_old_timestamps(&mut inner, ts); + self.purge_old_timestamps_inner(&mut inner, ts); match ipblock { IpAddr::V4(v4) => { diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 0eb3bf88..1a899cc8 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -452,12 +452,11 @@ impl ConnectionManager { let network_manager = self.network_manager(); let prot_conn = network_result_try!(loop { - let address_filter = network_manager.address_filter(); let result_net_res = ProtocolNetworkConnection::connect( preferred_local_address, &dial_info, self.arc.connection_initial_timeout_ms, - &*address_filter, + network_manager.address_filter(), ) .await; match result_net_res { diff --git a/veilid-core/src/network_manager/connection_table.rs b/veilid-core/src/network_manager/connection_table.rs index 40c65894..a838bb64 100644 --- a/veilid-core/src/network_manager/connection_table.rs +++ b/veilid-core/src/network_manager/connection_table.rs @@ -241,7 +241,8 @@ impl ConnectionTable { let ip_addr = flow.remote_address().ip_addr(); if let Err(e) = self .network_manager() - .with_address_filter_mut(|af| af.add_connection(ip_addr)) + .address_filter() + .add_connection(ip_addr) { // Return the connection in the error to be disposed of return Err(ConnectionTableAddError::address_filter( @@ -468,7 +469,8 @@ impl ConnectionTable { // address_filter let ip_addr = remote.socket_addr().ip(); self.network_manager() - .with_address_filter_mut(|af| af.remove_connection(ip_addr)) + .address_filter() + .remove_connection(ip_addr) .expect("Inconsistency in connection table"); conn } diff --git a/veilid-core/src/network_manager/direct_boot.rs b/veilid-core/src/network_manager/direct_boot.rs index 608af287..a095970f 100644 --- a/veilid-core/src/network_manager/direct_boot.rs +++ b/veilid-core/src/network_manager/direct_boot.rs @@ -35,7 +35,7 @@ impl NetworkManager { // Direct bootstrap request #[instrument(level = "trace", target = "net", err, skip(self))] pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult>> { - let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms); + let timeout_ms = self.config().with(|c| c.network.rpc.timeout_ms); // Send boot magic to requested peer address let data = BOOT_MAGIC.to_vec(); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index a26c3d14..3ad46a71 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -132,13 +132,12 @@ struct NetworkManagerInner { socket_address_change_subscription: Option, } -#[derive(Debug)] pub(crate) struct NetworkManager { registry: VeilidComponentRegistry, inner: Mutex, // Address filter - address_filter: RwLock, + address_filter: AddressFilter, // Accessors components: RwLock>, @@ -218,7 +217,7 @@ impl NetworkManager { let this = Self { registry, inner: Mutex::new(inner), - address_filter: RwLock::new(address_filter), + address_filter, components: RwLock::new(None), rolling_transfers_task: TickTask::new( "rolling_transfers_task", @@ -237,24 +236,8 @@ impl NetworkManager { this } - pub fn with_address_filter_mut(&self, callback: F) -> R - where - F: FnOnce(&mut AddressFilter) -> R, - { - let mut af = self.address_filter.write(); - callback(&mut *af) - } - - pub fn with_address_filter(&self, callback: F) -> R - where - F: FnOnce(&AddressFilter) -> R, - { - let af = self.address_filter.read(); - callback(&*af) - } - - pub fn address_filter<'a>(&self) -> RwLockReadGuard<'a, AddressFilter> { - self.address_filter.read() + pub fn address_filter(&self) -> &AddressFilter { + &self.address_filter } fn net(&self) -> Network { @@ -289,9 +272,6 @@ impl NetworkManager { #[instrument(level = "debug", skip_all, err)] async fn init_async(&self) -> EyreResult<()> { - let address_filter = AddressFilter::new(self.registry()); - *self.address_filter.write() = Some(address_filter); - Ok(()) } @@ -302,9 +282,7 @@ impl NetworkManager { async fn pre_terminate_async(&self) {} #[instrument(level = "debug", skip_all)] - async fn terminate_async(&self) { - *self.address_filter.write() = None; - } + async fn terminate_async(&self) {} #[instrument(level = "debug", skip_all, err)] pub async fn internal_startup(&self) -> EyreResult { @@ -314,11 +292,11 @@ impl NetworkManager { } // Clean address filter for things that should not be persistent - self.address_filter().restart(); + self.address_filter.restart(); // Create network components let connection_manager = ConnectionManager::new(self.registry()); - let net = Network::new(self.registry(), connection_manager.clone()); + let net = Network::new(self.registry()); let receipt_manager = ReceiptManager::new(); *self.components.write() = Some(NetworkComponents { net: net.clone(), @@ -527,7 +505,7 @@ impl NetworkManager { let crypto = self.crypto(); // Generate receipt and serialized form to return - let vcrypto = self.crypto().best(); + let vcrypto = crypto.best(); let nonce = vcrypto.random_nonce(); let node_id = routing_table.node_id(vcrypto.kind()); @@ -567,7 +545,7 @@ impl NetworkManager { let crypto = self.crypto(); // Generate receipt and serialized form to return - let vcrypto = self.crypto().best(); + let vcrypto = crypto.best(); let nonce = vcrypto.random_nonce(); let node_id = routing_table.node_id(vcrypto.kind()); diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 82078818..9777d51e 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -307,8 +307,7 @@ impl NetworkConnection { flow ); - let network_manager = connection_manager.network_manager(); - let address_filter = network_manager.address_filter(); + let registry = connection_manager.registry(); let mut unord = FuturesUnordered::new(); let mut need_receiver = true; let mut need_sender = true; @@ -364,14 +363,17 @@ impl NetworkConnection { // Add another message receiver future if necessary if need_receiver { need_receiver = false; + let registry = registry.clone(); let receiver_fut = Self::recv_internal(&protocol_connection, stats.clone()) .then(|res| async { + let registry = registry; + let network_manager = registry.network_manager(); match res { Ok(v) => { let peer_address = protocol_connection.flow().remote(); // Check to see if it is punished - if address_filter.is_ip_addr_punished(peer_address.socket_addr().ip()) { + if network_manager.address_filter().is_ip_addr_punished(peer_address.socket_addr().ip()) { return RecvLoopAction::Finish; } @@ -383,7 +385,7 @@ impl NetworkConnection { // Punish invalid framing (tcp framing or websocket framing) if v.is_invalid_message() { - address_filter.punish_ip_addr(peer_address.socket_addr().ip(), PunishmentReason::InvalidFraming); + network_manager.address_filter().punish_ip_addr(peer_address.socket_addr().ip(), PunishmentReason::InvalidFraming); return RecvLoopAction::Finish; } diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 7577ef4b..8979f057 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -40,9 +40,10 @@ impl NetworkManager { destination_node_ref: FilteredNodeRef, data: Vec, ) -> SendPinBoxFuture>> { - let this = self.clone(); + let registry = self.registry(); Box::pin( async move { + let this = registry.network_manager(); // If we need to relay, do it let (contact_method, target_node_ref, opt_relayed_contact_method) = match possibly_relayed_contact_method.clone() { @@ -652,17 +653,14 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // Detect if network is stopping so we can break out of this - let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else { + let Some(stop_token) = self.startup_lock.stop_token() else { return Ok(NetworkResult::service_unavailable("network is stopping")); }; // Build a return receipt for the signal let receipt_timeout = TimestampDuration::new_ms( - self.unlocked_inner - .config - .get() - .network - .reverse_connection_receipt_time_ms as u64, + self.config() + .with(|c| c.network.reverse_connection_receipt_time_ms as u64), ); let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; @@ -763,7 +761,7 @@ impl NetworkManager { data: Vec, ) -> EyreResult> { // Detect if network is stopping so we can break out of this - let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else { + let Some(stop_token) = self.startup_lock.stop_token() else { return Ok(NetworkResult::service_unavailable("network is stopping")); }; @@ -776,11 +774,8 @@ impl NetworkManager { // Build a return receipt for the signal let receipt_timeout = TimestampDuration::new_ms( - self.unlocked_inner - .config - .get() - .network - .hole_punch_receipt_time_ms as u64, + self.config() + .with(|c| c.network.hole_punch_receipt_time_ms as u64), ); let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?; diff --git a/veilid-core/src/network_manager/stats.rs b/veilid-core/src/network_manager/stats.rs index 8694c4b3..dbb92d88 100644 --- a/veilid-core/src/network_manager/stats.rs +++ b/veilid-core/src/network_manager/stats.rs @@ -117,11 +117,9 @@ impl NetworkManager { } pub(super) fn send_network_update(&self) { - let update_cb = self.unlocked_inner.update_callback.read().clone(); - if update_cb.is_none() { - return; - } + let update_cb = self.update_callback(); + let state = self.get_veilid_state(); - (update_cb.unwrap())(VeilidUpdate::Network(state)); + update_cb(VeilidUpdate::Network(state)); } } diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs index 03735377..e8167650 100644 --- a/veilid-core/src/network_manager/tasks/mod.rs +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -14,14 +14,16 @@ impl NetworkManager { // Set address filter task { - let this = self.clone(); + let registry = self.registry(); self.address_filter_task.set_routine(move |s, l, t| { - xxx continue here - Box::pin(this.address_filter().address_filter_task_routine( - s, - Timestamp::new(l), - Timestamp::new(t), - )) + let registry = registry.clone(); + Box::pin(async move { + registry + .network_manager() + .address_filter() + .address_filter_task_routine(s, Timestamp::new(l), Timestamp::new(t)) + .await + }) }); } } diff --git a/veilid-core/src/network_manager/tasks/rolling_transfers.rs b/veilid-core/src/network_manager/tasks/rolling_transfers.rs index d0d7d0b8..eba2def7 100644 --- a/veilid-core/src/network_manager/tasks/rolling_transfers.rs +++ b/veilid-core/src/network_manager/tasks/rolling_transfers.rs @@ -4,7 +4,7 @@ impl NetworkManager { // Compute transfer statistics for the low level network #[instrument(level = "trace", skip(self), err)] pub async fn rolling_transfers_task_routine( - self, + &self, _stop_token: StopToken, last_ts: Timestamp, cur_ts: Timestamp, diff --git a/veilid-core/src/network_manager/tests/test_connection_table.rs b/veilid-core/src/network_manager/tests/test_connection_table.rs index 1f20df65..c7044500 100644 --- a/veilid-core/src/network_manager/tests/test_connection_table.rs +++ b/veilid-core/src/network_manager/tests/test_connection_table.rs @@ -1,13 +1,11 @@ use super::*; use super::connection_table::*; -use crate::tests::common::test_veilid_config::*; use crate::tests::mock_routing_table; pub async fn test_add_get_remove() { - let config = get_config(); - let address_filter = AddressFilter::new(config.clone(), mock_routing_table()); - let table = ConnectionTable::new(config, address_filter); + let routing_table = mock_routing_table().await; + let table = ConnectionTable::new(routing_table.registry()); let a1 = Flow::new_no_local(PeerAddress::new( SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080), diff --git a/veilid-core/src/network_manager/tests/test_signed_node_info.rs b/veilid-core/src/network_manager/tests/test_signed_node_info.rs index 1bcc2bb0..b9f2d255 100644 --- a/veilid-core/src/network_manager/tests/test_signed_node_info.rs +++ b/veilid-core/src/network_manager/tests/test_signed_node_info.rs @@ -30,7 +30,7 @@ pub async fn test_signed_node_info() { // Test correct validation let keypair = vcrypto.generate_keypair(); let sni = SignedDirectNodeInfo::make_signatures( - crypto.clone(), + &crypto, vec![TypedKeyPair::new(ck, keypair)], node_info.clone(), ) @@ -42,7 +42,7 @@ pub async fn test_signed_node_info() { sni.timestamp(), sni.signatures().to_vec(), ); - let tks_validated = sdni.validate(&tks, crypto.clone()).unwrap(); + let tks_validated = sdni.validate(&tks, &crypto).unwrap(); assert_eq!(tks_validated.len(), oldtkslen); assert_eq!(tks_validated.len(), sni.signatures().len()); @@ -54,7 +54,7 @@ pub async fn test_signed_node_info() { sni.timestamp(), sni.signatures().to_vec(), ); - sdni.validate(&tks1, crypto.clone()).unwrap_err(); + sdni.validate(&tks1, &crypto).unwrap_err(); // Test unsupported cryptosystem validation let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]); @@ -65,7 +65,7 @@ pub async fn test_signed_node_info() { tksfake.add(TypedKey::new(ck, keypair.key)); let sdnifake = SignedDirectNodeInfo::new(node_info.clone(), sni.timestamp(), sigsfake.clone()); - let tksfake_validated = sdnifake.validate(&tksfake, crypto.clone()).unwrap(); + let tksfake_validated = sdnifake.validate(&tksfake, &crypto).unwrap(); assert_eq!(tksfake_validated.len(), 1); assert_eq!(sdnifake.signatures().len(), sigsfake.len()); @@ -89,7 +89,7 @@ pub async fn test_signed_node_info() { let oldtks2len = tks2.len(); let sni2 = SignedRelayedNodeInfo::make_signatures( - crypto.clone(), + &crypto, vec![TypedKeyPair::new(ck, keypair2)], node_info2.clone(), tks.clone(), @@ -103,7 +103,7 @@ pub async fn test_signed_node_info() { sni2.timestamp(), sni2.signatures().to_vec(), ); - let tks2_validated = srni.validate(&tks2, crypto.clone()).unwrap(); + let tks2_validated = srni.validate(&tks2, &crypto).unwrap(); assert_eq!(tks2_validated.len(), oldtks2len); assert_eq!(tks2_validated.len(), sni2.signatures().len()); @@ -119,7 +119,7 @@ pub async fn test_signed_node_info() { sni2.timestamp(), sni2.signatures().to_vec(), ); - srni.validate(&tks3, crypto.clone()).unwrap_err(); + srni.validate(&tks3, &crypto).unwrap_err(); // Test unsupported cryptosystem validation let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]); @@ -135,7 +135,7 @@ pub async fn test_signed_node_info() { sni2.timestamp(), sigsfake3.clone(), ); - let tksfake3_validated = srnifake.validate(&tksfake3, crypto.clone()).unwrap(); + let tksfake3_validated = srnifake.validate(&tksfake3, &crypto).unwrap(); assert_eq!(tksfake3_validated.len(), 1); assert_eq!(srnifake.signatures().len(), sigsfake3.len()); } diff --git a/veilid-core/src/network_manager/types/signal_info.rs b/veilid-core/src/network_manager/types/signal_info.rs index 2f88a0df..dc11d788 100644 --- a/veilid-core/src/network_manager/types/signal_info.rs +++ b/veilid-core/src/network_manager/types/signal_info.rs @@ -21,7 +21,7 @@ pub(crate) enum SignalInfo { } impl SignalInfo { - pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> { + pub fn validate(&self, crypto: &Crypto) -> Result<(), RPCError> { match self { SignalInfo::HolePunch { receipt, peer_info } => { if receipt.len() < MIN_RECEIPT_SIZE { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index 9df8cba2..b59aaacb 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -95,6 +95,8 @@ pub(crate) struct RoutingTable { registry: VeilidComponentRegistry, inner: RwLock, + /// Route spec store + route_spec_store: RouteSpecStore, /// The current node's public DHT keys node_id: TypedKeyGroup, /// The current node's public DHT secrets @@ -145,9 +147,11 @@ impl RoutingTable { let config = registry.config(); let c = config.get(); let inner = RwLock::new(RoutingTableInner::new(registry.clone())); + let route_spec_store = RouteSpecStore::new(registry.clone()); let this = Self { registry, inner, + route_spec_store, node_id: c.network.routing_table.node_id.clone(), node_id_secret: c.network.routing_table.node_id_secret.clone(), kick_queue: Mutex::new(BTreeSet::default()), @@ -221,20 +225,12 @@ impl RoutingTable { // Set up routespecstore log_rtab!(debug "starting route spec store init"); - let route_spec_store = match RouteSpecStore::load(self.registry()).await { - Ok(v) => v, - Err(e) => { - log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e); - RouteSpecStore::new(self.registry()) - } + if let Err(e) = self.route_spec_store().load().await { + log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e); + self.route_spec_store().reset(); }; log_rtab!(debug "finished route spec store init"); - { - let mut inner = self.inner.write(); - inner.route_spec_store = Some(route_spec_store); - } - log_rtab!(debug "finished routing table init"); Ok(()) } @@ -507,14 +503,8 @@ impl RoutingTable { self.inner.read().routing_domain_for_address(address) } - pub fn route_spec_store(&self) -> Option> { - let inner = self.inner.read(); - if inner.route_spec_store.is_none() { - return None; - } - Some(RwLockReadGuard::map(inner, |x| { - x.route_spec_store.as_ref().unwrap() - })) + pub fn route_spec_store(&self) -> &RouteSpecStore { + &self.route_spec_store } pub fn relay_node(&self, domain: RoutingDomain) -> Option { @@ -627,7 +617,7 @@ impl RoutingTable { ) -> Vec { self.inner .read() - .get_nodes_needing_ping(self.registry(), routing_domain, cur_ts) + .get_nodes_needing_ping(routing_domain, cur_ts) } fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) { diff --git a/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs b/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs index 4ee38d6c..a6c21e07 100644 --- a/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs +++ b/veilid-core/src/routing_table/node_ref/filtered_node_ref.rs @@ -9,11 +9,7 @@ pub(crate) struct FilteredNodeRef { track_id: usize, } -impl VeilidComponentRegistryAccessor for FilteredNodeRef { - fn registry(&self) -> VeilidComponentRegistry { - self.registry.clone() - } -} +impl_veilid_component_registry_accessor!(FilteredNodeRef); impl FilteredNodeRef { pub fn new( diff --git a/veilid-core/src/routing_table/node_ref/mod.rs b/veilid-core/src/routing_table/node_ref/mod.rs index aa58666b..b168e2e5 100644 --- a/veilid-core/src/routing_table/node_ref/mod.rs +++ b/veilid-core/src/routing_table/node_ref/mod.rs @@ -22,11 +22,7 @@ pub(crate) struct NodeRef { track_id: usize, } -impl VeilidComponentRegistryAccessor for NodeRef { - fn registry(&self) -> VeilidComponentRegistry { - self.registry.clone() - } -} +impl_veilid_component_registry_accessor!(NodeRef); impl NodeRef { pub fn new(registry: VeilidComponentRegistry, entry: Arc) -> Self { diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index 3a37076e..568c1964 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -64,43 +64,41 @@ impl RouteSpecStore { } } - #[instrument(level = "trace", target = "route", skip_all, err)] - pub async fn load(registry: VeilidComponentRegistry) -> EyreResult { - let (max_route_hop_count, default_route_hop_count) = { - let config = registry.config(); - let c = config.get(); - ( - c.network.rpc.max_route_hop_count as usize, - c.network.rpc.default_route_hop_count as usize, - ) - }; - - // Get frozen blob from table store - let table_store = registry.lookup::().unwrap(); - let routing_table = registry.lookup::().unwrap(); - - let content = RouteSpecStoreContent::load(&table_store, &routing_table).await?; - - let mut inner = RouteSpecStoreInner { - content, + #[instrument(level = "trace", target = "route", skip_all)] + pub fn reset(&self) { + *self.inner.lock() = RouteSpecStoreInner { + content: RouteSpecStoreContent::new(), cache: Default::default(), }; + } - // Rebuild the routespecstore cache - let rti = &*routing_table.inner.read(); - for (_, rssd) in inner.content.iter_details() { - inner.cache.add_to_cache(rti, rssd); - } + #[instrument(level = "trace", target = "route", skip_all, err)] + pub async fn load(&self) -> EyreResult<()> { + let inner = { + let table_store = self.table_store(); + let routing_table = self.routing_table(); - // Return the loaded RouteSpecStore - let rss = RouteSpecStore { - registry, - inner: Arc::new(Mutex::new(inner)), - max_route_hop_count, - default_route_hop_count, + // Get frozen blob from table store + let content = RouteSpecStoreContent::load(&table_store, &routing_table).await?; + + let mut inner = RouteSpecStoreInner { + content, + cache: Default::default(), + }; + + // Rebuild the routespecstore cache + let rti = &*routing_table.inner.read(); + for (_, rssd) in inner.content.iter_details() { + inner.cache.add_to_cache(rti, rssd); + } + + inner }; - Ok(rss) + // Return the loaded RouteSpecStore + *self.inner.lock() = inner; + + Ok(()) } #[instrument(level = "trace", target = "route", skip(self), err)] @@ -111,7 +109,7 @@ impl RouteSpecStore { }; // Save our content - let table_store = self.registry.lookup::().unwrap(); + let table_store = self.table_store(); content.save(&table_store).await?; Ok(()) @@ -140,7 +138,7 @@ impl RouteSpecStore { /// Purge the route spec store pub async fn purge(&self) -> VeilidAPIResult<()> { // Briefly pause routing table ticker while changes are made - let routing_table = self.registry.lookup::().unwrap(); + let routing_table = self.routing_table(); let _tick_guard = routing_table.pause_tasks().await; routing_table.cancel_tasks().await; @@ -901,7 +899,8 @@ impl RouteSpecStore { }; // Remove from hop cache - let rti = &*self.routing_table().inner.read(); + let routing_table = self.routing_table(); + let rti = &*routing_table.inner.read(); if !inner.cache.remove_from_cache(rti, id, &rssd) { panic!("hop cache should have contained cache key"); } diff --git a/veilid-core/src/routing_table/routing_table_inner/mod.rs b/veilid-core/src/routing_table/routing_table_inner/mod.rs index e399d17e..508f467c 100644 --- a/veilid-core/src/routing_table/routing_table_inner/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/mod.rs @@ -460,7 +460,6 @@ impl RoutingTableInner { // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow pub(super) fn get_nodes_needing_ping( &self, - registry: VeilidComponentRegistry, routing_domain: RoutingDomain, cur_ts: Timestamp, ) -> Vec { @@ -603,6 +602,8 @@ impl RoutingTableInner { entry: Arc, node_ids: &[TypedKey], ) -> EyreResult<()> { + let routing_table = self.routing_table(); + entry.with_mut_inner(|e| { let mut existing_node_ids = e.node_ids(); @@ -633,21 +634,21 @@ impl RoutingTableInner { if let Some(old_node_id) = e.add_node_id(*node_id)? { // Remove any old node id for this crypto kind if VALID_CRYPTO_KINDS.contains(&ck) { - let bucket_index = self.unlocked_inner.calculate_bucket_index(&old_node_id); + let bucket_index = routing_table.calculate_bucket_index(&old_node_id); let bucket = self.get_bucket_mut(bucket_index); bucket.remove_entry(&old_node_id.value); - self.unlocked_inner.kick_queue.lock().insert(bucket_index); + routing_table.kick_queue.lock().insert(bucket_index); } } // Bucket the entry appropriately if VALID_CRYPTO_KINDS.contains(&ck) { - let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id); + let bucket_index = routing_table.calculate_bucket_index(node_id); let bucket = self.get_bucket_mut(bucket_index); bucket.add_existing_entry(node_id.value, entry.clone()); // Kick bucket - self.unlocked_inner.kick_queue.lock().insert(bucket_index); + routing_table.kick_queue.lock().insert(bucket_index); } } @@ -655,7 +656,7 @@ impl RoutingTableInner { for node_id in existing_node_ids.iter() { let ck = node_id.kind; if VALID_CRYPTO_KINDS.contains(&ck) { - let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id); + let bucket_index = routing_table.calculate_bucket_index(node_id); let bucket = self.get_bucket_mut(bucket_index); bucket.remove_entry(&node_id.value); entry.with_mut_inner(|e| e.remove_node_id(ck)); diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs index 09d51b32..98305320 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/local_network/mod.rs @@ -144,11 +144,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { pi }; - if let Err(e) = rti - .unlocked_inner - .event_bus - .post(PeerInfoChangeEvent { peer_info }) - { + if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { peer_info }) { log_rtab!(debug "Failed to post event: {}", e); } diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs index 51e281eb..f5050003 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/mod.rs @@ -312,6 +312,9 @@ impl RoutingDomainDetailCommon { // Internal functions fn make_peer_info(&self, rti: &RoutingTableInner) -> PeerInfo { + let crypto = rti.crypto(); + let routing_table = rti.routing_table(); + let node_info = NodeInfo::new( self.network_class().unwrap_or(NetworkClass::Invalid), self.outbound_protocols, @@ -343,8 +346,8 @@ impl RoutingDomainDetailCommon { let signed_node_info = match relay_info { Some((relay_ids, relay_sdni)) => SignedNodeInfo::Relayed( SignedRelayedNodeInfo::make_signatures( - rti.unlocked_inner.crypto(), - rti.unlocked_inner.node_id_typed_key_pairs(), + &crypto, + routing_table.node_id_typed_key_pairs(), node_info, relay_ids, relay_sdni, @@ -353,8 +356,8 @@ impl RoutingDomainDetailCommon { ), None => SignedNodeInfo::Direct( SignedDirectNodeInfo::make_signatures( - rti.unlocked_inner.crypto(), - rti.unlocked_inner.node_id_typed_key_pairs(), + &crypto, + routing_table.node_id_typed_key_pairs(), node_info, ) .unwrap(), @@ -363,7 +366,7 @@ impl RoutingDomainDetailCommon { PeerInfo::new( self.routing_domain, - rti.unlocked_inner.node_ids(), + routing_table.node_ids(), signed_node_info, ) } diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs index 3e9db0a2..b3c75323 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/editor.rs @@ -263,9 +263,7 @@ impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet<'a if changed { // Clear the routespecstore cache if our PublicInternet dial info has changed - if let Some(rss) = self.routing_table.route_spec_store() { - rss.reset_cache(); - } + self.routing_table.route_spec_store().reset_cache(); } } diff --git a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs index 44fa267c..261f845c 100644 --- a/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs +++ b/veilid-core/src/routing_table/routing_table_inner/routing_domains/public_internet/mod.rs @@ -122,11 +122,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { pi }; - if let Err(e) = rti - .unlocked_inner - .event_bus - .post(PeerInfoChangeEvent { peer_info }) - { + if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { peer_info }) { log_rtab!(debug "Failed to post event: {}", e); } @@ -167,11 +163,8 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail { dif_sort: Option>, ) -> ContactMethod { let ip6_prefix_size = rti - .unlocked_inner - .config - .get() - .network - .max_connections_per_ip6_prefix_size as usize; + .config() + .with(|c| c.network.max_connections_per_ip6_prefix_size as usize); // Get the nodeinfos for convenience let node_a = peer_a.signed_node_info().node_info(); diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index d5cb6621..8ceea68a 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -280,19 +280,20 @@ impl RoutingTable { for crypto_kind in crypto_kinds { // Bootstrap this crypto kind let nr = nr.unfiltered(); - let routing_table = self.clone(); unord.push(Box::pin( async move { + let network_manager = nr.network_manager(); + let routing_table = nr.routing_table(); + // Get what contact method would be used for contacting the bootstrap - let bsdi = match routing_table - .network_manager() + let bsdi = match network_manager .get_node_contact_method(nr.default_filtered()) { Ok(NodeContactMethod::Direct(v)) => v, Ok(v) => { log_rtab!(debug "invalid contact method for bootstrap, ignoring peer: {:?}", v); - // let _ = routing_table - // .network_manager() + // let _ = + // network_manager // .get_node_contact_method(nr.clone()); return; } @@ -312,7 +313,7 @@ impl RoutingTable { log_rtab!(debug "bootstrap server is not responding for dialinfo: {}", bsdi); // Try a different dialinfo next time - routing_table.network_manager().address_filter().set_dial_info_failed(bsdi); + network_manager.address_filter().set_dial_info_failed(bsdi); } else { // otherwise this bootstrap is valid, lets ask it to find ourselves now routing_table.reverse_find_node(crypto_kind, nr, true, vec![]).await diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index d932a725..e44edc1f 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -289,11 +289,10 @@ impl RoutingTable { for nr in node_refs { let nr = nr.sequencing_clone(Sequencing::PreferOrdered); - let rpc = rpc.clone(); - // Just do a single ping with the best protocol for all the nodes futurequeue.push_back( async move { + let rpc = nr.rpc_processor(); #[cfg(feature = "verbose-tracing")] log_rtab!(debug "--> LocalNetwork Validator ping to {:?}", nr); let _ = rpc.rpc_call_status(Destination::direct(nr)).await?; diff --git a/veilid-core/src/routing_table/tasks/private_route_management.rs b/veilid-core/src/routing_table/tasks/private_route_management.rs index d485b081..942d3c39 100644 --- a/veilid-core/src/routing_table/tasks/private_route_management.rs +++ b/veilid-core/src/routing_table/tasks/private_route_management.rs @@ -49,13 +49,10 @@ impl RoutingTable { .config() .with(|c| c.network.rpc.default_route_hop_count as usize); - let Some(rss) = self.route_spec_store() else { - return vec![]; - }; let mut must_test_routes = Vec::::new(); let mut unpublished_routes = Vec::<(RouteId, u64)>::new(); let mut expired_routes = Vec::::new(); - rss.list_allocated_routes(|k, v| { + self.route_spec_store().list_allocated_routes(|k, v| { let stats = v.get_stats(); // Ignore nodes that don't need testing if !stats.needs_testing(cur_ts) { @@ -99,7 +96,7 @@ impl RoutingTable { // Process dead routes for r in expired_routes { log_rtab!(debug "Expired route: {}", r); - rss.release_route(r); + self.route_spec_store().release_route(r); } // return routes to test @@ -118,11 +115,6 @@ impl RoutingTable { } log_rtab!("Testing routes: {:?}", routes_needing_testing); - // Test all the routes that need testing at the same time - let Some(rss) = self.route_spec_store() else { - return Ok(()); - }; - #[derive(Default, Debug)] struct TestRouteContext { dead_routes: Vec, @@ -135,9 +127,7 @@ impl RoutingTable { let ctx = ctx.clone(); unord.push( async move { - let rss = self.route_spec_store().unwrap(); - - let success = match rss.test_route(r).await { + let success = match self.route_spec_store().test_route(r).await { // Test had result Ok(Some(v)) => v, // Test could not be performed at this time @@ -168,7 +158,7 @@ impl RoutingTable { let ctx = Arc::try_unwrap(ctx).unwrap().into_inner(); for r in ctx.dead_routes { log_rtab!(debug "Dead route failed to test: {}", r); - rss.release_route(r); + self.route_spec_store().release_route(r); } Ok(()) @@ -195,10 +185,8 @@ impl RoutingTable { .config() .with(|c| c.network.rpc.default_route_hop_count as usize); let mut local_unpublished_route_count = 0usize; - let Some(rss) = self.route_spec_store() else { - return Ok(()); - }; - rss.list_allocated_routes(|_k, v| { + + self.route_spec_store().list_allocated_routes(|_k, v| { if !v.is_published() && v.hop_count() == default_route_hop_count && v.get_route_set_keys().kinds() == VALID_CRYPTO_KINDS @@ -224,7 +212,7 @@ impl RoutingTable { stability: Stability::Reliable, sequencing: Sequencing::PreferOrdered, }; - match rss.allocate_route( + match self.route_spec_store().allocate_route( &VALID_CRYPTO_KINDS, &safety_spec, DirectionSet::all(), @@ -249,7 +237,7 @@ impl RoutingTable { } // Test remote routes next - let remote_routes_needing_testing = rss.list_remote_routes(|k, v| { + let remote_routes_needing_testing = self.route_spec_store().list_remote_routes(|k, v| { let stats = v.get_stats(); if stats.needs_testing(cur_ts) { Some(*k) @@ -263,7 +251,7 @@ impl RoutingTable { } // Send update (also may send updates for released routes done by other parts of the program) - rss.send_route_update(); + self.route_spec_store().send_route_update(); Ok(()) } diff --git a/veilid-core/src/routing_table/tasks/relay_management.rs b/veilid-core/src/routing_table/tasks/relay_management.rs index 6c283a60..9d35d911 100644 --- a/veilid-core/src/routing_table/tasks/relay_management.rs +++ b/veilid-core/src/routing_table/tasks/relay_management.rs @@ -162,11 +162,8 @@ impl RoutingTable { .node_info() .clone(); let ip6_prefix_size = self - .unlocked_inner - .config - .get() - .network - .max_connections_per_ip6_prefix_size as usize; + .config() + .with(|c| c.network.max_connections_per_ip6_prefix_size as usize); move |e: &BucketEntryInner| { // Ensure this node is not on the local network and is on the public internet @@ -285,6 +282,6 @@ impl RoutingTable { Option::<()>::None }); // Return the best inbound relay noderef - best_inbound_relay.map(|e| NodeRef::new(self.clone(), e)) + best_inbound_relay.map(|e| NodeRef::new(self.registry(), e)) } } diff --git a/veilid-core/src/routing_table/tasks/update_statistics.rs b/veilid-core/src/routing_table/tasks/update_statistics.rs index 92cf6681..8706ec88 100644 --- a/veilid-core/src/routing_table/tasks/update_statistics.rs +++ b/veilid-core/src/routing_table/tasks/update_statistics.rs @@ -27,9 +27,7 @@ impl RoutingTable { } // Roll all route transfers - if let Some(rss) = self.route_spec_store() { - rss.roll_transfers(last_ts, cur_ts); - } + self.route_spec_store().roll_transfers(last_ts, cur_ts); Ok(()) } diff --git a/veilid-core/src/routing_table/tests/mod.rs b/veilid-core/src/routing_table/tests/mod.rs index 467694ba..260e607f 100644 --- a/veilid-core/src/routing_table/tests/mod.rs +++ b/veilid-core/src/routing_table/tests/mod.rs @@ -13,6 +13,7 @@ pub(crate) async fn mock_routing_table<'a>() -> VeilidComponentGuard<'a, Routing registry.register(Crypto::new); registry.register(StorageManager::new); registry.register(RoutingTable::new); + registry.register(NetworkManager::new); registry.init().await.unwrap(); registry.lookup::().unwrap() } diff --git a/veilid-core/src/rpc_processor/coders/mod.rs b/veilid-core/src/rpc_processor/coders/mod.rs index c1fb6ff7..84542843 100644 --- a/veilid-core/src/rpc_processor/coders/mod.rs +++ b/veilid-core/src/rpc_processor/coders/mod.rs @@ -68,10 +68,10 @@ pub enum QuestionContext { #[derive(Clone)] pub struct RPCValidateContext { - pub crypto: Crypto, - // pub rpc_processor: RPCProcessor, + pub registry: VeilidComponentRegistry, pub question_context: Option, } +impl_veilid_component_registry_accessor!(RPCValidateContext); #[derive(Clone)] pub struct RPCDecodeContext { diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index 4a1c2947..4ce3916d 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -100,8 +100,9 @@ impl RPCOperation { pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { // Validate sender peer info if let Some(sender_peer_info) = &self.sender_peer_info.opt_peer_info { + let crypto = validate_context.crypto(); sender_peer_info - .validate(validate_context.crypto.clone()) + .validate(&crypto) .map_err(RPCError::protocol)?; } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs index cba032ca..8e01280b 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_find_node.rs @@ -95,7 +95,8 @@ impl RPCOperationFindNodeA { } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { - PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + let crypto = validate_context.crypto(); + PeerInfo::validate_vec(&mut self.peers, &crypto); Ok(()) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs index 35366884..3dfce124 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_get_value.rs @@ -106,6 +106,8 @@ impl RPCOperationGetValueA { } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { + let crypto = validate_context.crypto(); + let question_context = validate_context .question_context .as_ref() @@ -157,7 +159,7 @@ impl RPCOperationGetValueA { } } - PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone()); + PeerInfo::validate_vec(&mut self.peers, &crypto); Ok(()) } diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs index 79fc9b63..b0534157 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation_signal.rs @@ -10,7 +10,8 @@ impl RPCOperationSignal { Self { signal_info } } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { - self.signal_info.validate(validate_context.crypto.clone()) + let crypto = validate_context.crypto(); + self.signal_info.validate(&crypto) } // pub fn signal_info(&self) -> &SignalInfo { // &self.signal_info diff --git a/veilid-core/src/rpc_processor/coders/operations/question.rs b/veilid-core/src/rpc_processor/coders/operations/question.rs index da9bd51c..4434ba8d 100644 --- a/veilid-core/src/rpc_processor/coders/operations/question.rs +++ b/veilid-core/src/rpc_processor/coders/operations/question.rs @@ -11,7 +11,8 @@ impl RPCQuestion { Self { respond_to, detail } } pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> { - self.respond_to.validate(validate_context.crypto.clone())?; + let crypto = validate_context.crypto(); + self.respond_to.validate(&crypto)?; self.detail.validate(validate_context) } pub fn respond_to(&self) -> &RespondTo { diff --git a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs index dbe13649..2cf3e053 100644 --- a/veilid-core/src/rpc_processor/coders/operations/respond_to.rs +++ b/veilid-core/src/rpc_processor/coders/operations/respond_to.rs @@ -7,7 +7,7 @@ pub(in crate::rpc_processor) enum RespondTo { } impl RespondTo { - pub fn validate(&mut self, crypto: Crypto) -> Result<(), RPCError> { + pub fn validate(&mut self, crypto: &Crypto) -> Result<(), RPCError> { match self { RespondTo::Sender => Ok(()), RespondTo::PrivateRoute(pr) => pr.validate(crypto).map_err(RPCError::protocol), diff --git a/veilid-core/src/rpc_processor/destination.rs b/veilid-core/src/rpc_processor/destination.rs index 77425ed7..b2a84b78 100644 --- a/veilid-core/src/rpc_processor/destination.rs +++ b/veilid-core/src/rpc_processor/destination.rs @@ -298,9 +298,11 @@ impl RPCProcessor { } Target::PrivateRoute(rsid) => { // Get remote private route - let rss = self.routing_table().route_spec_store(); - - let Some(private_route) = rss.best_remote_private_route(&rsid) else { + let Some(private_route) = self + .routing_table() + .route_spec_store() + .best_remote_private_route(&rsid) + else { return Err(RPCError::network("could not get remote private route")); }; diff --git a/veilid-core/src/rpc_processor/fanout/fanout_call.rs b/veilid-core/src/rpc_processor/fanout/fanout_call.rs index 86b3d8e4..0b793673 100644 --- a/veilid-core/src/rpc_processor/fanout/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout/fanout_call.rs @@ -241,7 +241,6 @@ where fn init_closest_nodes(&self) -> Result<(), RPCError> { // Get the 'node_count' closest nodes to the key out of our routing table let closest_nodes = { - let routing_table = self.routing_table; let node_info_filter = self.node_info_filter.clone(); let filter = Box::new( move |rti: &RoutingTableInner, opt_entry: Option>| { @@ -252,7 +251,7 @@ where let entry = opt_entry.unwrap(); // Filter entries - entry.with(routing_table, rti, |_rt, _rti, e| { + entry.with(rti, |_rti, e| { let Some(signed_node_info) = e.signed_node_info(RoutingDomain::PublicInternet) else { @@ -276,10 +275,10 @@ where let filters = VecDeque::from([filter]); let transform = |_rti: &RoutingTableInner, v: Option>| { - NodeRef::new(routing_table.registry(), v.unwrap().clone()) + NodeRef::new(self.routing_table.registry(), v.unwrap().clone()) }; - routing_table + self.routing_table .find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform) .map_err(RPCError::invalid_format)? }; diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 851e0ae5..3cee25c4 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -674,7 +674,7 @@ impl StorageManager { data: Vec, writer: Option, ) -> VeilidAPIResult> { - let mut inner = self.lock().await?; + let mut inner = self.inner.lock().await; // Get cryptosystem let crypto = self.crypto(); @@ -701,7 +701,8 @@ impl StorageManager { }; // See if the subkey we are modifying has a last known local value - let last_get_result = inner.handle_get_local_value(key, subkey, true).await?; + let last_get_result = + Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; // Get the descriptor and schema for the key let Some(descriptor) = last_get_result.opt_descriptor else { @@ -741,20 +742,20 @@ impl StorageManager { // Write the value locally first log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); - inner - .handle_set_local_value( - key, - subkey, - signed_value_data.clone(), - WatchUpdateMode::NoUpdate, - ) - .await?; + Self::handle_set_local_value_inner( + &mut *inner, + key, + subkey, + signed_value_data.clone(), + WatchUpdateMode::NoUpdate, + ) + .await?; // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else { + let Some(rpc_processor) = self.get_ready_rpc_processor() else { log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); // Add to offline writes to flush - inner.add_offline_subkey_write(key, subkey, safety_selection); + Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection); return Ok(None); }; @@ -766,7 +767,7 @@ impl StorageManager { // Use the safety selection we opened the record with let res_rx = match self .outbound_set_value( - rpc_processor, + &rpc_processor, key, subkey, safety_selection, @@ -778,8 +779,8 @@ impl StorageManager { Ok(v) => v, Err(e) => { // Failed to write, try again later - let mut inner = self.lock().await?; - inner.add_offline_subkey_write(key, subkey, safety_selection); + let mut inner = self.inner.lock().await; + Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection); return Err(e); } }; diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index e45b8c80..e55b73ed 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -28,14 +28,15 @@ impl StorageManager { #[instrument(level = "trace", target = "dht", skip_all, err)] pub(super) async fn outbound_set_value( &self, - rpc_processor: &RPCProcessor, key: TypedKey, subkey: ValueSubkey, safety_selection: SafetySelection, value: Arc, descriptor: Arc, ) -> VeilidAPIResult>> { - let routing_table = rpc_processor.routing_table(); + + xxx switch this to registry mechanism + let routing_domain = RoutingDomain::PublicInternet; // Get the DHT parameters for 'SetValue' @@ -53,8 +54,7 @@ impl StorageManager { // Get the nodes we know are caching this value to seed the fanout let init_fanout_queue = { let inner = self.inner.lock().await; - inner - .get_value_nodes(key)? + Self::get_value_nodes_inner(&mut *inner, key)? .unwrap_or_default() .into_iter() .filter(|x| { @@ -301,15 +301,17 @@ impl StorageManager { last_value_data: ValueData, safety_selection: SafetySelection, ) { - let this = self.clone(); + let registry = self.registry(); let last_value_data = Arc::new(Mutex::new(last_value_data)); - inner.process_deferred_results( + Self::process_deferred_results_inner(inner, res_rx, Box::new( move |result: VeilidAPIResult| -> SendPinBoxFuture { - let this = this.clone(); + let registry = registry.clone(); let last_value_data = last_value_data.clone(); Box::pin(async move { + let this = registry.storage_manager(); + let result = match result { Ok(v) => v, Err(e) => {