Better relay detection and protections for flows

This commit is contained in:
Christien Rioux 2024-10-18 00:11:59 +00:00
parent 3625225014
commit 6ecf623d7e
25 changed files with 585 additions and 256 deletions

View File

@ -25,6 +25,15 @@
- If set_value is partial / in-flight, it should still be in offline_subkey_writes - If set_value is partial / in-flight, it should still be in offline_subkey_writes
- Make inflight_subkey_writes list and probably some bit for 'written_while_inflight' so we dont clear the offline_subkey_writes until they're really written - Make inflight_subkey_writes list and probably some bit for 'written_while_inflight' so we dont clear the offline_subkey_writes until they're really written
- Networking:
- Fix TIME_WAIT states on Windows
- Attempt to give priority to relaying flows
- UI:
- Make veilid-cli display the connection state when reconnecting, and exit more cleanly on ctrl-c
- Misc:
- Fixes for python DHT test
- API Additions: - API Additions:
- VeilidConfigInner::new parameteriztion for easier config from rust apps - VeilidConfigInner::new parameteriztion for easier config from rust apps

View File

@ -81,22 +81,65 @@ impl InteractiveUI {
CursiveUI::set_start_time(); CursiveUI::set_start_time();
// Wait for connection to be established // Wait for connection to be established
loop { let done2 = done.clone();
match connection_state_receiver.recv_async().await { let self2 = self.clone();
Ok(ConnectionState::ConnectedTCP(_, _)) let mut stdout2 = stdout.clone();
| Ok(ConnectionState::ConnectedIPC(_, _)) => { let connection_state_jh = spawn("connection state handler", async move {
break; loop {
} match connection_state_receiver
Ok(ConnectionState::RetryingTCP(_, _)) | Ok(ConnectionState::RetryingIPC(_, _)) => { .recv_async()
} .timeout_at(done2.clone())
Ok(ConnectionState::Disconnected) => {} .await
Err(e) => { {
eprintln!("Error: {:?}", e); Ok(Ok(ConnectionState::ConnectedTCP(sa, st))) => {
self.inner.lock().done.take(); let tstr = st
break; .duration_since(std::time::UNIX_EPOCH)
.map(|n| display_ts(n.as_micros() as u64))
.unwrap_or_else(|_| "???".to_string());
let _ = writeln!(stdout2, "Connected TCP: {} @ {}", sa, tstr);
}
Ok(Ok(ConnectionState::ConnectedIPC(pb, st))) => {
let tstr = st
.duration_since(std::time::UNIX_EPOCH)
.map(|n| display_ts(n.as_micros() as u64))
.unwrap_or_else(|_| "???".to_string());
let _ = writeln!(
stdout2,
"Connected IPC: {} @ {}",
pb.to_string_lossy(),
tstr
);
}
Ok(Ok(ConnectionState::RetryingTCP(sa, st))) => {
let tstr = st
.duration_since(std::time::UNIX_EPOCH)
.map(|n| display_ts(n.as_micros() as u64))
.unwrap_or_else(|_| "???".to_string());
let _ = writeln!(stdout2, "Retrying TCP: {} @ {}", sa, tstr);
}
Ok(Ok(ConnectionState::RetryingIPC(pb, st))) => {
let tstr = st
.duration_since(std::time::UNIX_EPOCH)
.map(|n| display_ts(n.as_micros() as u64))
.unwrap_or_else(|_| "???".to_string());
let _ =
writeln!(stdout2, "Retrying IPC: {} @ {}", pb.to_string_lossy(), tstr);
}
Ok(Ok(ConnectionState::Disconnected)) => {
let _ = writeln!(stdout2, "Disconnected");
}
Ok(Err(e)) => {
eprintln!("Error: {:?}", e);
self2.inner.lock().done.take();
break;
}
Err(_) => {
break;
}
} }
} }
} });
loop { loop {
if let Some(e) = self.inner.lock().error.clone() { if let Some(e) = self.inner.lock().error.clone() {
@ -227,6 +270,11 @@ impl InteractiveUI {
} }
} }
let _ = readline.flush(); let _ = readline.flush();
// Drop the stopper if we just broke out
let _ = self.inner.lock().done.take();
connection_state_jh.await;
} }
} }

View File

