wasm startup lock

This commit is contained in:
Christien Rioux 2024-07-19 14:33:31 -04:00
parent c207e498b4
commit fdead16fc8

View File

@ -58,6 +58,9 @@ struct NetworkInner {
}
struct NetworkUnlockedInner {
// Startup lock
startup_lock: StartupLock,
// Accessors
routing_table: RoutingTable,
network_manager: NetworkManager,
@ -86,6 +89,7 @@ impl Network {
connection_manager: ConnectionManager,
) -> NetworkUnlockedInner {
NetworkUnlockedInner {
startup_lock: StartupLock::new(),
network_manager,
routing_table,
connection_manager,
@ -121,7 +125,7 @@ impl Network {
/////////////////////////////////////////////////////////////////
// Record DialInfo failures
pub async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
async fn record_dial_info_failure<T, F: Future<Output = EyreResult<NetworkResult<T>>>>(
&self,
dial_info: DialInfo,
fut: F,
@ -135,12 +139,18 @@ impl Network {
Ok(network_result)
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
// Send data to a dial info, unbound, using a new connection from a random port
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<()>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
let timeout_ms = {
@ -187,13 +197,15 @@ impl Network {
// This creates a short-lived connection in the case of connection-oriented protocols
// for the purpose of sending this one message.
// This bypasses the connection table as it is not a 'node to node' connection.
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_recv_data_unbound_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
timeout_ms: u32,
) -> EyreResult<NetworkResult<Vec<u8>>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
let connect_timeout_ms = {
@ -247,12 +259,14 @@ impl Network {
.await
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_to_existing_flow(
&self,
flow: Flow,
data: Vec<u8>,
) -> EyreResult<SendDataToExistingFlowResult> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
let data_len = data.len();
match flow.protocol_type() {
ProtocolType::UDP => {
@ -292,12 +306,16 @@ impl Network {
Ok(SendDataToExistingFlowResult::NotSent(data))
}
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
// Send data directly to a dial info, possibly without knowing which node it is going to
// Returns a flow for the connection used to send the data
#[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))]
pub async fn send_data_to_dial_info(
&self,
dial_info: DialInfo,
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
let _guard = self.unlocked_inner.startup_lock.enter()?;
self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len();
if dial_info.protocol_type() == ProtocolType::UDP {
@ -399,23 +417,22 @@ impl Network {
Ok(StartupDisposition::Success)
}
#[instrument(level = "debug", err, skip_all)]
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
self.inner.lock().network_started = None;
let guard = self.unlocked_inner.startup_lock.startup()?;
match self.startup_internal().await {
Ok(StartupDisposition::Success) => {
info!("network started");
self.inner.lock().network_started = Some(true);
guard.success();
Ok(StartupDisposition::Success)
}
Ok(StartupDisposition::BindRetry) => {
debug!("network bind retry");
self.inner.lock().network_started = Some(false);
Ok(StartupDisposition::BindRetry)
}
Err(e) => {
debug!("network failed to start");
self.inner.lock().network_started = Some(false);
Err(e)
}
}
@ -425,16 +442,22 @@ impl Network {
self.inner.lock().network_needs_restart
}
pub fn is_started(&self) -> Option<bool> {
self.inner.lock().network_started
pub fn is_started(&self) -> bool {
self.unlocked_inner.startup_lock.is_started()
}
#[instrument(level = "debug", skip_all)]
pub fn restart_network(&self) {
self.inner.lock().network_needs_restart = true;
}
#[instrument(level = "debug", skip_all)]
pub async fn shutdown(&self) {
log_net!(debug "stopping network");
log_net!(debug "starting low level network shutdown");
let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else {
log_net!(debug "low level network is already shut down");
return;
};
// Reset state
let routing_table = self.routing_table();
@ -451,7 +474,8 @@ impl Network {
// Cancels all async background tasks by dropping join handles
*self.inner.lock() = Self::new_inner();
log_net!(debug "network stopped");
guard.success();
log_net!(debug "finished low level network shutdown");
}
pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option<SocketAddr> {
@ -472,15 +496,28 @@ impl Network {
&self,
_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
) {
//
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return;
};
}
pub fn needs_public_dial_info_check(&self) -> bool {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return false;
};
false
}
//////////////////////////////////////////
pub async fn tick(&self) -> EyreResult<()> {
pub(crate) async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return Ok(());
};
Ok(())
}
}