From c207e498b4ece9b763ffe4b875f6a981983cb055 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 19 Jul 2024 14:17:45 -0400 Subject: [PATCH] more startup locks --- .../src/network_manager/connection_manager.rs | 29 ++++++++- veilid-core/src/network_manager/mod.rs | 6 +- veilid-core/src/network_manager/native/mod.rs | 62 ++++++++++++------- .../src/network_manager/receipt_manager.rs | 37 ++++++++++- veilid-core/src/veilid_api/debug.rs | 6 +- 5 files changed, 110 insertions(+), 30 deletions(-) diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index ba9e7a68..e87429ce 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -53,6 +53,7 @@ struct ConnectionManagerArc { connection_inactivity_timeout_ms: u32, connection_table: ConnectionTable, address_lock_table: AsyncTagLockTable, + startup_lock: StartupLock, inner: Mutex>, } impl core::fmt::Debug for ConnectionManagerArc { @@ -98,6 +99,7 @@ impl ConnectionManager { connection_inactivity_timeout_ms, connection_table: ConnectionTable::new(config, address_filter), address_lock_table: AsyncTagLockTable::new(), + startup_lock: StartupLock::new(), inner: Mutex::new(None), } } @@ -115,8 +117,11 @@ impl ConnectionManager { self.arc.connection_inactivity_timeout_ms } - pub async fn startup(&self) { + pub async fn startup(&self) -> EyreResult<()> { + let guard = self.arc.startup_lock.startup()?; + log_net!(debug "startup connection manager"); + let mut inner = self.arc.inner.lock(); if inner.is_some() { panic!("shouldn't start connection manager twice without shutting it down first"); @@ -133,10 +138,19 @@ impl ConnectionManager { // Store in the inner object *inner = Some(Self::new_inner(stop_source, sender, async_processor)); + + guard.success(); + + Ok(()) } pub async fn shutdown(&self) { log_net!(debug "starting connection manager shutdown"); + let Ok(guard) = self.arc.startup_lock.shutdown().await else { + log_net!(debug "connection manager is already shut down"); + return; + }; + // Remove the inner from the lock let mut inner = { let mut inner_lock = self.arc.inner.lock(); @@ -158,6 +172,8 @@ impl ConnectionManager { // Wait for the connections to complete log_net!(debug "waiting for connection handlers to complete"); self.arc.connection_table.join().await; + + guard.success(); log_net!(debug "finished connection manager shutdown"); } @@ -263,6 +279,9 @@ impl ConnectionManager { // Returns a network connection if one already is established pub fn get_connection(&self, flow: Flow) -> Option { + let Ok(_guard) = self.arc.startup_lock.enter() else { + return None; + }; self.arc.connection_table.peek_connection_by_flow(flow) } @@ -276,6 +295,9 @@ impl ConnectionManager { self.arc.connection_table.ref_connection_by_id(id, kind) } pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option { + let Ok(_guard) = self.arc.startup_lock.enter() else { + return None; + }; ConnectionRefScope::try_new(self.clone(), id) } @@ -288,6 +310,11 @@ impl ConnectionManager { &self, dial_info: DialInfo, ) -> EyreResult> { + let Ok(_guard) = self.arc.startup_lock.enter() else { + return Ok(NetworkResult::service_unavailable( + "connection manager is not started", + )); + }; let peer_address = dial_info.peer_address(); let remote_addr = peer_address.socket_addr(); let mut preferred_local_address = self diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index e32dc018..b83b7c44 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -428,7 +428,7 @@ impl NetworkManager { }); // Start network components - connection_manager.startup().await; + connection_manager.startup().await?; match net.startup().await? { StartupDisposition::Success => {} StartupDisposition::BindRetry => { @@ -546,9 +546,7 @@ impl NetworkManager { } pub fn network_is_started(&self) -> bool { - self.opt_net() - .and_then(|net| net.is_started()) - .unwrap_or(false) + self.opt_net().map(|net| net.is_started()).unwrap_or(false) } pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus { diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 815ed96f..a0ba7162 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -72,8 +72,6 @@ pub const MAX_CAPABILITIES: usize = 64; ///////////////////////////////////////////////////////////////// struct NetworkInner { - /// Some(true) if the low-level network is running, Some(false) if it is not, None if it is in transit - network_started: Option, /// set if the network needs to be restarted due to a low level configuration change /// such as dhcp release or change of address or interfaces being added or removed network_needs_restart: bool, @@ -114,6 +112,9 @@ struct NetworkInner { } struct NetworkUnlockedInner { + // Startup lock + startup_lock: StartupLock, + // Accessors routing_table: RoutingTable, network_manager: NetworkManager, @@ -139,7 +140,6 @@ pub(in crate::network_manager) struct Network { impl Network { fn new_inner() -> NetworkInner { NetworkInner { - network_started: Some(false), network_needs_restart: false, needs_public_dial_info_check: false, network_already_cleared: false, @@ -168,6 +168,7 @@ impl Network { ) -> NetworkUnlockedInner { let config = network_manager.config(); NetworkUnlockedInner { + startup_lock: StartupLock::new(), network_manager, routing_table, connection_manager, @@ -335,12 +336,12 @@ impl Network { inner.preferred_local_addresses.get(&key).copied() } - pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool { + pub(crate) fn is_stable_interface_address(&self, addr: IpAddr) -> bool { let stable_addrs = self.get_stable_interface_addresses(); stable_addrs.contains(&addr) } - pub fn get_stable_interface_addresses(&self) -> Vec { + pub(crate) fn get_stable_interface_addresses(&self) -> Vec { let addrs = self.unlocked_inner.interfaces.stable_addresses(); let mut addrs: Vec = addrs .into_iter() @@ -377,7 +378,7 @@ impl Network { //////////////////////////////////////////////////////////// // Record DialInfo failures - pub async fn record_dial_info_failure>>>( + async fn record_dial_info_failure>>>( &self, dial_info: DialInfo, fut: F, @@ -401,6 +402,8 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> EyreResult> { + let _guard = self.unlocked_inner.startup_lock.enter()?; + self.record_dial_info_failure( dial_info.clone(), async move { @@ -476,6 +479,8 @@ impl Network { data: Vec, timeout_ms: u32, ) -> EyreResult>> { + let _guard = self.unlocked_inner.startup_lock.enter()?; + self.record_dial_info_failure( dial_info.clone(), async move { @@ -590,6 +595,8 @@ impl Network { flow: Flow, data: Vec, ) -> EyreResult { + let _guard = self.unlocked_inner.startup_lock.enter()?; + let data_len = data.len(); // Handle connectionless protocol @@ -655,6 +662,8 @@ impl Network { dial_info: DialInfo, data: Vec, ) -> EyreResult> { + let _guard = self.unlocked_inner.startup_lock.enter()?; + self.record_dial_info_failure( dial_info.clone(), async move { @@ -922,22 +931,20 @@ impl Network { #[instrument(level = "debug", err, skip_all)] pub async fn startup(&self) -> EyreResult { - self.inner.lock().network_started = None; + let guard = self.unlocked_inner.startup_lock.startup()?; match self.startup_internal().await { Ok(StartupDisposition::Success) => { info!("network started"); - self.inner.lock().network_started = Some(true); + guard.success(); Ok(StartupDisposition::Success) } Ok(StartupDisposition::BindRetry) => { debug!("network bind retry"); - self.inner.lock().network_started = Some(false); Ok(StartupDisposition::BindRetry) } Err(e) => { debug!("network failed to start"); - self.inner.lock().network_started = Some(false); Err(e) } } @@ -947,8 +954,8 @@ impl Network { self.inner.lock().network_needs_restart } - pub fn is_started(&self) -> Option { - self.inner.lock().network_started + pub fn is_started(&self) -> bool { + self.unlocked_inner.startup_lock.is_started() } #[instrument(level = "debug", skip_all)] @@ -959,8 +966,10 @@ impl Network { #[instrument(level = "debug", skip_all)] pub async fn shutdown(&self) { log_net!(debug "starting low level network shutdown"); - - self.inner.lock().network_started = None; + let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else { + log_net!(debug "low level network is already shut down"); + return; + }; let routing_table = self.routing_table(); @@ -1006,6 +1015,7 @@ impl Network { // Reset state including network class *self.inner.lock() = Self::new_inner(); + guard.success(); log_net!(debug "finished low level network shutdown"); } @@ -1014,12 +1024,20 @@ impl Network { &self, punishment: Option>, ) { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return; + }; let mut inner = self.inner.lock(); inner.needs_public_dial_info_check = true; inner.public_dial_info_check_punishment = punishment; } pub fn needs_public_dial_info_check(&self) -> bool { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return false; + }; let inner = self.inner.lock(); inner.needs_public_dial_info_check } @@ -1027,7 +1045,7 @@ impl Network { ////////////////////////////////////////// #[instrument(level = "trace", target = "net", skip_all, err)] - pub async fn network_interfaces_task_routine( + async fn network_interfaces_task_routine( self, _stop_token: StopToken, _l: u64, @@ -1039,12 +1057,7 @@ impl Network { } #[instrument(level = "trace", target = "net", skip_all, err)] - pub async fn upnp_task_routine( - self, - _stop_token: StopToken, - _l: u64, - _t: u64, - ) -> EyreResult<()> { + async fn upnp_task_routine(self, _stop_token: StopToken, _l: u64, _t: u64) -> EyreResult<()> { if !self.unlocked_inner.igd_manager.tick().await? { info!("upnp failed, restarting local network"); let mut inner = self.inner.lock(); @@ -1055,7 +1068,12 @@ impl Network { } #[instrument(level = "trace", target = "net", skip_all, err)] - pub async fn tick(&self) -> EyreResult<()> { + pub(crate) async fn tick(&self) -> EyreResult<()> { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return Ok(()); + }; + let (detect_address_changes, upnp) = { let config = self.network_manager().config(); let c = config.get(); diff --git a/veilid-core/src/network_manager/receipt_manager.rs b/veilid-core/src/network_manager/receipt_manager.rs index 50109a6a..ae8bf083 100644 --- a/veilid-core/src/network_manager/receipt_manager.rs +++ b/veilid-core/src/network_manager/receipt_manager.rs @@ -158,9 +158,14 @@ struct ReceiptManagerInner { timeout_task: MustJoinSingleFuture<()>, } +struct ReceiptManagerUnlockedInner { + startup_lock: StartupLock, +} + #[derive(Clone)] pub(super) struct ReceiptManager { inner: Arc>, + unlocked_inner: Arc, } impl ReceiptManager { @@ -177,6 +182,9 @@ impl ReceiptManager { pub fn new(network_manager: NetworkManager) -> Self { Self { inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), + unlocked_inner: Arc::new(ReceiptManagerUnlockedInner { + startup_lock: StartupLock::new(), + }), } } @@ -185,6 +193,7 @@ impl ReceiptManager { } pub async fn startup(&self) -> EyreResult<()> { + let guard = self.unlocked_inner.startup_lock.startup()?; log_net!(debug "startup receipt manager"); // Retrieve config @@ -195,6 +204,7 @@ impl ReceiptManager { inner.stop_source = Some(StopSource::new()); } + guard.success(); Ok(()) } @@ -223,7 +233,7 @@ impl ReceiptManager { } #[instrument(level = "trace", target = "receipt", skip_all)] - pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) { + async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) { // Go through all receipts and build a list of expired nonces let mut new_next_oldest_ts: Option = None; let mut expired_records = Vec::new(); @@ -273,6 +283,10 @@ impl ReceiptManager { #[instrument(level = "trace", target = "receipt", skip_all, err)] pub async fn tick(&self) -> EyreResult<()> { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + return Ok(()); + }; + let (next_oldest_ts, timeout_task, stop_token) = { let inner = self.inner.lock(); let stop_token = match inner.stop_source.as_ref() { @@ -303,6 +317,11 @@ impl ReceiptManager { pub async fn shutdown(&self) { log_net!(debug "starting receipt manager shutdown"); + let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else { + log_net!(debug "receipt manager is already shut down"); + return; + }; + let network_manager = self.network_manager(); // Stop all tasks @@ -320,6 +339,8 @@ impl ReceiptManager { } *self.inner.lock() = Self::new_inner(network_manager); + + guard.success(); log_net!(debug "finished receipt manager shutdown"); } @@ -332,6 +353,10 @@ impl ReceiptManager { expected_returns: u32, callback: impl ReceiptCallback, ) { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return; + }; let receipt_nonce = receipt.get_nonce(); event!(target: "receipt", Level::DEBUG, "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode()); let record = Arc::new(Mutex::new(ReceiptRecord::new( @@ -353,6 +378,10 @@ impl ReceiptManager { expiration: Timestamp, eventual: ReceiptSingleShotType, ) { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return; + }; let receipt_nonce = receipt.get_nonce(); event!(target: "receipt", Level::DEBUG, "== New SingleShot Receipt {}", receipt_nonce.encode()); @@ -385,6 +414,8 @@ impl ReceiptManager { pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> { event!(target: "receipt", Level::DEBUG, "== Cancel Receipt {}", nonce.encode()); + let _guard = self.unlocked_inner.startup_lock.enter()?; + // Remove the record let record = { let mut inner = self.inner.lock(); @@ -417,6 +448,10 @@ impl ReceiptManager { receipt: Receipt, receipt_returned: ReceiptReturned, ) -> NetworkResult<()> { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + return NetworkResult::service_unavailable("receipt manager not started"); + }; + let receipt_nonce = receipt.get_nonce(); let extra_data = receipt.get_extra_data(); diff --git a/veilid-core/src/veilid_api/debug.rs b/veilid-core/src/veilid_api/debug.rs index 05720f2b..d862a594 100644 --- a/veilid-core/src/veilid_api/debug.rs +++ b/veilid-core/src/veilid_api/debug.rs @@ -867,9 +867,11 @@ impl VeilidAPI { .purge_last_connections(); if let Some(connection_manager) = &opt_connection_manager { - connection_manager.startup().await; + connection_manager + .startup() + .await + .map_err(VeilidAPIError::internal)?; } - Ok("Connections purged".to_owned()) } else if args[0] == "routes" { // Purge route spec store