routing table editing atomic

This commit is contained in:
John Smith 2022-09-22 20:25:39 -04:00
parent d160344a77
commit 51b509221c
7 changed files with 224 additions and 170 deletions

View File

@ -184,7 +184,7 @@ impl ConnectionTable {
Some(out.get_handle())
}
pub fn get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec<NetworkConnectionId> {
pub fn _get_connection_ids_by_remote(&self, remote: PeerAddress) -> Vec<NetworkConnectionId> {
let inner = self.inner.lock();
inner
.ids_by_remote

View File

@ -34,32 +34,56 @@ pub const PEEK_DETECT_LEN: usize = 64;
/////////////////////////////////////////////////////////////////
struct NetworkInner {
/// true if the low-level network is running
network_started: bool,
/// set if the network needs to be restarted due to a low level configuration change
/// such as dhcp release or change of address or interfaces being added or removed
network_needs_restart: bool,
/// the calculated protocol configuration for inbound/outbound protocols
protocol_config: Option<ProtocolConfig>,
/// set of statically configured protocols with public dialinfo
static_public_dialinfo: ProtocolTypeSet,
/// network class per routing domain
network_class: [Option<NetworkClass>; RoutingDomain::count()],
/// join handles for all the low level network background tasks
join_handles: Vec<MustJoinHandle<()>>,
/// stop source for shutting down the low level network background tasks
stop_source: Option<StopSource>,
/// port we are binding raw udp listen to
udp_port: u16,
/// port we are binding raw tcp listen to
tcp_port: u16,
/// port we are binding websocket listen to
ws_port: u16,
/// port we are binding secure websocket listen to
wss_port: u16,
/// does our network have ipv4 on any network?
enable_ipv4: bool,
/// does our network have ipv6 on the global internet?
enable_ipv6_global: bool,
/// does our network have ipv6 on the local network?
enable_ipv6_local: bool,
// public dial info check
/// set if we need to calculate our public dial info again
needs_public_dial_info_check: bool,
/// set during the actual execution of the public dial info check to ensure we don't do it more than once
doing_public_dial_info_check: bool,
/// the punishment closure to enax
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
// udp
/// udp socket record for bound-first sockets, which are used to guarantee a port is available before
/// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using
bound_first_udp: BTreeMap<u16, Option<(socket2::Socket, socket2::Socket)>>,
/// mapping of protocol handlers to accept messages from a set of bound socket addresses
inbound_udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
/// outbound udp protocol handler for udpv4
outbound_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
/// outbound udp protocol handler for udpv6
outbound_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
//tcp
/// tcp socket record for bound-first sockets, which are used to guarantee a port is available before
/// creating a 'reuseport' socket there. we don't want to pick ports that other programs are using
bound_first_tcp: BTreeMap<u16, Option<(socket2::Socket, socket2::Socket)>>,
/// TLS handling socket controller
tls_acceptor: Option<TlsAcceptor>,
/// Multiplexer record for protocols on low level TCP sockets
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
}
@ -712,18 +736,32 @@ impl Network {
protocol_config
};
// Start editing routing table
let mut editor_public_internet = self
.unlocked_inner
.routing_table
.edit_routing_domain(RoutingDomain::PublicInternet);
let mut editor_local_network = self
.unlocked_inner
.routing_table
.edit_routing_domain(RoutingDomain::LocalNetwork);
// start listeners
if protocol_config.inbound.contains(ProtocolType::UDP) {
self.start_udp_listeners().await?;
self.start_udp_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::WS) {
self.start_ws_listeners().await?;
self.start_ws_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::WSS) {
self.start_wss_listeners().await?;
self.start_wss_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::TCP) {
self.start_tcp_listeners().await?;
self.start_tcp_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
// release caches of available listener ports
@ -748,6 +786,10 @@ impl Network {
info!("network started");
self.inner.lock().network_started = true;
// commit routing table edits
editor_public_internet.commit().await;
editor_local_network.commit().await;
Ok(())
}
@ -792,9 +834,16 @@ impl Network {
while unord.next().await.is_some() {}
debug!("clearing dial info");
// Drop all dial info
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
routing_table.clear_dial_info_details(RoutingDomain::LocalNetwork);
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
editor.disable_node_info_updates();
editor.clear_dial_info_details();
editor.commit().await;
let mut editor = routing_table.edit_routing_domain(RoutingDomain::LocalNetwork);
editor.disable_node_info_updates();
editor.clear_dial_info_details();
editor.commit().await;
// Reset state including network class
*self.inner.lock() = Self::new_inner();

View File

@ -608,7 +608,6 @@ impl Network {
(protocol_config, existing_network_class, tcp_same_port)
};
let routing_table = self.routing_table();
let network_manager = self.network_manager();
// Process all protocol and address combinations
let mut futures = FuturesUnordered::new();
@ -771,6 +770,7 @@ impl Network {
// If a network class could be determined
// see about updating our public dial info
let mut changed = false;
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
if new_network_class.is_some() {
// Get existing public dial info
let existing_public_dial_info: HashSet<DialInfoDetail> = routing_table
@ -814,13 +814,9 @@ impl Network {
// Is the public dial info different?
if existing_public_dial_info != new_public_dial_info {
// If so, clear existing public dial info and re-register the new public dial info
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
editor.clear_dial_info_details();
for did in new_public_dial_info {
if let Err(e) = routing_table.register_dial_info(
RoutingDomain::PublicInternet,
did.dial_info,
did.class,
) {
if let Err(e) = editor.register_dial_info(did.dial_info, did.class) {
log_net!(error "Failed to register detected public dial info: {}", e);
}
}
@ -836,7 +832,7 @@ impl Network {
}
} else if existing_network_class.is_some() {
// Network class could not be determined
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
editor.clear_dial_info_details();
self.inner.lock().network_class[RoutingDomain::PublicInternet as usize] = None;
changed = true;
log_net!(debug "network class cleared");
@ -849,9 +845,7 @@ impl Network {
}
} else {
// Send updates to everyone
network_manager
.send_node_info_updates(RoutingDomain::PublicInternet, true)
.await;
editor.commit().await;
}
Ok(())

View File

@ -250,7 +250,11 @@ impl Network {
/////////////////////////////////////////////////////
pub(super) async fn start_udp_listeners(&self) -> EyreResult<()> {
pub(super) async fn start_udp_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> {
trace!("starting udp listeners");
let routing_table = self.routing_table();
let (listen_address, public_address, detect_address_changes) = {
@ -293,20 +297,12 @@ impl Network {
&& public_address.is_none()
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
{
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
di.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register interface dial info as well since the address is on the local interface
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
di.clone(),
DialInfoClass::Direct,
)?;
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
}
// Add static public dialinfo if it's configured
@ -322,11 +318,8 @@ impl Network {
// Register the public address
if !detect_address_changes {
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
@ -341,8 +334,7 @@ impl Network {
})();
if !local_dial_info_list.contains(&pdi) && is_interface_address {
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
editor_local_network.register_dial_info(
DialInfo::udp_from_socketaddr(pdi_addr),
DialInfoClass::Direct,
)?;
@ -361,7 +353,11 @@ impl Network {
self.create_udp_listener_tasks().await
}
pub(super) async fn start_ws_listeners(&self) -> EyreResult<()> {
pub(super) async fn start_ws_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> {
trace!("starting ws listeners");
let routing_table = self.routing_table();
let (listen_address, url, path, detect_address_changes) = {
@ -418,11 +414,8 @@ impl Network {
.wrap_err("try_ws failed")?;
if !detect_address_changes {
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
@ -430,11 +423,7 @@ impl Network {
if !registered_addresses.contains(&gsa.ip())
&& self.is_usable_interface_address(gsa.ip())
{
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
pdi,
DialInfoClass::Direct,
)?;
editor_local_network.register_dial_info(pdi, DialInfoClass::Direct)?;
}
registered_addresses.insert(gsa.ip());
@ -455,20 +444,13 @@ impl Network {
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &local_di)
{
// Register public dial info
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
local_di.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet
.register_dial_info(local_di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register local dial info
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
local_di,
DialInfoClass::Direct,
)?;
editor_local_network.register_dial_info(local_di, DialInfoClass::Direct)?;
}
if static_public {
@ -481,10 +463,13 @@ impl Network {
Ok(())
}
pub(super) async fn start_wss_listeners(&self) -> EyreResult<()> {
pub(super) async fn start_wss_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> {
trace!("starting wss listeners");
let routing_table = self.routing_table();
let (listen_address, url, detect_address_changes) = {
let c = self.config.get();
(
@ -543,11 +528,8 @@ impl Network {
.wrap_err("try_wss failed")?;
if !detect_address_changes {
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
@ -555,11 +537,7 @@ impl Network {
if !registered_addresses.contains(&gsa.ip())
&& self.is_usable_interface_address(gsa.ip())
{
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
pdi,
DialInfoClass::Direct,
)?;
editor_local_network.register_dial_info(pdi, DialInfoClass::Direct)?;
}
registered_addresses.insert(gsa.ip());
@ -578,7 +556,11 @@ impl Network {
Ok(())
}
pub(super) async fn start_tcp_listeners(&self) -> EyreResult<()> {
pub(super) async fn start_tcp_listeners(
&self,
editor_public_internet: &mut RoutingDomainEditor,
editor_local_network: &mut RoutingDomainEditor,
) -> EyreResult<()> {
trace!("starting tcp listeners");
let routing_table = self.routing_table();
@ -624,19 +606,11 @@ impl Network {
&& public_address.is_none()
&& routing_table.ensure_dial_info_is_valid(RoutingDomain::PublicInternet, &di)
{
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
di.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet.register_dial_info(di.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// Register interface dial info
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
di.clone(),
DialInfoClass::Direct,
)?;
editor_local_network.register_dial_info(di.clone(), DialInfoClass::Direct)?;
registered_addresses.insert(socket_address.to_ip_addr());
}
@ -656,21 +630,14 @@ impl Network {
let pdi = DialInfo::tcp_from_socketaddr(pdi_addr);
if !detect_address_changes {
routing_table.register_dial_info(
RoutingDomain::PublicInternet,
pdi.clone(),
DialInfoClass::Direct,
)?;
editor_public_internet
.register_dial_info(pdi.clone(), DialInfoClass::Direct)?;
static_public = true;
}
// See if this public address is also a local interface address
if self.is_usable_interface_address(pdi_addr.ip()) {
routing_table.register_dial_info(
RoutingDomain::LocalNetwork,
pdi,
DialInfoClass::Direct,
)?;
editor_local_network.register_dial_info(pdi, DialInfoClass::Direct)?;
}
}
}

View File

@ -500,7 +500,9 @@ impl NetworkManager {
let routing_table = self.routing_table();
let node_info = routing_table.get_own_node_info(RoutingDomain::PublicInternet);
let network_class = self.get_network_class(RoutingDomain::PublicInternet);
let mut node_info_changed = false;
// Get routing domain editor
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
// Do we know our network class yet?
if let Some(network_class) = network_class {
@ -511,16 +513,14 @@ impl NetworkManager {
// Relay node is dead or no longer needed
if matches!(state, BucketEntryState::Dead) {
info!("Relay node died, dropping relay {}", relay_node);
routing_table.set_relay_node(RoutingDomain::PublicInternet, None);
node_info_changed = true;
editor.clear_relay_node();
false
} else if !node_info.requires_relay() {
info!(
"Relay node no longer required, dropping relay {}",
relay_node
);
routing_table.set_relay_node(RoutingDomain::PublicInternet, None);
node_info_changed = true;
editor.clear_relay_node();
false
} else {
true
@ -544,8 +544,7 @@ impl NetworkManager {
false,
) {
info!("Outbound relay node selected: {}", nr);
routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr));
node_info_changed = true;
editor.set_relay_node(nr);
}
}
// Otherwise we must need an inbound relay
@ -555,18 +554,14 @@ impl NetworkManager {
routing_table.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts)
{
info!("Inbound relay node selected: {}", nr);
routing_table.set_relay_node(RoutingDomain::PublicInternet, Some(nr));
node_info_changed = true;
editor.set_relay_node(nr);
}
}
}
}
// Re-send our node info if we selected a relay
if node_info_changed {
self.send_node_info_updates(RoutingDomain::PublicInternet, true)
.await;
}
// Commit the changes
editor.commit().await;
Ok(())
}

View File

@ -192,11 +192,6 @@ impl RoutingTable {
Self::with_routing_domain(&*inner, domain, |rd| rd.relay_node())
}
pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option<NodeRef>) {
let mut inner = self.inner.write();
Self::with_routing_domain_mut(&mut *inner, domain, |rd| rd.set_relay_node(opt_relay_node));
}
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
let inner = self.inner.read();
Self::with_routing_domain(&*inner, domain, |rd| !rd.dial_info_details().is_empty())
@ -299,46 +294,6 @@ impl RoutingTable {
RoutingDomainEditor::new(self.clone(), domain)
}
#[instrument(level = "debug", skip(self), err)]
pub fn register_dial_info(
&self,
domain: RoutingDomain,
dial_info: DialInfo,
class: DialInfoClass,
) -> EyreResult<()> {
if !self.ensure_dial_info_is_valid(domain, &dial_info) {
return Err(eyre!(
"dial info '{}' is not valid in routing domain '{:?}'",
dial_info,
domain
));
}
let mut inner = self.inner.write();
Self::with_routing_domain_mut(&mut *inner, domain, |rd| {
rd.add_dial_info_detail(DialInfoDetail {
dial_info: dial_info.clone(),
class,
});
});
info!(
"{:?} Dial Info: {}",
domain,
NodeDialInfo {
node_id: NodeId::new(inner.node_id),
dial_info
}
.to_string(),
);
debug!(" Class: {:?}", class);
Self::reset_all_seen_our_node_info(&mut *inner, domain);
Self::reset_all_updated_since_last_network_change(&mut *inner);
Ok(())
}
fn reset_all_seen_our_node_info(inner: &mut RoutingTableInner, routing_domain: RoutingDomain) {
let cur_ts = intf::get_timestamp();
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
@ -357,18 +312,6 @@ impl RoutingTable {
});
}
pub fn clear_dial_info_details(&self, routing_domain: RoutingDomain) {
trace!("clearing dial info domain: {:?}", routing_domain);
let mut inner = self.inner.write();
Self::with_routing_domain_mut(&mut *inner, routing_domain, |rd| {
rd.clear_dial_info_details();
});
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
Self::reset_all_seen_our_node_info(&mut *inner, routing_domain);
}
pub fn get_own_peer_info(&self, routing_domain: RoutingDomain) -> PeerInfo {
PeerInfo::new(
NodeId::new(self.node_id()),
@ -858,9 +801,6 @@ impl RoutingTable {
let mut dead = true;
if let Some(nr) = self.lookup_node_ref(*e) {
if let Some(last_connection) = nr.last_connection() {
out.push((*e, RecentPeersEntry { last_connection }));
dead = false;
}

View File

@ -1,11 +1,17 @@
use super::*;
enum RoutingDomainChange {}
enum RoutingDomainChange {
ClearDialInfoDetails,
ClearRelayNode,
SetRelayNode { relay_node: NodeRef },
AddDialInfoDetail { dial_info_detail: DialInfoDetail },
}
pub struct RoutingDomainEditor {
routing_table: RoutingTable,
routing_domain: RoutingDomain,
changes: Vec<RoutingDomainChange>,
send_node_info_updates: bool,
}
impl RoutingDomainEditor {
@ -14,8 +20,111 @@ impl RoutingDomainEditor {
routing_table,
routing_domain,
changes: Vec::new(),
send_node_info_updates: true,
}
}
#[instrument(level = "debug", skip(self))]
pub fn disable_node_info_updates(&mut self) {
self.send_node_info_updates = false;
}
pub fn commit(self) {}
#[instrument(level = "debug", skip(self))]
pub fn clear_dial_info_details(&mut self) {
self.changes.push(RoutingDomainChange::ClearDialInfoDetails);
}
#[instrument(level = "debug", skip(self))]
pub fn clear_relay_node(&mut self) {
self.changes.push(RoutingDomainChange::ClearRelayNode);
}
#[instrument(level = "debug", skip(self))]
pub fn set_relay_node(&mut self, relay_node: NodeRef) {
self.changes
.push(RoutingDomainChange::SetRelayNode { relay_node })
}
#[instrument(level = "debug", skip(self), err)]
pub fn register_dial_info(
&mut self,
dial_info: DialInfo,
class: DialInfoClass,
) -> EyreResult<()> {
if !self
.routing_table
.ensure_dial_info_is_valid(self.routing_domain, &dial_info)
{
return Err(eyre!(
"dial info '{}' is not valid in routing domain '{:?}'",
dial_info,
self.routing_domain
));
}
self.changes.push(RoutingDomainChange::AddDialInfoDetail {
dial_info_detail: DialInfoDetail {
dial_info: dial_info.clone(),
class,
},
});
Ok(())
}
#[instrument(level = "debug", skip(self))]
pub async fn commit(self) {
let mut changed = false;
{
let mut inner = self.routing_table.inner.write();
let inner = &mut *inner;
let node_id = inner.node_id;
RoutingTable::with_routing_domain_mut(inner, self.routing_domain, |detail| {
for change in self.changes {
match change {
RoutingDomainChange::ClearDialInfoDetails => {
debug!("[{:?}] cleared dial info details", self.routing_domain);
detail.clear_dial_info_details();
changed = true;
}
RoutingDomainChange::ClearRelayNode => {
debug!("[{:?}] cleared relay node", self.routing_domain);
detail.set_relay_node(None);
changed = true;
}
RoutingDomainChange::SetRelayNode { relay_node } => {
debug!("[{:?}] set relay node: {}", self.routing_domain, relay_node);
detail.set_relay_node(Some(relay_node));
changed = true;
}
RoutingDomainChange::AddDialInfoDetail { dial_info_detail } => {
debug!(
"[{:?}] add dial info detail: {:?}",
self.routing_domain, dial_info_detail
);
detail.add_dial_info_detail(dial_info_detail.clone());
info!(
"{:?} Dial Info: {}",
self.routing_domain,
NodeDialInfo {
node_id: NodeId::new(node_id),
dial_info: dial_info_detail.dial_info
}
.to_string(),
);
changed = true;
}
}
}
});
if changed {
RoutingTable::reset_all_seen_our_node_info(inner, self.routing_domain);
RoutingTable::reset_all_updated_since_last_network_change(inner);
}
}
if changed && self.send_node_info_updates {
let network_manager = self.routing_table.unlocked_inner.network_manager.clone();
network_manager
.send_node_info_updates(self.routing_domain, true)
.await;
}
}
}