@ -65,7 +65,7 @@ impl Envelope {
} }
} }
#[instrument(level = "trace", target = "envelope", skip_all, err)] #[instrument(level = "trace", target = "envelope", skip_all)]
pub fn from_signed_data( pub fn from_signed_data(
crypto: Crypto, crypto: Crypto,
data: &[u8], data: &[u8],
@ -190,7 +190,7 @@ impl Envelope {
}) })
} }
#[instrument(level = "trace", target = "envelope", skip_all, err)] #[instrument(level = "trace", target = "envelope", skip_all)]
pub fn decrypt_body( pub fn decrypt_body(
&self, &self,
crypto: Crypto, crypto: Crypto,

View File

@ -81,10 +81,11 @@ where
} }
self.items.sort() self.items.sort()
} }
pub fn remove(&mut self, kind: CryptoKind) { pub fn remove(&mut self, kind: CryptoKind) -> Option<CryptoTyped<K>> {
if let Some(idx) = self.items.iter().position(|x| x.kind == kind) { if let Some(idx) = self.items.iter().position(|x| x.kind == kind) {
self.items.remove(idx); return Some(self.items.remove(idx));
} }
None
} }
pub fn remove_all(&mut self, kinds: &[CryptoKind]) { pub fn remove_all(&mut self, kinds: &[CryptoKind]) {
for k in kinds { for k in kinds {

View File

@ -6,6 +6,8 @@ use stop_token::future::FutureExt;
const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10); const PROTECTED_CONNECTION_DROP_SPAN: TimestampDuration = TimestampDuration::new_secs(10);
const PROTECTED_CONNECTION_DROP_COUNT: usize = 3; const PROTECTED_CONNECTION_DROP_COUNT: usize = 3;
const NEW_CONNECTION_RETRY_COUNT: usize = 1;
const NEW_CONNECTION_RETRY_DELAY_MS: u32 = 500;
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
// Connection manager // Connection manager
@ -381,10 +383,13 @@ impl ConnectionManager {
self.arc.connection_table.touch_connection_by_id(id) self.arc.connection_table.touch_connection_by_id(id)
} }
// Protects a network connection if one already is established /// Keep track of the number of things using a network connection if one already is established
/// to keep it from being removed from the table during use
fn connection_ref(&self, id: NetworkConnectionId, kind: ConnectionRefKind) -> bool { fn connection_ref(&self, id: NetworkConnectionId, kind: ConnectionRefKind) -> bool {
self.arc.connection_table.ref_connection_by_id(id, kind) self.arc.connection_table.ref_connection_by_id(id, kind)
} }
/// Scope guard for connection ref to keep connection alive when we're using it
pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> { pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> {
let Ok(_guard) = self.arc.startup_lock.enter() else { let Ok(_guard) = self.arc.startup_lock.enter() else {
return None; return None;
@ -446,7 +451,7 @@ impl ConnectionManager {
} }
// Attempt new connection // Attempt new connection
let mut retry_count = 1; let mut retry_count = NEW_CONNECTION_RETRY_COUNT;
let prot_conn = network_result_try!(loop { let prot_conn = network_result_try!(loop {
let result_net_res = ProtocolNetworkConnection::connect( let result_net_res = ProtocolNetworkConnection::connect(
@ -477,7 +482,7 @@ impl ConnectionManager {
// Release the preferred local address if things can't connect due to a low-level collision we dont have a record of // Release the preferred local address if things can't connect due to a low-level collision we dont have a record of
preferred_local_address = None; preferred_local_address = None;
sleep(500).await; sleep(NEW_CONNECTION_RETRY_DELAY_MS).await;
}); });
// Add to the connection table // Add to the connection table
@ -492,6 +497,15 @@ impl ConnectionManager {
self.on_new_protocol_network_connection(inner, prot_conn, Some(dial_info)) self.on_new_protocol_network_connection(inner, prot_conn, Some(dial_info))
} }
/// Register a flow as relaying through our node
pub fn add_relaying_flow(&self, flow: Flow) {
let Ok(_guard) = self.arc.startup_lock.enter() else {
return;
};
self.arc.connection_table.add_priority_flow(flow);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////
/// Asynchronous Event Processor /// Asynchronous Event Processor

View File

@ -2,6 +2,10 @@ use super::*;
use futures_util::StreamExt; use futures_util::StreamExt;
use hashlink::LruCache; use hashlink::LruCache;
/// Allow 25% of the table size to be occupied by priority flows
/// that will not be subject to LRU termination.
const PRIORITY_FLOW_PERCENTAGE: usize = 25;
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
#[derive(ThisError, Debug)] #[derive(ThisError, Debug)]
pub(in crate::network_manager) enum ConnectionTableAddError { pub(in crate::network_manager) enum ConnectionTableAddError {
@ -41,6 +45,7 @@ struct ConnectionTableInner {
id_by_flow: BTreeMap<Flow, NetworkConnectionId>, id_by_flow: BTreeMap<Flow, NetworkConnectionId>,
ids_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnectionId>>, ids_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnectionId>>,
address_filter: AddressFilter, address_filter: AddressFilter,
priority_flows: Vec<LruCache<Flow, ()>>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -60,16 +65,19 @@ impl ConnectionTable {
}; };
Self { Self {
inner: Arc::new(Mutex::new(ConnectionTableInner { inner: Arc::new(Mutex::new(ConnectionTableInner {
max_connections, conn_by_id: max_connections
conn_by_id: vec![ .iter()
LruCache::new_unbounded(), .map(|_| LruCache::new_unbounded())
LruCache::new_unbounded(), .collect(),
LruCache::new_unbounded(),
],
protocol_index_by_id: BTreeMap::new(), protocol_index_by_id: BTreeMap::new(),
id_by_flow: BTreeMap::new(), id_by_flow: BTreeMap::new(),
ids_by_remote: BTreeMap::new(), ids_by_remote: BTreeMap::new(),
address_filter, address_filter,
priority_flows: max_connections
.iter()
.map(|x| LruCache::new(x * PRIORITY_FLOW_PERCENTAGE / 100))
.collect(),
max_connections,
})), })),
} }
} }
@ -144,6 +152,56 @@ impl ConnectionTable {
false false
} }
/// Add a priority flow, which is protected from eviction but without the
/// punishment expectations of a fully 'protected' connection.
/// This is an LRU set, so there is no removing the flows by hand, and
/// they are kept in a 'best effort' fashion.
/// If connections 'should' stay alive, use this mechanism.
/// If connections 'must' stay alive, use 'NetworkConnection::protect'.
pub fn add_priority_flow(&self, flow: Flow) {
let mut inner = self.inner.lock();
let protocol_index = Self::protocol_to_index(flow.protocol_type());
inner.priority_flows[protocol_index].insert(flow, ());
}
/// The mechanism for selecting which connections get evicted from the connection table
/// when it is getting full while adding a new connection.
/// Factored out into its own function for clarity.
fn lru_out_connection_inner(
inner: &mut ConnectionTableInner,
protocol_index: usize,
) -> Result<Option<NetworkConnection>, ()> {
// If nothing needs to be LRUd out right now, then just return
if inner.conn_by_id[protocol_index].len() < inner.max_connections[protocol_index] {
return Ok(None);
}
// Find a free connection to terminate to make room
let dead_k = {
let Some(lruk) = inner.conn_by_id[protocol_index].iter().find_map(|(k, v)| {
// Ensure anything being LRU evicted isn't protected somehow
// 1. connections that are 'in-use' are kept
// 2. connections with flows in the priority list are kept
// 3. connections that are protected are kept
if !v.is_in_use()
&& !inner.priority_flows[protocol_index].contains_key(&v.flow())
&& v.protected_node_ref().is_none()
{
Some(*k)
} else {
None
}
}) else {
// Can't make room, connection table is full
return Err(());
};
lruk
};
let dead_conn = Self::remove_connection_records(inner, dead_k);
Ok(Some(dead_conn))
}
#[instrument(level = "trace", skip(self), ret)] #[instrument(level = "trace", skip(self), ret)]
pub fn add_connection( pub fn add_connection(
&self, &self,
@ -190,26 +248,12 @@ impl ConnectionTable {
// if we have reached the maximum number of connections per protocol type // if we have reached the maximum number of connections per protocol type
// then drop the least recently used connection that is not protected or referenced // then drop the least recently used connection that is not protected or referenced
let mut out_conn = None; let out_conn = match Self::lru_out_connection_inner(&mut inner, protocol_index) {
if inner.conn_by_id[protocol_index].len() >= inner.max_connections[protocol_index] { Ok(v) => v,
// Find a free connection to terminate to make room Err(()) => {
let dead_k = { return Err(ConnectionTableAddError::table_full(network_connection));
let Some(lruk) = inner.conn_by_id[protocol_index].iter().find_map(|(k, v)| { }
if !v.is_in_use() && v.protected_node_ref().is_none() { };
Some(*k)
} else {
None
}
}) else {
// Can't make room, connection table is full
return Err(ConnectionTableAddError::table_full(network_connection));
};
lruk
};
let dead_conn = Self::remove_connection_records(&mut inner, dead_k);
out_conn = Some(dead_conn);
}
// Add the connection to the table // Add the connection to the table
let res = inner.conn_by_id[protocol_index].insert(id, network_connection); let res = inner.conn_by_id[protocol_index].insert(id, network_connection);
@ -450,7 +494,26 @@ impl ConnectionTable {
); );
for (_, conn) in &inner.conn_by_id[t] { for (_, conn) in &inner.conn_by_id[t] {
out += &format!(" {}\n", conn.debug_print(cur_ts)); let is_priority_flow = inner.priority_flows[t].contains_key(&conn.flow());
out += &format!(
" {}{}\n",
conn.debug_print(cur_ts),
if is_priority_flow { "PRIORITY" } else { "" }
);
}
}
for t in 0..inner.priority_flows.len() {
out += &format!(
" {} Priority Flows: ({}/{})\n",
Self::index_to_protocol(t),
inner.priority_flows[t].len(),
inner.priority_flows[t].capacity(),
);
for (flow, _) in &inner.priority_flows[t] {
out += &format!(" {}\n", flow);
} }
} }
out out

View File

@ -840,7 +840,7 @@ impl NetworkManager {
sleep(HOLE_PUNCH_DELAY_MS).await; sleep(HOLE_PUNCH_DELAY_MS).await;
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch // Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch
peer_nr.set_last_flow(unique_flow.flow, Timestamp::now()); self.set_last_flow(peer_nr.unfiltered(), unique_flow.flow, Timestamp::now());
// Return the receipt using the same dial info send the receipt to it // Return the receipt using the same dial info send the receipt to it
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
@ -1185,11 +1185,10 @@ impl NetworkManager {
} }
}; };
// Cache the envelope information in the routing table // Add the node without its peer info
let source_noderef = match routing_table.register_node_with_existing_connection( let source_noderef = match routing_table.register_node_with_id(
routing_domain, routing_domain,
envelope.get_sender_typed_id(), envelope.get_sender_typed_id(),
flow,
ts, ts,
) { ) {
Ok(v) => v, Ok(v) => v,
@ -1199,8 +1198,13 @@ impl NetworkManager {
return Ok(false); return Ok(false);
} }
}; };
// Set the envelope version for the peer
source_noderef.add_envelope_version(envelope.get_version()); source_noderef.add_envelope_version(envelope.get_version());
// Set the last flow for the peer
self.set_last_flow(source_noderef.unfiltered(), flow, ts);
// Pass message to RPC system // Pass message to RPC system
if let Err(e) = if let Err(e) =
rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body) rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)
@ -1214,12 +1218,36 @@ impl NetworkManager {
Ok(true) Ok(true)
} }
/// Record the last flow for a peer in the routing table and the connection table appropriately
pub(super) fn set_last_flow(&self, node_ref: NodeRef, flow: Flow, timestamp: Timestamp) {
// Get the routing domain for the flow
let Some(routing_domain) = self
.routing_table()
.routing_domain_for_address(flow.remote_address().address())
else {
error!(
"flow found with no routing domain: {} for {}",
flow, node_ref
);
return;
};
// Set the last flow on the routing table entry
node_ref.set_last_flow(flow, timestamp);
// Inform the connection table about the flow's priority
let is_relaying_flow = node_ref.is_relaying(routing_domain);
if is_relaying_flow && flow.protocol_type().is_ordered() {
self.connection_manager().add_relaying_flow(flow);
}
}
pub fn restart_network(&self) { pub fn restart_network(&self) {
self.net().restart_network(); self.net().restart_network();
} }
// If some other subsystem believes our dial info is no longer valid, this will trigger /// If some other subsystem believes our dial info is no longer valid, this will trigger
// a re-check of the dial info and network class /// a re-check of the dial info and network class
pub fn set_needs_dial_info_check(&self, routing_domain: RoutingDomain) { pub fn set_needs_dial_info_check(&self, routing_domain: RoutingDomain) {
match routing_domain { match routing_domain {
RoutingDomain::LocalNetwork => { RoutingDomain::LocalNetwork => {

View File

@ -281,10 +281,10 @@ impl DiscoveryContext {
{ {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.external_info = external_address_infos; inner.external_info = external_address_infos;
log_net!(debug "External Addresses: ({:?}:{:?})[{}]", log_net!(debug "External Addresses ({:?}:{:?}):\n{}",
protocol_type, protocol_type,
address_type, address_type,
inner.external_info.iter().map(|x| format!("{} <- {}",x.address, x.node)).collect::<Vec<_>>().join(", ")); inner.external_info.iter().map(|x| format!(" {} <- {}",x.address, x.node)).collect::<Vec<_>>().join("\n"));
} }
true true

View File

@ -79,6 +79,9 @@ pub fn new_bound_default_udp_socket(local_address: SocketAddr) -> io::Result<Opt
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_default_tcp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_default_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(error "Couldn't set TCP linger: {}", e);
}
if let Err(e) = socket.set_nodelay(true) { if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e); log_net!(error "Couldn't set TCP nodelay: {}", e);
} }

View File

@ -4,7 +4,7 @@ use sockets::*;
pub struct RawTcpNetworkConnection { pub struct RawTcpNetworkConnection {
flow: Flow, flow: Flow,
stream: AsyncPeekStream, stream: Mutex<Option<AsyncPeekStream>>,
} }
impl fmt::Debug for RawTcpNetworkConnection { impl fmt::Debug for RawTcpNetworkConnection {
@ -15,7 +15,10 @@ impl fmt::Debug for RawTcpNetworkConnection {
impl RawTcpNetworkConnection { impl RawTcpNetworkConnection {
pub fn new(flow: Flow, stream: AsyncPeekStream) -> Self { pub fn new(flow: Flow, stream: AsyncPeekStream) -> Self {
Self { flow, stream } Self {
flow,
stream: Mutex::new(Some(stream)),
}
} }
pub fn flow(&self) -> Flow { pub fn flow(&self) -> Flow {
@ -24,24 +27,10 @@ impl RawTcpNetworkConnection {
#[instrument(level = "trace", target = "protocol", err, skip_all)] #[instrument(level = "trace", target = "protocol", err, skip_all)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
let mut stream = self.stream.clone(); // Drop the stream, without calling close, which calls shutdown, which causes TIME_WAIT regardless of SO_LINGER settings
let _ = stream.close().await; drop(self.stream.lock().take());
// let _ = stream.close().await;
Ok(NetworkResult::value(())) Ok(NetworkResult::value(()))
// // Then shut down the write side of the socket to effect a clean close
// cfg_if! {
// if #[cfg(feature="rt-async-std")] {
// self.tcp_stream
// .shutdown(async_std::net::Shutdown::Write)
// } else if #[cfg(feature="rt-tokio")] {
// use tokio::io::AsyncWriteExt;
// self.tcp_stream.get_mut()
// .shutdown()
// .await
// } else {
// compile_error!("needs executor implementation");
// }
// }
} }
#[instrument(level = "trace", target = "protocol", err, skip_all)] #[instrument(level = "trace", target = "protocol", err, skip_all)]
@ -63,7 +52,9 @@ impl RawTcpNetworkConnection {
#[instrument(level="trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))] #[instrument(level="trace", target="protocol", err, skip(self, message), fields(network_result, message.len = message.len()))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
let mut stream = self.stream.clone(); let Some(mut stream) = self.stream.lock().clone() else {
bail_io_error_other!("already closed");
};
let out = Self::send_internal(&mut stream, message).await?; let out = Self::send_internal(&mut stream, message).await?;
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out)); tracing::Span::current().record("network_result", &tracing::field::display(&out));
@ -96,7 +87,9 @@ impl RawTcpNetworkConnection {
#[instrument(level = "trace", target = "protocol", err, skip_all)] #[instrument(level = "trace", target = "protocol", err, skip_all)]
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
let mut stream = self.stream.clone(); let Some(mut stream) = self.stream.lock().clone() else {
bail_io_error_other!("already closed");
};
let out = Self::recv_internal(&mut stream).await?; let out = Self::recv_internal(&mut stream).await?;
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("network_result", &tracing::field::display(&out)); tracing::Span::current().record("network_result", &tracing::field::display(&out));
@ -156,7 +149,7 @@ impl RawTcpProtocolHandler {
Ok(Some(conn)) Ok(Some(conn))
} }
#[instrument(level = "trace", target = "protocol", err, skip_all)] #[instrument(level = "trace", target = "protocol", err)]
pub async fn connect( pub async fn connect(
local_address: Option<SocketAddr>, local_address: Option<SocketAddr>,
socket_addr: SocketAddr, socket_addr: SocketAddr,

View File

@ -97,6 +97,7 @@ where
Err(e) => err_to_network_result(e), Err(e) => err_to_network_result(e),
}; };
// This close does not do a TCP shutdown so it is safe and will not cause TIME_WAIT
let _ = stream.close().await; let _ = stream.close().await;
Ok(out) Ok(out)

View File

@ -98,7 +98,7 @@ pub(in crate::network_manager) struct NetworkConnection {
established_time: Timestamp, established_time: Timestamp,
/// Statistics about network traffic /// Statistics about network traffic
stats: Arc<Mutex<NetworkConnectionStats>>, stats: Arc<Mutex<NetworkConnectionStats>>,
/// To send data out this connection, it is places in this channel /// To send data out this connection, it is placed in this channel
sender: flume::Sender<(Option<Id>, Vec<u8>)>, sender: flume::Sender<(Option<Id>, Vec<u8>)>,
/// Drop this when we want to drop the connection /// Drop this when we want to drop the connection
stop_source: Option<StopSource>, stop_source: Option<StopSource>,

View File

@ -90,7 +90,7 @@ impl NetworkManager {
this.send_data_ncm_signal_reverse(relay_nr.clone(), target_node_ref.clone(), data.clone()) this.send_data_ncm_signal_reverse(relay_nr.clone(), target_node_ref.clone(), data.clone())
.await?; .await?;
if matches!(nres, NetworkResult::Timeout) { if matches!(nres, NetworkResult::Timeout) {
// Failed to holepunch, fallback to inbound relay // Failed to reverse-connect, fallback to inbound relay
let success = REVERSE_CONNECT_SUCCESS.load(Ordering::Acquire); let success = REVERSE_CONNECT_SUCCESS.load(Ordering::Acquire);
let failure = REVERSE_CONNECT_FAILURE.fetch_add(1, Ordering::AcqRel) + 1; let failure = REVERSE_CONNECT_FAILURE.fetch_add(1, Ordering::AcqRel) + 1;
let rate = (success as f64 * 100.0) / ((success + failure) as f64); let rate = (success as f64 * 100.0) / ((success + failure) as f64);
@ -181,7 +181,7 @@ impl NetworkManager {
}; };
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_flow(flow, Timestamp::now()); self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -216,7 +216,7 @@ impl NetworkManager {
}; };
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_flow(flow, Timestamp::now()); self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -233,12 +233,28 @@ impl NetworkManager {
target_node_ref: FilteredNodeRef, target_node_ref: FilteredNodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataMethod>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// Make a noderef that meets the sequencing requirements
// But is not protocol-specific, or address-family-specific
// as a signalled node gets to choose its own dial info for the reverse connection.
let (_sorted, seq_dif) = target_node_ref
.dial_info_filter()
.apply_sequencing(target_node_ref.sequencing());
let seq_target_node_ref = if seq_dif.is_ordered_only() {
target_node_ref
.unfiltered()
.sequencing_filtered(Sequencing::EnsureOrdered)
} else {
target_node_ref
.unfiltered()
.sequencing_filtered(Sequencing::NoPreference)
};
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(flow) = target_node_ref.last_flow() { let data = if let Some(flow) = seq_target_node_ref.last_flow() {
match self.net().send_data_to_existing_flow(flow, data).await? { match self.net().send_data_to_existing_flow(flow, data).await? {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_flow(flow, Timestamp::now()); self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod { return Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -254,6 +270,12 @@ impl NetworkManager {
} }
} else { } else {
// No last connection // No last connection
#[cfg(feature = "verbose-tracing")]
log_net!(debug
"No last flow in reverse connect for {:?}",
target_node_ref
);
data data
}; };
@ -281,7 +303,7 @@ impl NetworkManager {
match self.net().send_data_to_existing_flow(flow, data).await? { match self.net().send_data_to_existing_flow(flow, data).await? {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_flow(flow, Timestamp::now()); self.set_last_flow(target_node_ref.unfiltered(), flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod { return Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -297,6 +319,12 @@ impl NetworkManager {
} }
} else { } else {
// No last connection // No last connection
#[cfg(feature = "verbose-tracing")]
log_net!(debug
"No last flow in hole punch for {:?}",
target_node_ref
);
data data
}; };
@ -333,7 +361,7 @@ impl NetworkManager {
match self.net().send_data_to_existing_flow(flow, data).await? { match self.net().send_data_to_existing_flow(flow, data).await? {
SendDataToExistingFlowResult::Sent(unique_flow) => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
node_ref.set_last_flow(flow, Timestamp::now()); self.set_last_flow(node_ref.unfiltered(), flow, Timestamp::now());
return Ok(NetworkResult::value(SendDataMethod { return Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
@ -359,7 +387,7 @@ impl NetworkManager {
); );
// If we connected to this node directly, save off the last connection so we can use it again // If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_flow(unique_flow.flow, Timestamp::now()); self.set_last_flow(node_ref.unfiltered(), unique_flow.flow, Timestamp::now());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
contact_method: NodeContactMethod::Direct(dial_info), contact_method: NodeContactMethod::Direct(dial_info),

View File

@ -60,6 +60,14 @@ impl DialInfoFilter {
} }
// return ordered sort and filter with ensure applied // return ordered sort and filter with ensure applied
} }
pub fn is_ordered_only(&self) -> bool {
for pt in self.protocol_type_set {
if !pt.is_ordered() {
return false;
}
}
true
}
} }
impl fmt::Debug for DialInfoFilter { impl fmt::Debug for DialInfoFilter {

View File

@ -158,7 +158,7 @@ impl Bucket {
} }
extra_entries -= 1; extra_entries -= 1;
// if this entry has references we can't drop it yet // if this entry has NodeRef references we can't drop it yet
if entry.1.ref_count.load(Ordering::Acquire) > 0 { if entry.1.ref_count.load(Ordering::Acquire) > 0 {
continue; continue;
} }
@ -170,11 +170,16 @@ impl Bucket {
// if no references, lets evict it // if no references, lets evict it
dead_node_ids.insert(entry.0); dead_node_ids.insert(entry.0);
// And remove the node id from the entry
entry.1.with_mut_inner(|e| e.remove_node_id(self.kind));
} }
// Now purge the dead node ids // Now purge the dead node ids
for id in &dead_node_ids { for id in &dead_node_ids {
// Remove the entry // Remove the entry
// The entry may not be completely gone after this happens
// because it may still be in another bucket for a different CryptoKind
self.remove_entry(id); self.remove_entry(id);
} }

View File

@ -299,6 +299,20 @@ impl BucketEntryInner {
node_ids.add(node_id); node_ids.add(node_id);
Ok(None) Ok(None)
} }
/// Remove a node id for a particular crypto kind.
/// Returns Some(node) any previous existing node id associated with that crypto kind
/// Returns None if no previous existing node id was associated with that crypto kind
pub fn remove_node_id(&mut self, crypto_kind: CryptoKind) -> Option<TypedKey> {
let node_ids = if VALID_CRYPTO_KINDS.contains(&crypto_kind) {
&mut self.validated_node_ids
} else {
&mut self.unsupported_node_ids
};
node_ids.remove(crypto_kind)
}
pub fn best_node_id(&self) -> TypedKey { pub fn best_node_id(&self) -> TypedKey {
self.validated_node_ids.best().unwrap() self.validated_node_ids.best().unwrap()
} }
@ -395,10 +409,13 @@ impl BucketEntryInner {
move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2)
} }
// xxx: if we ever implement a 'remove_signed_node_info' to take nodes out of a routing domain
// then we need to call 'on_entry_node_info_updated' with that removal. as of right now
// this never happens, because we only have one routing domain implemented.
pub fn update_signed_node_info( pub fn update_signed_node_info(
&mut self, &mut self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
signed_node_info: SignedNodeInfo, signed_node_info: &SignedNodeInfo,
) -> bool { ) -> bool {
// Get the correct signed_node_info for the chosen routing domain // Get the correct signed_node_info for the chosen routing domain
let opt_current_sni = match routing_domain { let opt_current_sni = match routing_domain {
@ -439,7 +456,7 @@ impl BucketEntryInner {
// Update the signed node info // Update the signed node info
// Let the node try to live again but don't mark it as seen yet // Let the node try to live again but don't mark it as seen yet
*opt_current_sni = Some(Box::new(signed_node_info)); *opt_current_sni = Some(Box::new(signed_node_info.clone()));
self.set_envelope_support(envelope_support); self.set_envelope_support(envelope_support);
self.updated_since_last_network_change = true; self.updated_since_last_network_change = true;
self.make_not_dead(Timestamp::now()); self.make_not_dead(Timestamp::now());
@ -550,7 +567,7 @@ impl BucketEntryInner {
} }
// Stores a flow in this entry's table of last flows // Stores a flow in this entry's table of last flows
pub fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) { pub(super) fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) {
if self.punishment.is_some() { if self.punishment.is_some() {
// Don't record connection if this entry is currently punished // Don't record connection if this entry is currently punished
return; return;
@ -560,18 +577,18 @@ impl BucketEntryInner {
} }
// Removes a flow in this entry's table of last flows // Removes a flow in this entry's table of last flows
pub fn remove_last_flow(&mut self, last_flow: Flow) { pub(super) fn remove_last_flow(&mut self, last_flow: Flow) {
let key = self.flow_to_key(last_flow); let key = self.flow_to_key(last_flow);
self.last_flows.remove(&key); self.last_flows.remove(&key);
} }
// Clears the table of last flows to ensure we create new ones and drop any existing ones // Clears the table of last flows to ensure we create new ones and drop any existing ones
pub fn clear_last_flows(&mut self) { pub(super) fn clear_last_flows(&mut self) {
self.last_flows.clear(); self.last_flows.clear();
} }
// Clears the table of last flows except the most recent one // Clears the table of last flows except the most recent one
pub fn clear_last_flows_except_latest(&mut self) { pub(super) fn clear_last_flows_except_latest(&mut self) {
if self.last_flows.is_empty() { if self.last_flows.is_empty() {
// No last_connections // No last_connections
return; return;
@ -658,7 +675,7 @@ impl BucketEntryInner {
out out
} }
pub fn add_envelope_version(&mut self, envelope_version: u8) { pub(super) fn add_envelope_version(&mut self, envelope_version: u8) {
if self.envelope_support.contains(&envelope_version) { if self.envelope_support.contains(&envelope_version) {
return; return;
} }
@ -667,7 +684,7 @@ impl BucketEntryInner {
self.envelope_support.dedup(); self.envelope_support.dedup();
} }
pub fn set_envelope_support(&mut self, mut envelope_support: Vec<u8>) { pub(super) fn set_envelope_support(&mut self, mut envelope_support: Vec<u8>) {
envelope_support.sort(); envelope_support.sort();
envelope_support.dedup(); envelope_support.dedup();
self.envelope_support = envelope_support; self.envelope_support = envelope_support;

View File

@ -211,6 +211,8 @@ impl RoutingTable {
out += &format!("{:?}: {}: {}\n", routing_domain, crypto_kind, count); out += &format!("{:?}: {}: {}\n", routing_domain, crypto_kind, count);
} }
for ck in &VALID_CRYPTO_KINDS { for ck in &VALID_CRYPTO_KINDS {
let our_node_id = self.unlocked_inner.node_id(*ck);
let mut filtered_total = 0; let mut filtered_total = 0;
let mut b = 0; let mut b = 0;
let blen = inner.buckets[ck].len(); let blen = inner.buckets[ck].len();
@ -236,17 +238,28 @@ impl RoutingTable {
.relay_node(RoutingDomain::PublicInternet) .relay_node(RoutingDomain::PublicInternet)
.map(|r| r.same_bucket_entry(e.1)) .map(|r| r.same_bucket_entry(e.1))
.unwrap_or(false); .unwrap_or(false);
let relay_tag = if is_relay {
"R" let is_relaying =
} else if can_be_relay { e.1.with(inner, |_rti, e| {
"r" e.signed_node_info(RoutingDomain::PublicInternet)
} else { .map(|sni| sni.relay_ids().contains(&our_node_id))
"-" })
}; .unwrap_or(false);
let relay_tag = format!(
"{}{}",
if is_relay {
"R"
} else if can_be_relay {
"r"
} else {
"-"
},
if is_relaying { ">" } else { "-" }
);
out += " "; out += " ";
out += &e.1.with(inner, |_rti, e| { out += &e.1.with(inner, |_rti, e| {
Self::format_entry(cur_ts, TypedKey::new(*ck, node), e, relay_tag) Self::format_entry(cur_ts, TypedKey::new(*ck, node), e, &relay_tag)
}); });
out += "\n"; out += "\n";
} }
@ -267,7 +280,9 @@ impl RoutingTable {
) -> String { ) -> String {
let cur_ts = Timestamp::now(); let cur_ts = Timestamp::now();
let relay_node_filter = self.make_public_internet_relay_node_filter(); let relay_node_filter = self.make_public_internet_relay_node_filter();
let our_node_ids = self.unlocked_inner.node_ids();
let mut relay_count = 0usize; let mut relay_count = 0usize;
let mut relaying_count = 0usize;
let mut filters = VecDeque::new(); let mut filters = VecDeque::new();
filters.push_front( filters.push_front(
@ -297,28 +312,43 @@ impl RoutingTable {
.relay_node(RoutingDomain::PublicInternet) .relay_node(RoutingDomain::PublicInternet)
.map(|r| r.same_entry(&node)) .map(|r| r.same_entry(&node))
.unwrap_or(false); .unwrap_or(false);
let relay_tag = if is_relay {
"R" let is_relaying = node
} else if can_be_relay { .operate(|_rti, e| {
"r" e.signed_node_info(RoutingDomain::PublicInternet)
} else { .map(|sni| sni.relay_ids().contains_any(&our_node_ids))
"-" })
}; .unwrap_or(false);
let relay_tag = format!(
"{}{}",
if is_relay {
"R"
} else if can_be_relay {
"r"
} else {
"-"
},
if is_relaying { ">" } else { "-" }
);
if can_be_relay { if can_be_relay {
relay_count += 1; relay_count += 1;
} }
if is_relaying {
relaying_count += 1;
}
out += " "; out += " ";
out += &node out += &node
.operate(|_rti, e| Self::format_entry(cur_ts, node.best_node_id(), e, relay_tag)); .operate(|_rti, e| Self::format_entry(cur_ts, node.best_node_id(), e, &relay_tag));
out += "\n"; out += "\n";
} }
out += &format!( out += &format!(
"Entries: {} Relays: {} Relay %: {:.2}\n", "Entries: {}\nRelay Capable: {} Relay Capable %: {:.2}\nRelaying Through This Node: {}\n",
entry_count, entry_count,
relay_count, relay_count,
(relay_count as f64) * 100.0 / (entry_count as f64) (relay_count as f64) * 100.0 / (entry_count as f64),
relaying_count,
); );
out out

View File

@ -703,22 +703,18 @@ impl RoutingTable {
} }
/// Shortcut function to add a node to our routing table if it doesn't exist /// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the last peer address we have for it, since that's pretty common /// Returns a noderef filtered to
/// the routing domain in which this node was registered for convenience.
#[instrument(level = "trace", skip_all, err)] #[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_existing_connection( pub fn register_node_with_id(
&self, &self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
node_id: TypedKey, node_id: TypedKey,
flow: Flow,
timestamp: Timestamp, timestamp: Timestamp,
) -> EyreResult<FilteredNodeRef> { ) -> EyreResult<FilteredNodeRef> {
self.inner.write().register_node_with_existing_connection( self.inner
self.clone(), .write()
routing_domain, .register_node_with_id(self.clone(), routing_domain, node_id, timestamp)
node_id,
flow,
timestamp,
)
} }
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////

View File

@ -245,6 +245,19 @@ pub trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
}) })
} }
fn is_relaying(&self, routing_domain: RoutingDomain) -> bool {
self.operate(|rti, e| {
let Some(relay_ids) = e
.signed_node_info(routing_domain)
.map(|sni| sni.relay_ids())
else {
return false;
};
let our_node_ids = rti.unlocked_inner.node_ids();
our_node_ids.contains_any(&relay_ids)
})
}
fn has_any_dial_info(&self) -> bool { fn has_any_dial_info(&self) -> bool {
self.operate(|_rti, e| { self.operate(|_rti, e| {
for rtd in RoutingDomain::all() { for rtd in RoutingDomain::all() {

View File

@ -334,10 +334,7 @@ impl RoutingTableInner {
/// Attempt to remove last_connections from entries /// Attempt to remove last_connections from entries
pub fn purge_last_connections(&mut self) { pub fn purge_last_connections(&mut self) {
log_rtab!( log_rtab!("Starting routing table last_connections purge.");
"Starting routing table last_connections purge. Table currently has {} nodes",
self.bucket_entry_count()
);
for ck in VALID_CRYPTO_KINDS { for ck in VALID_CRYPTO_KINDS {
for bucket in &self.buckets[&ck] { for bucket in &self.buckets[&ck] {
for entry in bucket.entries() { for entry in bucket.entries() {
@ -347,12 +344,7 @@ impl RoutingTableInner {
} }
} }
} }
self.all_entries.remove_expired(); log_rtab!(debug "Routing table last_connections purge complete.");
log_rtab!(debug
"Routing table last_connections purge complete. Routing table now has {} nodes",
self.bucket_entry_count()
);
} }
/// Attempt to settle buckets and remove entries down to the desired number /// Attempt to settle buckets and remove entries down to the desired number
@ -366,13 +358,6 @@ impl RoutingTableInner {
self.all_entries.remove_expired(); self.all_entries.remove_expired();
log_rtab!(debug "Bucket {}:{} kicked Routing table now has {} nodes\nKicked nodes:{:#?}", bucket_index.0, bucket_index.1, self.bucket_entry_count(), dead_node_ids); log_rtab!(debug "Bucket {}:{} kicked Routing table now has {} nodes\nKicked nodes:{:#?}", bucket_index.0, bucket_index.1, self.bucket_entry_count(), dead_node_ids);
// Now purge the routing table inner vectors
//let filter = |k: &DHTKey| dead_node_ids.contains(k);
//inner.closest_reliable_nodes.retain(filter);
//inner.fastest_reliable_nodes.retain(filter);
//inner.closest_nodes.retain(filter);
//inner.fastest_nodes.retain(filter);
} }
} }
@ -611,21 +596,38 @@ impl RoutingTableInner {
} }
// Update buckets with new node ids we may have learned belong to this entry // Update buckets with new node ids we may have learned belong to this entry
fn update_bucket_entries( fn update_bucket_entry_node_ids(
&mut self, &mut self,
entry: Arc<BucketEntry>, entry: Arc<BucketEntry>,
node_ids: &[TypedKey], node_ids: &[TypedKey],
) -> EyreResult<()> { ) -> EyreResult<()> {
entry.with_mut_inner(|e| { entry.with_mut_inner(|e| {
let existing_node_ids = e.node_ids(); let mut existing_node_ids = e.node_ids();
// Peer infos for all routing domains we have
let mut old_peer_infos = vec![];
for node_id in node_ids { for node_id in node_ids {
let ck = node_id.kind;
let is_existing_node_id = existing_node_ids.contains(node_id);
existing_node_ids.remove(ck);
// Skip node ids that exist already // Skip node ids that exist already
if existing_node_ids.contains(node_id) { if is_existing_node_id {
continue; continue;
} }
// New node id, get the old peer info if we don't have it yet
if old_peer_infos.is_empty() {
for rd in RoutingDomainSet::all() {
if let Some(old_peer_info) = e.make_peer_info(rd) {
old_peer_infos.push(old_peer_info);
}
}
}
// Add new node id to entry // Add new node id to entry
let ck = node_id.kind;
if let Some(old_node_id) = e.add_node_id(*node_id)? { if let Some(old_node_id) = e.add_node_id(*node_id)? {
// Remove any old node id for this crypto kind // Remove any old node id for this crypto kind
if VALID_CRYPTO_KINDS.contains(&ck) { if VALID_CRYPTO_KINDS.contains(&ck) {
@ -646,6 +648,35 @@ impl RoutingTableInner {
self.unlocked_inner.kick_queue.lock().insert(bucket_index); self.unlocked_inner.kick_queue.lock().insert(bucket_index);
} }
} }
// Remove from buckets if node id wasn't seen in new peer info list
for node_id in existing_node_ids.iter() {
let ck = node_id.kind;
if VALID_CRYPTO_KINDS.contains(&ck) {
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket = self.get_bucket_mut(bucket_index);
bucket.remove_entry(&node_id.value);
entry.with_mut_inner(|e| e.remove_node_id(ck));
}
}
// New node id, get the old peer info if we don't have it yet
if !old_peer_infos.is_empty() {
let mut new_peer_infos = vec![];
for rd in RoutingDomainSet::all() {
if let Some(new_peer_info) = e.make_peer_info(rd) {
new_peer_infos.push(new_peer_info);
}
}
// adding a node id should never change what routing domains peers are in
// so we should have a 1:1 ordered mapping here to update with the new nodeids
assert_eq!(old_peer_infos.len(), new_peer_infos.len());
for (old_pi, new_pi) in old_peer_infos.into_iter().zip(new_peer_infos.into_iter()) {
assert_eq!(old_pi.routing_domain(), new_pi.routing_domain());
self.on_entry_peer_info_updated(Some(old_pi), Some(new_pi));
}
}
Ok(()) Ok(())
}) })
} }
@ -694,7 +725,7 @@ impl RoutingTableInner {
// If the entry does exist already, update it // If the entry does exist already, update it
if let Some(best_entry) = best_entry { if let Some(best_entry) = best_entry {
// Update the entry with all of the node ids // Update the entry with all of the node ids
if let Err(e) = self.update_bucket_entries(best_entry.clone(), node_ids) { if let Err(e) = self.update_bucket_entry_node_ids(best_entry.clone(), node_ids) {
bail!("Not registering new ids for existing node: {}", e); bail!("Not registering new ids for existing node: {}", e);
} }
@ -717,7 +748,7 @@ impl RoutingTableInner {
self.unlocked_inner.kick_queue.lock().insert(bucket_entry); self.unlocked_inner.kick_queue.lock().insert(bucket_entry);
// Update the other bucket entries with the remaining node ids // Update the other bucket entries with the remaining node ids
if let Err(e) = self.update_bucket_entries(new_entry.clone(), node_ids) { if let Err(e) = self.update_bucket_entry_node_ids(new_entry.clone(), node_ids) {
bail!("Not registering new node: {}", e); bail!("Not registering new node: {}", e);
} }
@ -870,53 +901,99 @@ impl RoutingTableInner {
let (_routing_domain, node_ids, signed_node_info) = let (_routing_domain, node_ids, signed_node_info) =
Arc::unwrap_or_clone(peer_info).destructure(); Arc::unwrap_or_clone(peer_info).destructure();
let mut updated = false; let mut updated = false;
let mut old_peer_info = None;
let nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| { let nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| {
updated = e.update_signed_node_info(routing_domain, signed_node_info); old_peer_info = e.make_peer_info(routing_domain);
updated = e.update_signed_node_info(routing_domain, &signed_node_info);
})?; })?;
if updated { // Process any new or updated PeerInfo
// If this is our relay, then redo our own peerinfo because if old_peer_info.is_none() || updated {
// if we have relayed peerinfo, then changing the relay's peerinfo let new_peer_info = nr.locked(self).make_peer_info(routing_domain);
// changes our own peer info self.on_entry_peer_info_updated(old_peer_info, new_peer_info);
self.with_routing_domain(routing_domain, |rd| {
let opt_our_relay_node_ids = rd
.relay_node()
.map(|relay_nr| relay_nr.locked(self).node_ids());
if let Some(our_relay_node_ids) = opt_our_relay_node_ids {
if our_relay_node_ids.contains_any(&node_ids) {
rd.refresh();
rd.publish_peer_info(self);
}
}
});
} }
Ok(nr.custom_filtered(NodeRefFilter::new().with_routing_domain(routing_domain))) Ok(nr.custom_filtered(NodeRefFilter::new().with_routing_domain(routing_domain)))
} }
/// Shortcut function to add a node to our routing table if it doesn't exist /// Shortcut function to add a node to our routing table if it doesn't exist
/// and add the last peer address we have for it, since that's pretty common /// Returns a noderef filtered to
/// the routing domain in which this node was registered for convenience.
#[instrument(level = "trace", skip_all, err)] #[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_existing_connection( pub fn register_node_with_id(
&mut self, &mut self,
outer_self: RoutingTable, outer_self: RoutingTable,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
node_id: TypedKey, node_id: TypedKey,
flow: Flow,
timestamp: Timestamp, timestamp: Timestamp,
) -> EyreResult<FilteredNodeRef> { ) -> EyreResult<FilteredNodeRef> {
let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| { let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| {
//e.make_not_dead(timestamp); //e.make_not_dead(timestamp);
e.touch_last_seen(timestamp); e.touch_last_seen(timestamp);
})?; })?;
// set the most recent node address for connection finding and udp replies
nr.locked_mut(self).set_last_flow(flow, timestamp);
// Enforce routing domain // Enforce routing domain
let nr = nr.custom_filtered(NodeRefFilter::new().with_routing_domain(routing_domain)); let nr = nr.custom_filtered(NodeRefFilter::new().with_routing_domain(routing_domain));
Ok(nr) Ok(nr)
} }
/// Called whenever a routing table entry is:
/// 1. created or updated with new peer information
/// 2. has a node id added or removed (per CryptoKind)
/// * by a new peer info showing up with a different overlapping node id list
/// * by a bucket kick removing an entry from a bucket for some cryptokind
/// 3. (todo) is removed from some routing domain (peer info gone)
///
/// It is not called when:
/// 1. nodes are registered by id for an existing connection but have no peer info yet
/// 2. nodes are removed that don't have any peer info
fn on_entry_peer_info_updated(
&mut self,
old_peer_info: Option<PeerInfo>,
new_peer_info: Option<PeerInfo>,
) {
let (routing_domain, node_ids) = match (old_peer_info.as_ref(), new_peer_info.as_ref()) {
(None, None) => {
return;
}
(None, Some(new_pi)) => (new_pi.routing_domain(), new_pi.node_ids().clone()),
(Some(old_pi), None) => (old_pi.routing_domain(), old_pi.node_ids().clone()),
(Some(old_pi), Some(new_pi)) => {
assert_eq!(
old_pi.routing_domain(),
new_pi.routing_domain(),
"routing domains should be the same here",
);
let mut node_ids = old_pi.node_ids().clone();
node_ids.add_all(new_pi.node_ids());
(new_pi.routing_domain(), node_ids)
}
};
// If this is our relay, then redo our own peerinfo because
// if we have relayed peerinfo, then changing the relay's peerinfo
// changes our own peer info
self.with_routing_domain(routing_domain, |rd| {
let opt_our_relay_node_ids = rd
.relay_node()
.map(|relay_nr| relay_nr.locked(self).node_ids());
if let Some(our_relay_node_ids) = opt_our_relay_node_ids {
if our_relay_node_ids.contains_any(&node_ids) {
rd.refresh();
rd.publish_peer_info(self);
}
}
});
// Update tables that use peer info
// if let Some(_old_pi) = old_peer_info {
// // Remove old info
// }
// if let Some(_new_pi) = new_peer_info {
// // Add new info
// }
}
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
// Routing Table Health Metrics // Routing Table Health Metrics

View File

@ -99,7 +99,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
return false; return false;
} }
// If we need a relay and we don't have one, don't publish yet // If we need a relay and we don't have one, d on't publish yet
if let Some(_relay_kind) = self.requires_relay() { if let Some(_relay_kind) = self.requires_relay() {
if pi.signed_node_info().relay_ids().is_empty() { if pi.signed_node_info().relay_ids().is_empty() {
log_rtab!(debug "[PublicInternet] Not publishing peer info that wants relay until we have a relay"); log_rtab!(debug "[PublicInternet] Not publishing peer info that wants relay until we have a relay");

View File

@ -714,7 +714,6 @@ impl VeilidAPI {
async fn debug_nodeinfo(&self, _args: String) -> VeilidAPIResult<String> { async fn debug_nodeinfo(&self, _args: String) -> VeilidAPIResult<String> {
// Dump routing table entry // Dump routing table entry
let routing_table = self.network_manager()?.routing_table(); let routing_table = self.network_manager()?.routing_table();
let connection_manager = self.network_manager()?.connection_manager();
let nodeinfo = routing_table.debug_info_nodeinfo(); let nodeinfo = routing_table.debug_info_nodeinfo();
// Dump core state // Dump core state
@ -737,7 +736,12 @@ impl VeilidAPI {
} }
// Dump connection table // Dump connection table
let connman = connection_manager.debug_print().await; let connman =
if let Some(connection_manager) = self.network_manager()?.opt_connection_manager() {
connection_manager.debug_print().await
} else {
"Connection manager unavailable when detached".to_owned()
};
Ok(format!("{}\n{}\n{}\n", nodeinfo, peertable, connman)) Ok(format!("{}\n{}\n{}\n", nodeinfo, peertable, connman))
} }

View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. # This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand.
[[package]] [[package]]
name = "appdirs" name = "appdirs"
@ -43,83 +43,73 @@ files = [
[[package]] [[package]]
name = "coverage" name = "coverage"
version = "7.6.1" version = "7.6.3"
description = "Code coverage measurement for Python" description = "Code coverage measurement for Python"
optional = false optional = false
python-versions = ">=3.8" python-versions = ">=3.9"
files = [ files = [
{file = "coverage-7.6.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b06079abebbc0e89e6163b8e8f0e16270124c154dc6e4a47b413dd538859af16"}, {file = "coverage-7.6.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:6da42bbcec130b188169107ecb6ee7bd7b4c849d24c9370a0c884cf728d8e976"},
{file = "coverage-7.6.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:cf4b19715bccd7ee27b6b120e7e9dd56037b9c0681dcc1adc9ba9db3d417fa36"}, {file = "coverage-7.6.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c222958f59b0ae091f4535851cbb24eb57fc0baea07ba675af718fb5302dddb2"},
{file = "coverage-7.6.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e61c0abb4c85b095a784ef23fdd4aede7a2628478e7baba7c5e3deba61070a02"}, {file = "coverage-7.6.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ab84a8b698ad5a6c365b08061920138e7a7dd9a04b6feb09ba1bfae68346ce6d"},
{file = "coverage-7.6.1-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fd21f6ae3f08b41004dfb433fa895d858f3f5979e7762d052b12aef444e29afc"}, {file = "coverage-7.6.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:70a6756ce66cd6fe8486c775b30889f0dc4cb20c157aa8c35b45fd7868255c5c"},
{file = "coverage-7.6.1-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f59d57baca39b32db42b83b2a7ba6f47ad9c394ec2076b084c3f029b7afca23"}, {file = "coverage-7.6.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3c2e6fa98032fec8282f6b27e3f3986c6e05702828380618776ad794e938f53a"},
{file = "coverage-7.6.1-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:a1ac0ae2b8bd743b88ed0502544847c3053d7171a3cff9228af618a068ed9c34"}, {file = "coverage-7.6.3-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:921fbe13492caf6a69528f09d5d7c7d518c8d0e7b9f6701b7719715f29a71e6e"},
{file = "coverage-7.6.1-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:e6a08c0be454c3b3beb105c0596ebdc2371fab6bb90c0c0297f4e58fd7e1012c"}, {file = "coverage-7.6.3-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:6d99198203f0b9cb0b5d1c0393859555bc26b548223a769baf7e321a627ed4fc"},
{file = "coverage-7.6.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:f5796e664fe802da4f57a168c85359a8fbf3eab5e55cd4e4569fbacecc903959"}, {file = "coverage-7.6.3-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:87cd2e29067ea397a47e352efb13f976eb1b03e18c999270bb50589323294c6e"},
{file = "coverage-7.6.1-cp310-cp310-win32.whl", hash = "sha256:7bb65125fcbef8d989fa1dd0e8a060999497629ca5b0efbca209588a73356232"}, {file = "coverage-7.6.3-cp310-cp310-win32.whl", hash = "sha256:a3328c3e64ea4ab12b85999eb0779e6139295bbf5485f69d42cf794309e3d007"},
{file = "coverage-7.6.1-cp310-cp310-win_amd64.whl", hash = "sha256:3115a95daa9bdba70aea750db7b96b37259a81a709223c8448fa97727d546fe0"}, {file = "coverage-7.6.3-cp310-cp310-win_amd64.whl", hash = "sha256:bca4c8abc50d38f9773c1ec80d43f3768df2e8576807d1656016b9d3eeaa96fd"},
{file = "coverage-7.6.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:7dea0889685db8550f839fa202744652e87c60015029ce3f60e006f8c4462c93"}, {file = "coverage-7.6.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:c51ef82302386d686feea1c44dbeef744585da16fcf97deea2a8d6c1556f519b"},
{file = "coverage-7.6.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ed37bd3c3b063412f7620464a9ac1314d33100329f39799255fb8d3027da50d3"}, {file = "coverage-7.6.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0ca37993206402c6c35dc717f90d4c8f53568a8b80f0bf1a1b2b334f4d488fba"},
{file = "coverage-7.6.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d85f5e9a5f8b73e2350097c3756ef7e785f55bd71205defa0bfdaf96c31616ff"}, {file = "coverage-7.6.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c77326300b839c44c3e5a8fe26c15b7e87b2f32dfd2fc9fee1d13604347c9b38"},
{file = "coverage-7.6.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9bc572be474cafb617672c43fe989d6e48d3c83af02ce8de73fff1c6bb3c198d"}, {file = "coverage-7.6.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e484e479860e00da1f005cd19d1c5d4a813324e5951319ac3f3eefb497cc549"},
{file = "coverage-7.6.1-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0c0420b573964c760df9e9e86d1a9a622d0d27f417e1a949a8a66dd7bcee7bc6"}, {file = "coverage-7.6.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0c6c0f4d53ef603397fc894a895b960ecd7d44c727df42a8d500031716d4e8d2"},
{file = "coverage-7.6.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1f4aa8219db826ce6be7099d559f8ec311549bfc4046f7f9fe9b5cea5c581c56"}, {file = "coverage-7.6.3-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:37be7b5ea3ff5b7c4a9db16074dc94523b5f10dd1f3b362a827af66a55198175"},
{file = "coverage-7.6.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:fc5a77d0c516700ebad189b587de289a20a78324bc54baee03dd486f0855d234"}, {file = "coverage-7.6.3-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:43b32a06c47539fe275106b376658638b418c7cfdfff0e0259fbf877e845f14b"},
{file = "coverage-7.6.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:b48f312cca9621272ae49008c7f613337c53fadca647d6384cc129d2996d1133"}, {file = "coverage-7.6.3-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ee77c7bef0724165e795b6b7bf9c4c22a9b8468a6bdb9c6b4281293c6b22a90f"},
{file = "coverage-7.6.1-cp311-cp311-win32.whl", hash = "sha256:1125ca0e5fd475cbbba3bb67ae20bd2c23a98fac4e32412883f9bcbaa81c314c"}, {file = "coverage-7.6.3-cp311-cp311-win32.whl", hash = "sha256:43517e1f6b19f610a93d8227e47790722c8bf7422e46b365e0469fc3d3563d97"},
{file = "coverage-7.6.1-cp311-cp311-win_amd64.whl", hash = "sha256:8ae539519c4c040c5ffd0632784e21b2f03fc1340752af711f33e5be83a9d6c6"}, {file = "coverage-7.6.3-cp311-cp311-win_amd64.whl", hash = "sha256:04f2189716e85ec9192df307f7c255f90e78b6e9863a03223c3b998d24a3c6c6"},
{file = "coverage-7.6.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:95cae0efeb032af8458fc27d191f85d1717b1d4e49f7cb226cf526ff28179778"}, {file = "coverage-7.6.3-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:27bd5f18d8f2879e45724b0ce74f61811639a846ff0e5c0395b7818fae87aec6"},
{file = "coverage-7.6.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:5621a9175cf9d0b0c84c2ef2b12e9f5f5071357c4d2ea6ca1cf01814f45d2391"}, {file = "coverage-7.6.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:d546cfa78844b8b9c1c0533de1851569a13f87449897bbc95d698d1d3cb2a30f"},
{file = "coverage-7.6.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:260933720fdcd75340e7dbe9060655aff3af1f0c5d20f46b57f262ab6c86a5e8"}, {file = "coverage-7.6.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9975442f2e7a5cfcf87299c26b5a45266ab0696348420049b9b94b2ad3d40234"},
{file = "coverage-7.6.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:07e2ca0ad381b91350c0ed49d52699b625aab2b44b65e1b4e02fa9df0e92ad2d"}, {file = "coverage-7.6.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:583049c63106c0555e3ae3931edab5669668bbef84c15861421b94e121878d3f"},
{file = "coverage-7.6.1-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c44fee9975f04b33331cb8eb272827111efc8930cfd582e0320613263ca849ca"}, {file = "coverage-7.6.3-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2341a78ae3a5ed454d524206a3fcb3cec408c2a0c7c2752cd78b606a2ff15af4"},
{file = "coverage-7.6.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:877abb17e6339d96bf08e7a622d05095e72b71f8afd8a9fefc82cf30ed944163"}, {file = "coverage-7.6.3-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a4fb91d5f72b7e06a14ff4ae5be625a81cd7e5f869d7a54578fc271d08d58ae3"},
{file = "coverage-7.6.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:3e0cadcf6733c09154b461f1ca72d5416635e5e4ec4e536192180d34ec160f8a"}, {file = "coverage-7.6.3-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:e279f3db904e3b55f520f11f983cc8dc8a4ce9b65f11692d4718ed021ec58b83"},
{file = "coverage-7.6.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:c3c02d12f837d9683e5ab2f3d9844dc57655b92c74e286c262e0fc54213c216d"}, {file = "coverage-7.6.3-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:aa23ce39661a3e90eea5f99ec59b763b7d655c2cada10729ed920a38bfc2b167"},
{file = "coverage-7.6.1-cp312-cp312-win32.whl", hash = "sha256:e05882b70b87a18d937ca6768ff33cc3f72847cbc4de4491c8e73880766718e5"}, {file = "coverage-7.6.3-cp312-cp312-win32.whl", hash = "sha256:52ac29cc72ee7e25ace7807249638f94c9b6a862c56b1df015d2b2e388e51dbd"},
{file = "coverage-7.6.1-cp312-cp312-win_amd64.whl", hash = "sha256:b5d7b556859dd85f3a541db6a4e0167b86e7273e1cdc973e5b175166bb634fdb"}, {file = "coverage-7.6.3-cp312-cp312-win_amd64.whl", hash = "sha256:40e8b1983080439d4802d80b951f4a93d991ef3261f69e81095a66f86cf3c3c6"},
{file = "coverage-7.6.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:a4acd025ecc06185ba2b801f2de85546e0b8ac787cf9d3b06e7e2a69f925b106"}, {file = "coverage-7.6.3-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:9134032f5aa445ae591c2ba6991d10136a1f533b1d2fa8f8c21126468c5025c6"},
{file = "coverage-7.6.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a6d3adcf24b624a7b778533480e32434a39ad8fa30c315208f6d3e5542aeb6e9"}, {file = "coverage-7.6.3-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:99670790f21a96665a35849990b1df447993880bb6463a0a1d757897f30da929"},
{file = "coverage-7.6.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d0c212c49b6c10e6951362f7c6df3329f04c2b1c28499563d4035d964ab8e08c"}, {file = "coverage-7.6.3-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2dc7d6b380ca76f5e817ac9eef0c3686e7834c8346bef30b041a4ad286449990"},
{file = "coverage-7.6.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6e81d7a3e58882450ec4186ca59a3f20a5d4440f25b1cff6f0902ad890e6748a"}, {file = "coverage-7.6.3-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f7b26757b22faf88fcf232f5f0e62f6e0fd9e22a8a5d0d5016888cdfe1f6c1c4"},
{file = "coverage-7.6.1-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78b260de9790fd81e69401c2dc8b17da47c8038176a79092a89cb2b7d945d060"}, {file = "coverage-7.6.3-cp313-cp313-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c59d6a4a4633fad297f943c03d0d2569867bd5372eb5684befdff8df8522e39"},
{file = "coverage-7.6.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:a78d169acd38300060b28d600344a803628c3fd585c912cacc9ea8790fe96862"}, {file = "coverage-7.6.3-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:f263b18692f8ed52c8de7f40a0751e79015983dbd77b16906e5b310a39d3ca21"},
{file = "coverage-7.6.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:2c09f4ce52cb99dd7505cd0fc8e0e37c77b87f46bc9c1eb03fe3bc9991085388"}, {file = "coverage-7.6.3-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:79644f68a6ff23b251cae1c82b01a0b51bc40c8468ca9585c6c4b1aeee570e0b"},
{file = "coverage-7.6.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:6878ef48d4227aace338d88c48738a4258213cd7b74fd9a3d4d7582bb1d8a155"}, {file = "coverage-7.6.3-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:71967c35828c9ff94e8c7d405469a1fb68257f686bca7c1ed85ed34e7c2529c4"},
{file = "coverage-7.6.1-cp313-cp313-win32.whl", hash = "sha256:44df346d5215a8c0e360307d46ffaabe0f5d3502c8a1cefd700b34baf31d411a"}, {file = "coverage-7.6.3-cp313-cp313-win32.whl", hash = "sha256:e266af4da2c1a4cbc6135a570c64577fd3e6eb204607eaff99d8e9b710003c6f"},
{file = "coverage-7.6.1-cp313-cp313-win_amd64.whl", hash = "sha256:8284cf8c0dd272a247bc154eb6c95548722dce90d098c17a883ed36e67cdb129"}, {file = "coverage-7.6.3-cp313-cp313-win_amd64.whl", hash = "sha256:ea52bd218d4ba260399a8ae4bb6b577d82adfc4518b93566ce1fddd4a49d1dce"},
{file = "coverage-7.6.1-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:d3296782ca4eab572a1a4eca686d8bfb00226300dcefdf43faa25b5242ab8a3e"}, {file = "coverage-7.6.3-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:8d4c6ea0f498c7c79111033a290d060c517853a7bcb2f46516f591dab628ddd3"},
{file = "coverage-7.6.1-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:502753043567491d3ff6d08629270127e0c31d4184c4c8d98f92c26f65019962"}, {file = "coverage-7.6.3-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:331b200ad03dbaa44151d74daeb7da2cf382db424ab923574f6ecca7d3b30de3"},
{file = "coverage-7.6.1-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6a89ecca80709d4076b95f89f308544ec8f7b4727e8a547913a35f16717856cb"}, {file = "coverage-7.6.3-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:54356a76b67cf8a3085818026bb556545ebb8353951923b88292556dfa9f812d"},
{file = "coverage-7.6.1-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a318d68e92e80af8b00fa99609796fdbcdfef3629c77c6283566c6f02c6d6704"}, {file = "coverage-7.6.3-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ebec65f5068e7df2d49466aab9128510c4867e532e07cb6960075b27658dca38"},
{file = "coverage-7.6.1-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13b0a73a0896988f053e4fbb7de6d93388e6dd292b0d87ee51d106f2c11b465b"}, {file = "coverage-7.6.3-cp313-cp313t-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d33a785ea8354c480515e781554d3be582a86297e41ccbea627a5c632647f2cd"},
{file = "coverage-7.6.1-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:4421712dbfc5562150f7554f13dde997a2e932a6b5f352edcce948a815efee6f"}, {file = "coverage-7.6.3-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:f7ddb920106bbbbcaf2a274d56f46956bf56ecbde210d88061824a95bdd94e92"},
{file = "coverage-7.6.1-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:166811d20dfea725e2e4baa71fffd6c968a958577848d2131f39b60043400223"}, {file = "coverage-7.6.3-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:70d24936ca6c15a3bbc91ee9c7fc661132c6f4c9d42a23b31b6686c05073bde5"},
{file = "coverage-7.6.1-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:225667980479a17db1048cb2bf8bfb39b8e5be8f164b8f6628b64f78a72cf9d3"}, {file = "coverage-7.6.3-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:c30e42ea11badb147f0d2e387115b15e2bd8205a5ad70d6ad79cf37f6ac08c91"},
{file = "coverage-7.6.1-cp313-cp313t-win32.whl", hash = "sha256:170d444ab405852903b7d04ea9ae9b98f98ab6d7e63e1115e82620807519797f"}, {file = "coverage-7.6.3-cp313-cp313t-win32.whl", hash = "sha256:365defc257c687ce3e7d275f39738dcd230777424117a6c76043459db131dd43"},
{file = "coverage-7.6.1-cp313-cp313t-win_amd64.whl", hash = "sha256:b9f222de8cded79c49bf184bdbc06630d4c58eec9459b939b4a690c82ed05657"}, {file = "coverage-7.6.3-cp313-cp313t-win_amd64.whl", hash = "sha256:23bb63ae3f4c645d2d82fa22697364b0046fbafb6261b258a58587441c5f7bd0"},
{file = "coverage-7.6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:6db04803b6c7291985a761004e9060b2bca08da6d04f26a7f2294b8623a0c1a0"}, {file = "coverage-7.6.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:da29ceabe3025a1e5a5aeeb331c5b1af686daab4ff0fb4f83df18b1180ea83e2"},
{file = "coverage-7.6.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:f1adfc8ac319e1a348af294106bc6a8458a0f1633cc62a1446aebc30c5fa186a"}, {file = "coverage-7.6.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:df8c05a0f574d480947cba11b947dc41b1265d721c3777881da2fb8d3a1ddfba"},
{file = "coverage-7.6.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a95324a9de9650a729239daea117df21f4b9868ce32e63f8b650ebe6cef5595b"}, {file = "coverage-7.6.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ec1e3b40b82236d100d259854840555469fad4db64f669ab817279eb95cd535c"},
{file = "coverage-7.6.1-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b43c03669dc4618ec25270b06ecd3ee4fa94c7f9b3c14bae6571ca00ef98b0d3"}, {file = "coverage-7.6.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b4adeb878a374126f1e5cf03b87f66279f479e01af0e9a654cf6d1509af46c40"},
{file = "coverage-7.6.1-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8929543a7192c13d177b770008bc4e8119f2e1f881d563fc6b6305d2d0ebe9de"}, {file = "coverage-7.6.3-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:43d6a66e33b1455b98fc7312b124296dad97a2e191c80320587234a77b1b736e"},
{file = "coverage-7.6.1-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:a09ece4a69cf399510c8ab25e0950d9cf2b42f7b3cb0374f95d2e2ff594478a6"}, {file = "coverage-7.6.3-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:1990b1f4e2c402beb317840030bb9f1b6a363f86e14e21b4212e618acdfce7f6"},
{file = "coverage-7.6.1-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:9054a0754de38d9dbd01a46621636689124d666bad1936d76c0341f7d71bf569"}, {file = "coverage-7.6.3-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:12f9515d875859faedb4144fd38694a761cd2a61ef9603bf887b13956d0bbfbb"},
{file = "coverage-7.6.1-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:0dbde0f4aa9a16fa4d754356a8f2e36296ff4d83994b2c9d8398aa32f222f989"}, {file = "coverage-7.6.3-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:99ded130555c021d99729fabd4ddb91a6f4cc0707df4b1daf912c7850c373b13"},
{file = "coverage-7.6.1-cp38-cp38-win32.whl", hash = "sha256:da511e6ad4f7323ee5702e6633085fb76c2f893aaf8ce4c51a0ba4fc07580ea7"}, {file = "coverage-7.6.3-cp39-cp39-win32.whl", hash = "sha256:c3a79f56dee9136084cf84a6c7c4341427ef36e05ae6415bf7d787c96ff5eaa3"},
{file = "coverage-7.6.1-cp38-cp38-win_amd64.whl", hash = "sha256:3f1156e3e8f2872197af3840d8ad307a9dd18e615dc64d9ee41696f287c57ad8"}, {file = "coverage-7.6.3-cp39-cp39-win_amd64.whl", hash = "sha256:aac7501ae73d4a02f4b7ac8fcb9dc55342ca98ffb9ed9f2dfb8a25d53eda0e4d"},
{file = "coverage-7.6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:abd5fd0db5f4dc9289408aaf34908072f805ff7792632250dcb36dc591d24255"}, {file = "coverage-7.6.3-pp39.pp310-none-any.whl", hash = "sha256:b9853509b4bf57ba7b1f99b9d866c422c9c5248799ab20e652bbb8a184a38181"},
{file = "coverage-7.6.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:547f45fa1a93154bd82050a7f3cddbc1a7a4dd2a9bf5cb7d06f4ae29fe94eaf8"}, {file = "coverage-7.6.3.tar.gz", hash = "sha256:bb7d5fe92bd0dc235f63ebe9f8c6e0884f7360f88f3411bfed1350c872ef2054"},
{file = "coverage-7.6.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:645786266c8f18a931b65bfcefdbf6952dd0dea98feee39bd188607a9d307ed2"},
{file = "coverage-7.6.1-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9e0b2df163b8ed01d515807af24f63de04bebcecbd6c3bfeff88385789fdf75a"},
{file = "coverage-7.6.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:609b06f178fe8e9f89ef676532760ec0b4deea15e9969bf754b37f7c40326dbc"},
{file = "coverage-7.6.1-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:702855feff378050ae4f741045e19a32d57d19f3e0676d589df0575008ea5004"},
{file = "coverage-7.6.1-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:2bdb062ea438f22d99cba0d7829c2ef0af1d768d1e4a4f528087224c90b132cb"},
{file = "coverage-7.6.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:9c56863d44bd1c4fe2abb8a4d6f5371d197f1ac0ebdee542f07f35895fc07f36"},
{file = "coverage-7.6.1-cp39-cp39-win32.whl", hash = "sha256:6e2cd258d7d927d09493c8df1ce9174ad01b381d4729a9d8d4e38670ca24774c"},
{file = "coverage-7.6.1-cp39-cp39-win_amd64.whl", hash = "sha256:06a737c882bd26d0d6ee7269b20b12f14a8704807a01056c80bb881a4b2ce6ca"},
{file = "coverage-7.6.1-pp38.pp39.pp310-none-any.whl", hash = "sha256:e9a6e0eb86070e8ccaedfbd9d38fec54864f3125ab95419970575b42af7541df"},
{file = "coverage-7.6.1.tar.gz", hash = "sha256:953510dfb7b12ab69d20135a0662397f077c59b1e6379a768e97c59d852ee51d"},
] ]
[package.extras] [package.extras]
@ -159,13 +149,13 @@ format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-
[[package]] [[package]]
name = "jsonschema-specifications" name = "jsonschema-specifications"
version = "2023.12.1" version = "2024.10.1"
description = "The JSON Schema meta-schemas and vocabularies, exposed as a Registry" description = "The JSON Schema meta-schemas and vocabularies, exposed as a Registry"
optional = false optional = false
python-versions = ">=3.8" python-versions = ">=3.9"
files = [ files = [
{file = "jsonschema_specifications-2023.12.1-py3-none-any.whl", hash = "sha256:87e4fdf3a94858b8a2ba2778d9ba57d8a9cafca7c7489c46ba0d30a8bc6a9c3c"}, {file = "jsonschema_specifications-2024.10.1-py3-none-any.whl", hash = "sha256:a09a0680616357d9a0ecf05c12ad234479f549239d0f5b55f3deea67475da9bf"},
{file = "jsonschema_specifications-2023.12.1.tar.gz", hash = "sha256:48a76787b3e70f5ed53f1160d2b81f586e4ca6d1548c5de7085d1682674764cc"}, {file = "jsonschema_specifications-2024.10.1.tar.gz", hash = "sha256:0f38b83639958ce1152d02a7f062902c41c8fd20d558b0c34344292d417ae272"},
] ]
[package.dependencies] [package.dependencies]
@ -382,5 +372,5 @@ files = [
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.11" python-versions = "^3.12.5"
content-hash = "714de2b0053908d6b99066dc538a65b430eaada1e84fe294d019f7b76355a483" content-hash = "48fccebf6b6253d7e85eb11a28b384d89f26bfd2b3b9535a68452c6d2968d8d3"

View File

@ -457,7 +457,7 @@ async def test_dht_write_read_local():
await rc0.set_dht_value(desc.key, ValueSubkey(0), TEST_DATA) await rc0.set_dht_value(desc.key, ValueSubkey(0), TEST_DATA)
await rc0.set_dht_value(desc.key, ValueSubkey(1), TEST_DATA2) await rc0.set_dht_value(desc.key, ValueSubkey(1), TEST_DATA2)
print(f' {n}') print(f' {n}: {desc.key} {desc.owner}:{desc.owner_secret}')
print('syncing records to the network') print('syncing records to the network')
@ -476,6 +476,9 @@ async def test_dht_write_read_local():
print(f' {len(syncrecords)} records {subkeysleft} subkeys left') print(f' {len(syncrecords)} records {subkeysleft} subkeys left')
time.sleep(1) time.sleep(1)
for desc0 in records:
await rc0.close_dht_record(desc0.key)
await api0.debug("record purge local") await api0.debug("record purge local")
await api0.debug("record purge remote") await api0.debug("record purge remote")
@ -483,7 +486,6 @@ async def test_dht_write_read_local():
print(f'reading {COUNT} records') print(f'reading {COUNT} records')
n = 0 n = 0
for desc0 in records: for desc0 in records:
await rc0.close_dht_record(desc0.key)
desc1 = await rc0.open_dht_record(desc0.key) desc1 = await rc0.open_dht_record(desc0.key)
vd0 = await rc0.get_dht_value(desc1.key, ValueSubkey(0), force_refresh=True) vd0 = await rc0.get_dht_value(desc1.key, ValueSubkey(0), force_refresh=True)

View File

@ -863,7 +863,7 @@ impl Settings {
pub fn get_default_config_path(subpath: &str) -> PathBuf { pub fn get_default_config_path(subpath: &str) -> PathBuf {
#[cfg(unix)] #[cfg(unix)]
{ {
let globalpath = PathBuf::from("/var/db/veilid-server"); let globalpath = PathBuf::from("/etc/veilid-server");
if globalpath.exists() { if globalpath.exists() {
return globalpath.join(subpath); return globalpath.join(subpath);
@ -910,7 +910,6 @@ impl Settings {
true true
} }
#[cfg_attr(windows, expect(dead_code))]
fn get_default_directory(subpath: &str) -> PathBuf { fn get_default_directory(subpath: &str) -> PathBuf {
#[cfg(unix)] #[cfg(unix)]
{ {