more startup locks

This commit is contained in:
Christien Rioux 2024-07-19 14:17:45 -04:00
parent 1b34239eb8
commit c207e498b4
5 changed files with 110 additions and 30 deletions

View File

@ -53,6 +53,7 @@ struct ConnectionManagerArc {
connection_inactivity_timeout_ms: u32, connection_inactivity_timeout_ms: u32,
connection_table: ConnectionTable, connection_table: ConnectionTable,
address_lock_table: AsyncTagLockTable<SocketAddr>, address_lock_table: AsyncTagLockTable<SocketAddr>,
startup_lock: StartupLock,
inner: Mutex<Option<ConnectionManagerInner>>, inner: Mutex<Option<ConnectionManagerInner>>,
} }
impl core::fmt::Debug for ConnectionManagerArc { impl core::fmt::Debug for ConnectionManagerArc {
@ -98,6 +99,7 @@ impl ConnectionManager {
connection_inactivity_timeout_ms, connection_inactivity_timeout_ms,
connection_table: ConnectionTable::new(config, address_filter), connection_table: ConnectionTable::new(config, address_filter),
address_lock_table: AsyncTagLockTable::new(), address_lock_table: AsyncTagLockTable::new(),
startup_lock: StartupLock::new(),
inner: Mutex::new(None), inner: Mutex::new(None),
} }
} }
@ -115,8 +117,11 @@ impl ConnectionManager {
self.arc.connection_inactivity_timeout_ms self.arc.connection_inactivity_timeout_ms
} }
pub async fn startup(&self) { pub async fn startup(&self) -> EyreResult<()> {
let guard = self.arc.startup_lock.startup()?;
log_net!(debug "startup connection manager"); log_net!(debug "startup connection manager");
let mut inner = self.arc.inner.lock(); let mut inner = self.arc.inner.lock();
if inner.is_some() { if inner.is_some() {
panic!("shouldn't start connection manager twice without shutting it down first"); panic!("shouldn't start connection manager twice without shutting it down first");
@ -133,10 +138,19 @@ impl ConnectionManager {
// Store in the inner object // Store in the inner object
*inner = Some(Self::new_inner(stop_source, sender, async_processor)); *inner = Some(Self::new_inner(stop_source, sender, async_processor));
guard.success();
Ok(())
} }
pub async fn shutdown(&self) { pub async fn shutdown(&self) {
log_net!(debug "starting connection manager shutdown"); log_net!(debug "starting connection manager shutdown");
let Ok(guard) = self.arc.startup_lock.shutdown().await else {
log_net!(debug "connection manager is already shut down");
return;
};
// Remove the inner from the lock // Remove the inner from the lock
let mut inner = { let mut inner = {
let mut inner_lock = self.arc.inner.lock(); let mut inner_lock = self.arc.inner.lock();
@ -158,6 +172,8 @@ impl ConnectionManager {
// Wait for the connections to complete // Wait for the connections to complete
log_net!(debug "waiting for connection handlers to complete"); log_net!(debug "waiting for connection handlers to complete");
self.arc.connection_table.join().await; self.arc.connection_table.join().await;
guard.success();
log_net!(debug "finished connection manager shutdown"); log_net!(debug "finished connection manager shutdown");
} }
@ -263,6 +279,9 @@ impl ConnectionManager {
// Returns a network connection if one already is established // Returns a network connection if one already is established
pub fn get_connection(&self, flow: Flow) -> Option<ConnectionHandle> { pub fn get_connection(&self, flow: Flow) -> Option<ConnectionHandle> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return None;
};
self.arc.connection_table.peek_connection_by_flow(flow) self.arc.connection_table.peek_connection_by_flow(flow)
} }
@ -276,6 +295,9 @@ impl ConnectionManager {
self.arc.connection_table.ref_connection_by_id(id, kind) self.arc.connection_table.ref_connection_by_id(id, kind)
} }
pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> { pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return None;
};
ConnectionRefScope::try_new(self.clone(), id) ConnectionRefScope::try_new(self.clone(), id)
} }
@ -288,6 +310,11 @@ impl ConnectionManager {
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
) -> EyreResult<NetworkResult<ConnectionHandle>> { ) -> EyreResult<NetworkResult<ConnectionHandle>> {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return Ok(NetworkResult::service_unavailable(
"connection manager is not started",
));
};
let peer_address = dial_info.peer_address(); let peer_address = dial_info.peer_address();
let remote_addr = peer_address.socket_addr(); let remote_addr = peer_address.socket_addr();
let mut preferred_local_address = self let mut preferred_local_address = self

View File

@ -428,7 +428,7 @@ impl NetworkManager {
}); });
// Start network components // Start network components
connection_manager.startup().await; connection_manager.startup().await?;
match net.startup().await? { match net.startup().await? {
StartupDisposition::Success => {} StartupDisposition::Success => {}
StartupDisposition::BindRetry => { StartupDisposition::BindRetry => {
@ -546,9 +546,7 @@ impl NetworkManager {
} }
pub fn network_is_started(&self) -> bool { pub fn network_is_started(&self) -> bool {
self.opt_net() self.opt_net().map(|net| net.is_started()).unwrap_or(false)
.and_then(|net| net.is_started())
.unwrap_or(false)
} }
pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus { pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus {

View File

@ -72,8 +72,6 @@ pub const MAX_CAPABILITIES: usize = 64;
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
struct NetworkInner { struct NetworkInner {
/// Some(true) if the low-level network is running, Some(false) if it is not, None if it is in transit
network_started: Option<bool>,
/// set if the network needs to be restarted due to a low level configuration change /// set if the network needs to be restarted due to a low level configuration change
/// such as dhcp release or change of address or interfaces being added or removed /// such as dhcp release or change of address or interfaces being added or removed
network_needs_restart: bool, network_needs_restart: bool,
@ -114,6 +112,9 @@ struct NetworkInner {
} }
struct NetworkUnlockedInner { struct NetworkUnlockedInner {
// Startup lock
startup_lock: StartupLock,
// Accessors // Accessors
routing_table: RoutingTable, routing_table: RoutingTable,
network_manager: NetworkManager, network_manager: NetworkManager,
@ -139,7 +140,6 @@ pub(in crate::network_manager) struct Network {
impl Network { impl Network {
fn new_inner() -> NetworkInner { fn new_inner() -> NetworkInner {
NetworkInner { NetworkInner {
network_started: Some(false),
network_needs_restart: false, network_needs_restart: false,
needs_public_dial_info_check: false, needs_public_dial_info_check: false,
network_already_cleared: false, network_already_cleared: false,
@ -168,6 +168,7 @@ impl Network {
) -> NetworkUnlockedInner { ) -> NetworkUnlockedInner {
let config = network_manager.config(); let config = network_manager.config();
NetworkUnlockedInner { NetworkUnlockedInner {
startup_lock: StartupLock::new(),
network_manager, network_manager,
routing_table, routing_table,
connection_manager, connection_manager,
@ -335,12 +336,12 @@ impl Network {
inner.preferred_local_addresses.get(&key).copied() inner.preferred_local_addresses.get(&key).copied()
} }
pub fn is_stable_interface_address(&self, addr: IpAddr) -> bool { pub(crate) fn is_stable_interface_address(&self, addr: IpAddr) -> bool {
let stable_addrs = self.get_stable_interface_addresses(); let stable_addrs = self.get_stable_interface_addresses();
stable_addrs.contains(&addr) stable_addrs.contains(&addr)
} }
pub fn get_stable_interface_addresses(&self) -> Vec<IpAddr> { pub(crate) fn get_stable_interface_addresses(&self) -> Vec<IpAddr> {
let addrs = self.unlocked_inner.interfaces.stable_addresses(); let addrs = self.unlocked_inner.interfaces.stable_addresses();
let mut addrs: Vec<IpAddr> = addrs let mut addrs: Vec<IpAddr> = addrs
.into_iter() .into_iter()
@ -377,7 +378,7 @@ impl Network {
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
// Record DialInfo failures // Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>( async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
fut: F, fut: F,
@ -401,6 +402,8 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure( self.record_dial_info_failure(
dial_info.clone(), dial_info.clone(),
async move { async move {
@ -476,6 +479,8 @@ impl Network {
data: Vec<u8>, data: Vec<u8>,
timeout_ms: u32, timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> { ) -> EyreResult<NetworkResult<Vec<u8>>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure( self.record_dial_info_failure(
dial_info.clone(), dial_info.clone(),
async move { async move {
@ -590,6 +595,8 @@ impl Network {
flow: Flow, flow: Flow,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<SendDataToExistingFlowResult> { ) -> EyreResult<SendDataToExistingFlowResult> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
let data_len = data.len(); let data_len = data.len();
// Handle connectionless protocol // Handle connectionless protocol
@ -655,6 +662,8 @@ impl Network {
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> { ) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure( self.record_dial_info_failure(
dial_info.clone(), dial_info.clone(),
async move { async move {
@ -922,22 +931,20 @@ impl Network {
#[instrument(level = "debug", err, skip_all)] #[instrument(level = "debug", err, skip_all)]
pub async fn startup(&self) -> EyreResult<StartupDisposition> { pub async fn startup(&self) -> EyreResult<StartupDisposition> {
self.inner.lock().network_started = None; let guard = self.unlocked_inner.startup_lock.startup()?;
match self.startup_internal().await { match self.startup_internal().await {
Ok(StartupDisposition::Success) => { Ok(StartupDisposition::Success) => {
info!("network started"); info!("network started");
self.inner.lock().network_started = Some(true); guard.success();
Ok(StartupDisposition::Success) Ok(StartupDisposition::Success)
} }
Ok(StartupDisposition::BindRetry) => { Ok(StartupDisposition::BindRetry) => {
debug!("network bind retry"); debug!("network bind retry");
self.inner.lock().network_started = Some(false);
Ok(StartupDisposition::BindRetry) Ok(StartupDisposition::BindRetry)
} }
Err(e) => { Err(e) => {
debug!("network failed to start"); debug!("network failed to start");
self.inner.lock().network_started = Some(false);
Err(e) Err(e)
} }
} }
@ -947,8 +954,8 @@ impl Network {
self.inner.lock().network_needs_restart self.inner.lock().network_needs_restart
} }
pub fn is_started(&self) -> Option<bool> { pub fn is_started(&self) -> bool {
self.inner.lock().network_started self.unlocked_inner.startup_lock.is_started()
} }
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
@ -959,8 +966,10 @@ impl Network {
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) { pub async fn shutdown(&self) {
log_net!(debug "starting low level network shutdown"); log_net!(debug "starting low level network shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
self.inner.lock().network_started = None; log_net!(debug "low level network is already shut down");
return;
};
let routing_table = self.routing_table(); let routing_table = self.routing_table();
@ -1006,6 +1015,7 @@ impl Network {
// Reset state including network class // Reset state including network class
*self.inner.lock() = Self::new_inner(); *self.inner.lock() = Self::new_inner();
guard.success();
log_net!(debug "finished low level network shutdown"); log_net!(debug "finished low level network shutdown");
} }
@ -1014,12 +1024,20 @@ impl Network {
&self, &self,
punishment: Option<Box<dyn FnOnce() + Send + 'static>>, punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
) { ) {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.needs_public_dial_info_check = true; inner.needs_public_dial_info_check = true;
inner.public_dial_info_check_punishment = punishment; inner.public_dial_info_check_punishment = punishment;
} }
pub fn needs_public_dial_info_check(&self) -> bool { pub fn needs_public_dial_info_check(&self) -> bool {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return false;
};
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.needs_public_dial_info_check inner.needs_public_dial_info_check
} }
@ -1027,7 +1045,7 @@ impl Network {
////////////////////////////////////////// //////////////////////////////////////////
#[instrument(level = "trace", target = "net", skip_all, err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn network_interfaces_task_routine( async fn network_interfaces_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
_l: u64, _l: u64,
@ -1039,12 +1057,7 @@ impl Network {
} }
#[instrument(level = "trace", target = "net", skip_all, err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn upnp_task_routine( async fn upnp_task_routine(self, _stop_token: StopToken, _l: u64, _t: u64) -> EyreResult<()> {
self,
_stop_token: StopToken,
_l: u64,
_t: u64,
) -> EyreResult<()> {
if !self.unlocked_inner.igd_manager.tick().await? { if !self.unlocked_inner.igd_manager.tick().await? {
info!("upnp failed, restarting local network"); info!("upnp failed, restarting local network");
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -1055,7 +1068,12 @@ impl Network {
} }
#[instrument(level = "trace", target = "net", skip_all, err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> { pub(crate) async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return Ok(());
};
let (detect_address_changes, upnp) = { let (detect_address_changes, upnp) = {
let config = self.network_manager().config(); let config = self.network_manager().config();
let c = config.get(); let c = config.get();

View File

@ -158,9 +158,14 @@ struct ReceiptManagerInner {
timeout_task: MustJoinSingleFuture<()>, timeout_task: MustJoinSingleFuture<()>,
} }
struct ReceiptManagerUnlockedInner {
startup_lock: StartupLock,
}
#[derive(Clone)] #[derive(Clone)]
pub(super) struct ReceiptManager { pub(super) struct ReceiptManager {
inner: Arc<Mutex<ReceiptManagerInner>>, inner: Arc<Mutex<ReceiptManagerInner>>,
unlocked_inner: Arc<ReceiptManagerUnlockedInner>,
} }
impl ReceiptManager { impl ReceiptManager {
@ -177,6 +182,9 @@ impl ReceiptManager {
pub fn new(network_manager: NetworkManager) -> Self { pub fn new(network_manager: NetworkManager) -> Self {
Self { Self {
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
unlocked_inner: Arc::new(ReceiptManagerUnlockedInner {
startup_lock: StartupLock::new(),
}),
} }
} }
@ -185,6 +193,7 @@ impl ReceiptManager {
} }
pub async fn startup(&self) -> EyreResult<()> { pub async fn startup(&self) -> EyreResult<()> {
let guard = self.unlocked_inner.startup_lock.startup()?;
log_net!(debug "startup receipt manager"); log_net!(debug "startup receipt manager");
// Retrieve config // Retrieve config
@ -195,6 +204,7 @@ impl ReceiptManager {
inner.stop_source = Some(StopSource::new()); inner.stop_source = Some(StopSource::new());
} }
guard.success();
Ok(()) Ok(())
} }
@ -223,7 +233,7 @@ impl ReceiptManager {
} }
#[instrument(level = "trace", target = "receipt", skip_all)] #[instrument(level = "trace", target = "receipt", skip_all)]
pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) { async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) {
// Go through all receipts and build a list of expired nonces // Go through all receipts and build a list of expired nonces
let mut new_next_oldest_ts: Option<Timestamp> = None; let mut new_next_oldest_ts: Option<Timestamp> = None;
let mut expired_records = Vec::new(); let mut expired_records = Vec::new();
@ -273,6 +283,10 @@ impl ReceiptManager {
#[instrument(level = "trace", target = "receipt", skip_all, err)] #[instrument(level = "trace", target = "receipt", skip_all, err)]
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return Ok(());
};
let (next_oldest_ts, timeout_task, stop_token) = { let (next_oldest_ts, timeout_task, stop_token) = {
let inner = self.inner.lock(); let inner = self.inner.lock();
let stop_token = match inner.stop_source.as_ref() { let stop_token = match inner.stop_source.as_ref() {
@ -303,6 +317,11 @@ impl ReceiptManager {
pub async fn shutdown(&self) { pub async fn shutdown(&self) {
log_net!(debug "starting receipt manager shutdown"); log_net!(debug "starting receipt manager shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "receipt manager is already shut down");
return;
};
let network_manager = self.network_manager(); let network_manager = self.network_manager();
// Stop all tasks // Stop all tasks
@ -320,6 +339,8 @@ impl ReceiptManager {
} }
*self.inner.lock() = Self::new_inner(network_manager); *self.inner.lock() = Self::new_inner(network_manager);
guard.success();
log_net!(debug "finished receipt manager shutdown"); log_net!(debug "finished receipt manager shutdown");
} }
@ -332,6 +353,10 @@ impl ReceiptManager {
expected_returns: u32, expected_returns: u32,
callback: impl ReceiptCallback, callback: impl ReceiptCallback,
) { ) {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
let receipt_nonce = receipt.get_nonce(); let receipt_nonce = receipt.get_nonce();
event!(target: "receipt", Level::DEBUG, "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode()); event!(target: "receipt", Level::DEBUG, "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::new( let record = Arc::new(Mutex::new(ReceiptRecord::new(
@ -353,6 +378,10 @@ impl ReceiptManager {
expiration: Timestamp, expiration: Timestamp,
eventual: ReceiptSingleShotType, eventual: ReceiptSingleShotType,
) { ) {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
let receipt_nonce = receipt.get_nonce(); let receipt_nonce = receipt.get_nonce();
event!(target: "receipt", Level::DEBUG, "== New SingleShot Receipt {}", receipt_nonce.encode()); event!(target: "receipt", Level::DEBUG, "== New SingleShot Receipt {}", receipt_nonce.encode());
@ -385,6 +414,8 @@ impl ReceiptManager {
pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> { pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> {
event!(target: "receipt", Level::DEBUG, "== Cancel Receipt {}", nonce.encode()); event!(target: "receipt", Level::DEBUG, "== Cancel Receipt {}", nonce.encode());
let _guard = self.unlocked_inner.startup_lock.enter()?;
// Remove the record // Remove the record
let record = { let record = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -417,6 +448,10 @@ impl ReceiptManager {
receipt: Receipt, receipt: Receipt,
receipt_returned: ReceiptReturned, receipt_returned: ReceiptReturned,
) -> NetworkResult<()> { ) -> NetworkResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
return NetworkResult::service_unavailable("receipt manager not started");
};
let receipt_nonce = receipt.get_nonce(); let receipt_nonce = receipt.get_nonce();
let extra_data = receipt.get_extra_data(); let extra_data = receipt.get_extra_data();

View File

@ -867,9 +867,11 @@ impl VeilidAPI {
.purge_last_connections(); .purge_last_connections();
if let Some(connection_manager) = &opt_connection_manager { if let Some(connection_manager) = &opt_connection_manager {
connection_manager.startup().await; connection_manager
.startup()
.await
.map_err(VeilidAPIError::internal)?;
} }
Ok("Connections purged".to_owned()) Ok("Connections purged".to_owned())
} else if args[0] == "routes" { } else if args[0] == "routes" {
// Purge route spec store // Purge route spec store