protect route hops + refactor

This commit is contained in:
Christien Rioux 2023-10-25 22:32:06 -04:00
parent c70c260bb8
commit b964ddb6eb
50 changed files with 286 additions and 701 deletions

View File

@ -4,7 +4,7 @@ use network_manager::*;
use routing_table::*; use routing_table::*;
use storage_manager::*; use storage_manager::*;
pub struct AttachmentManagerInner { struct AttachmentManagerInner {
last_attachment_state: AttachmentState, last_attachment_state: AttachmentState,
last_routing_table_health: Option<RoutingTableHealth>, last_routing_table_health: Option<RoutingTableHealth>,
maintain_peers: bool, maintain_peers: bool,
@ -13,13 +13,13 @@ pub struct AttachmentManagerInner {
attachment_maintainer_jh: Option<MustJoinHandle<()>>, attachment_maintainer_jh: Option<MustJoinHandle<()>>,
} }
pub struct AttachmentManagerUnlockedInner { struct AttachmentManagerUnlockedInner {
config: VeilidConfig, config: VeilidConfig,
network_manager: NetworkManager, network_manager: NetworkManager,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct AttachmentManager { pub(crate) struct AttachmentManager {
inner: Arc<Mutex<AttachmentManagerInner>>, inner: Arc<Mutex<AttachmentManagerInner>>,
unlocked_inner: Arc<AttachmentManagerUnlockedInner>, unlocked_inner: Arc<AttachmentManagerUnlockedInner>,
} }
@ -38,7 +38,6 @@ impl AttachmentManager {
network_manager: NetworkManager::new( network_manager: NetworkManager::new(
config, config,
storage_manager, storage_manager,
protected_store,
table_store, table_store,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store, block_store,

View File

@ -199,7 +199,7 @@ impl ServicesContext {
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
/// ///
pub struct VeilidCoreContext { pub(crate) struct VeilidCoreContext {
pub config: VeilidConfig, pub config: VeilidConfig,
pub update_callback: UpdateCallback, pub update_callback: UpdateCallback,
// Services // Services

View File

@ -49,7 +49,6 @@ mod core_context;
mod crypto; mod crypto;
mod intf; mod intf;
mod network_manager; mod network_manager;
mod receipt_manager;
mod routing_table; mod routing_table;
mod rpc_processor; mod rpc_processor;
mod storage_manager; mod storage_manager;

View File

@ -69,7 +69,7 @@ impl fmt::Debug for AddressFilterUnlockedInner {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct AddressFilter { pub(crate) struct AddressFilter {
unlocked_inner: Arc<AddressFilterUnlockedInner>, unlocked_inner: Arc<AddressFilterUnlockedInner>,
inner: Arc<Mutex<AddressFilterInner>>, inner: Arc<Mutex<AddressFilterInner>>,
} }

View File

@ -37,7 +37,7 @@ impl core::fmt::Debug for ConnectionManagerArc {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ConnectionManager { pub(crate) struct ConnectionManager {
arc: Arc<ConnectionManagerArc>, arc: Arc<ConnectionManagerArc>,
} }
@ -139,10 +139,12 @@ impl ConnectionManager {
} }
// Internal routine to see if we should keep this connection // Internal routine to see if we should keep this connection
// from being LRU removed. Used on our initiated relay connections. // from being LRU removed. Used on our initiated relay connections and allocated routes
fn should_protect_connection(&self, conn: &NetworkConnection) -> bool { fn should_protect_connection(&self, conn: &NetworkConnection) -> bool {
let netman = self.network_manager(); let netman = self.network_manager();
let routing_table = netman.routing_table(); let routing_table = netman.routing_table();
// See if this is a relay connection
let remote_address = conn.connection_descriptor().remote_address().address(); let remote_address = conn.connection_descriptor().remote_address().address();
let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else { let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else {
return false; return false;
@ -162,6 +164,7 @@ impl ConnectionManager {
return true; return true;
} }
} }
false false
} }
@ -233,13 +236,19 @@ impl ConnectionManager {
} }
// Returns a network connection if one already is established // Returns a network connection if one already is established
//#[instrument(level = "trace", skip(self), ret)]
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> { pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> {
self.arc self.arc
.connection_table .connection_table
.get_connection_by_descriptor(descriptor) .get_connection_by_descriptor(descriptor)
} }
// Protects a network connection if one already is established
pub fn protect_connection(&self, descriptor: ConnectionDescriptor) -> bool {
self.arc
.connection_table
.protect_connection_by_descriptor(descriptor)
}
/// Called when we want to create a new connection or get the current one that already exists /// Called when we want to create a new connection or get the current one that already exists
/// This will kill off any connections that are in conflict with the new connection to be made /// This will kill off any connections that are in conflict with the new connection to be made
/// in order to make room for the new connection in the system's connection table /// in order to make room for the new connection in the system's connection table

View File

@ -4,7 +4,7 @@ use hashlink::LruCache;
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
#[derive(ThisError, Debug)] #[derive(ThisError, Debug)]
pub enum ConnectionTableAddError { pub(in crate::network_manager) enum ConnectionTableAddError {
#[error("Connection already added to table")] #[error("Connection already added to table")]
AlreadyExists(NetworkConnection), AlreadyExists(NetworkConnection),
#[error("Connection address was filtered")] #[error("Connection address was filtered")]
@ -23,7 +23,7 @@ impl ConnectionTableAddError {
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)] #[derive(Debug)]
pub struct ConnectionTableInner { struct ConnectionTableInner {
max_connections: Vec<usize>, max_connections: Vec<usize>,
conn_by_id: Vec<LruCache<NetworkConnectionId, NetworkConnection>>, conn_by_id: Vec<LruCache<NetworkConnectionId, NetworkConnection>>,
protocol_index_by_id: BTreeMap<NetworkConnectionId, usize>, protocol_index_by_id: BTreeMap<NetworkConnectionId, usize>,
@ -33,7 +33,7 @@ pub struct ConnectionTableInner {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct ConnectionTable { pub(in crate::network_manager) struct ConnectionTable {
inner: Arc<Mutex<ConnectionTableInner>>, inner: Arc<Mutex<ConnectionTableInner>>,
} }
@ -218,6 +218,18 @@ impl ConnectionTable {
Some(out.get_handle()) Some(out.get_handle())
} }
//#[instrument(level = "trace", skip(self), ret)]
#[allow(dead_code)]
pub fn protect_connection_by_id(&self, id: NetworkConnectionId) -> bool {
let mut inner = self.inner.lock();
let Some(protocol_index) = inner.protocol_index_by_id.get(&id).copied() else {
return false;
};
let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap();
out.protect();
true
}
//#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
pub fn get_connection_by_descriptor( pub fn get_connection_by_descriptor(
&self, &self,
@ -231,6 +243,19 @@ impl ConnectionTable {
Some(out.get_handle()) Some(out.get_handle())
} }
//#[instrument(level = "trace", skip(self), ret)]
pub fn protect_connection_by_descriptor(&self, descriptor: ConnectionDescriptor) -> bool {
let mut inner = self.inner.lock();
let Some(id) = inner.id_by_descriptor.get(&descriptor).copied() else {
return false;
};
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap();
out.protect();
true
}
// #[instrument(level = "trace", skip(self), ret)] // #[instrument(level = "trace", skip(self), ret)]
pub fn get_best_connection_by_remote( pub fn get_best_connection_by_remote(
&self, &self,

View File

@ -11,6 +11,7 @@ mod connection_manager;
mod connection_table; mod connection_table;
mod direct_boot; mod direct_boot;
mod network_connection; mod network_connection;
mod receipt_manager;
mod send_data; mod send_data;
mod stats; mod stats;
mod tasks; mod tasks;
@ -21,11 +22,11 @@ pub mod tests;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
pub use connection_manager::*; pub(crate) use connection_manager::*;
pub use direct_boot::*; pub(crate) use network_connection::*;
pub use network_connection::*; pub(crate) use receipt_manager::*;
pub use send_data::*; pub(crate) use stats::*;
pub use stats::*;
pub use types::*; pub use types::*;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
@ -34,12 +35,10 @@ use connection_handle::*;
use crypto::*; use crypto::*;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use hashlink::LruCache; use hashlink::LruCache;
use intf::*;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use native::*; use native::*;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub use native::{LOCAL_NETWORK_CAPABILITIES, MAX_CAPABILITIES, PUBLIC_INTERNET_CAPABILITIES}; pub use native::{LOCAL_NETWORK_CAPABILITIES, MAX_CAPABILITIES, PUBLIC_INTERNET_CAPABILITIES};
use receipt_manager::*;
use routing_table::*; use routing_table::*;
use rpc_processor::*; use rpc_processor::*;
use storage_manager::*; use storage_manager::*;
@ -90,11 +89,14 @@ struct ClientWhitelistEntry {
last_seen_ts: Timestamp, last_seen_ts: Timestamp,
} }
#[derive(Copy, Clone, Debug)] #[derive(Clone, Debug)]
pub enum SendDataKind { pub(crate) struct SendDataMethod {
Direct(ConnectionDescriptor), /// How the data was sent, possibly to a relay
Indirect, pub contact_method: NodeContactMethod,
Existing(ConnectionDescriptor), /// Pre-relayed contact method
pub opt_relayed_contact_method: Option<NodeContactMethod>,
/// The connection used to send the data
pub connection_descriptor: ConnectionDescriptor,
} }
/// Mechanism required to contact another node /// Mechanism required to contact another node
@ -141,7 +143,6 @@ struct NetworkManagerUnlockedInner {
// Handles // Handles
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store: BlockStore, block_store: BlockStore,
@ -160,7 +161,7 @@ struct NetworkManagerUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct NetworkManager { pub(crate) struct NetworkManager {
inner: Arc<Mutex<NetworkManagerInner>>, inner: Arc<Mutex<NetworkManagerInner>>,
unlocked_inner: Arc<NetworkManagerUnlockedInner>, unlocked_inner: Arc<NetworkManagerUnlockedInner>,
} }
@ -178,7 +179,6 @@ impl NetworkManager {
fn new_unlocked_inner( fn new_unlocked_inner(
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore, #[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto, crypto: Crypto,
@ -187,7 +187,6 @@ impl NetworkManager {
NetworkManagerUnlockedInner { NetworkManagerUnlockedInner {
config: config.clone(), config: config.clone(),
storage_manager, storage_manager,
protected_store,
table_store, table_store,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store, block_store,
@ -206,7 +205,6 @@ impl NetworkManager {
pub fn new( pub fn new(
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore, #[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto, crypto: Crypto,
@ -243,7 +241,6 @@ impl NetworkManager {
unlocked_inner: Arc::new(Self::new_unlocked_inner( unlocked_inner: Arc::new(Self::new_unlocked_inner(
config, config,
storage_manager, storage_manager,
protected_store,
table_store, table_store,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store, block_store,
@ -268,9 +265,6 @@ impl NetworkManager {
pub fn storage_manager(&self) -> StorageManager { pub fn storage_manager(&self) -> StorageManager {
self.unlocked_inner.storage_manager.clone() self.unlocked_inner.storage_manager.clone()
} }
pub fn protected_store(&self) -> ProtectedStore {
self.unlocked_inner.protected_store.clone()
}
pub fn table_store(&self) -> TableStore { pub fn table_store(&self) -> TableStore {
self.unlocked_inner.table_store.clone() self.unlocked_inner.table_store.clone()
} }
@ -297,7 +291,7 @@ impl NetworkManager {
.unwrap() .unwrap()
.clone() .clone()
} }
pub fn net(&self) -> Network { fn net(&self) -> Network {
self.unlocked_inner self.unlocked_inner
.components .components
.read() .read()
@ -306,6 +300,15 @@ impl NetworkManager {
.net .net
.clone() .clone()
} }
fn receipt_manager(&self) -> ReceiptManager {
self.unlocked_inner
.components
.read()
.as_ref()
.unwrap()
.receipt_manager
.clone()
}
pub fn rpc_processor(&self) -> RPCProcessor { pub fn rpc_processor(&self) -> RPCProcessor {
self.unlocked_inner self.unlocked_inner
.components .components
@ -315,15 +318,6 @@ impl NetworkManager {
.rpc_processor .rpc_processor
.clone() .clone()
} }
pub fn receipt_manager(&self) -> ReceiptManager {
self.unlocked_inner
.components
.read()
.as_ref()
.unwrap()
.receipt_manager
.clone()
}
pub fn connection_manager(&self) -> ConnectionManager { pub fn connection_manager(&self) -> ConnectionManager {
self.unlocked_inner self.unlocked_inner
.components .components
@ -814,7 +808,7 @@ impl NetworkManager {
node_ref: NodeRef, node_ref: NodeRef,
destination_node_ref: Option<NodeRef>, destination_node_ref: Option<NodeRef>,
body: B, body: B,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone(); let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone();
let best_node_id = destination_node_ref.best_node_id(); let best_node_id = destination_node_ref.best_node_id();
@ -1116,4 +1110,8 @@ impl NetworkManager {
// Inform caller that we dealt with the envelope locally // Inform caller that we dealt with the envelope locally
Ok(true) Ok(true)
} }
pub fn debug_restart_network(&self) {
self.net().restart_network();
}
} }

View File

@ -49,12 +49,12 @@ struct DiscoveryContextUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct DiscoveryContext { pub(super) struct DiscoveryContext {
unlocked_inner: Arc<DiscoveryContextUnlockedInner>, unlocked_inner: Arc<DiscoveryContextUnlockedInner>,
inner: Arc<Mutex<DiscoveryContextInner>>, inner: Arc<Mutex<DiscoveryContextInner>>,
} }
pub type ClearNetworkCallback = Arc<dyn Fn() -> SendPinBoxFuture<()> + Send + Sync>; pub(super) type ClearNetworkCallback = Arc<dyn Fn() -> SendPinBoxFuture<()> + Send + Sync>;
impl DiscoveryContext { impl DiscoveryContext {
pub fn new( pub fn new(
@ -135,7 +135,7 @@ impl DiscoveryContext {
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> { async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> {
let rpc = self.unlocked_inner.routing_table.rpc_processor(); let rpc = self.unlocked_inner.routing_table.rpc_processor();
let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await { let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone()), false).await {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
log_net!(error log_net!(error

View File

@ -14,7 +14,7 @@ use network_tcp::*;
use protocol::tcp::RawTcpProtocolHandler; use protocol::tcp::RawTcpProtocolHandler;
use protocol::udp::RawUdpProtocolHandler; use protocol::udp::RawUdpProtocolHandler;
use protocol::ws::WebsocketProtocolHandler; use protocol::ws::WebsocketProtocolHandler;
pub use protocol::*; pub(in crate::network_manager) use protocol::*;
use async_tls::TlsAcceptor; use async_tls::TlsAcceptor;
use futures_util::StreamExt; use futures_util::StreamExt;
@ -137,7 +137,7 @@ struct NetworkUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Network { pub(in crate::network_manager) struct Network {
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<NetworkInner>>, inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>, unlocked_inner: Arc<NetworkUnlockedInner>,

View File

@ -6,7 +6,7 @@ use stop_token::future::FutureExt;
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
#[derive(Clone)] #[derive(Clone)]
pub struct ListenerState { pub(in crate::network_manager) struct ListenerState {
pub protocol_accept_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>, pub protocol_accept_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>,
pub tls_protocol_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>, pub tls_protocol_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>,
pub tls_acceptor: Option<TlsAcceptor>, pub tls_acceptor: Option<TlsAcceptor>,

View File

@ -8,7 +8,7 @@ use super::*;
use std::io; use std::io;
#[derive(Debug)] #[derive(Debug)]
pub enum ProtocolNetworkConnection { pub(in crate::network_manager) enum ProtocolNetworkConnection {
Dummy(DummyNetworkConnection), Dummy(DummyNetworkConnection),
RawTcp(tcp::RawTcpNetworkConnection), RawTcp(tcp::RawTcpNetworkConnection),
WsAccepted(ws::WebSocketNetworkConnectionAccepted), WsAccepted(ws::WebSocketNetworkConnectionAccepted),

View File

@ -112,7 +112,7 @@ impl RawTcpNetworkConnection {
/// ///
#[derive(Clone)] #[derive(Clone)]
pub struct RawTcpProtocolHandler pub(in crate::network_manager) struct RawTcpProtocolHandler
where where
Self: ProtocolAcceptHandler, Self: ProtocolAcceptHandler,
{ {

View File

@ -2,7 +2,7 @@ use super::*;
use sockets::*; use sockets::*;
#[derive(Clone)] #[derive(Clone)]
pub struct RawUdpProtocolHandler { pub(in crate::network_manager) struct RawUdpProtocolHandler {
socket: Arc<UdpSocket>, socket: Arc<UdpSocket>,
assembly_buffer: AssemblyBuffer, assembly_buffer: AssemblyBuffer,
address_filter: Option<AddressFilter>, address_filter: Option<AddressFilter>,

View File

@ -162,7 +162,7 @@ struct WebsocketProtocolHandlerArc {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct WebsocketProtocolHandler pub(in crate::network_manager) struct WebsocketProtocolHandler
where where
Self: ProtocolAcceptHandler, Self: ProtocolAcceptHandler,
{ {

View File

@ -11,7 +11,7 @@ cfg_if::cfg_if! {
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
// Accept // Accept
pub trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync { pub(in crate::network_manager) trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
fn on_accept( fn on_accept(
&self, &self,
stream: AsyncPeekStream, stream: AsyncPeekStream,
@ -20,7 +20,7 @@ cfg_if::cfg_if! {
) -> SendPinBoxFuture<io::Result<Option<ProtocolNetworkConnection>>>; ) -> SendPinBoxFuture<io::Result<Option<ProtocolNetworkConnection>>>;
} }
pub trait ProtocolAcceptHandlerClone { pub(in crate::network_manager) trait ProtocolAcceptHandlerClone {
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>; fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>;
} }
@ -38,7 +38,7 @@ cfg_if::cfg_if! {
} }
} }
pub type NewProtocolAcceptHandler = pub(in crate::network_manager) type NewProtocolAcceptHandler =
dyn Fn(VeilidConfig, bool) -> Box<dyn ProtocolAcceptHandler> + Send; dyn Fn(VeilidConfig, bool) -> Box<dyn ProtocolAcceptHandler> + Send;
} }
} }

View File

@ -10,11 +10,11 @@ impl NetworkManager {
/// could be established. /// could be established.
/// ///
/// Sending to a node requires determining a NetworkClass compatible mechanism /// Sending to a node requires determining a NetworkClass compatible mechanism
pub fn send_data( pub(crate) fn send_data(
&self, &self,
destination_node_ref: NodeRef, destination_node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataKind>>> { ) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataMethod>>> {
let this = self.clone(); let this = self.clone();
Box::pin( Box::pin(
async move { async move {
@ -31,9 +31,11 @@ impl NetworkManager {
destination_node_ref destination_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp()); .set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing( return Ok(NetworkResult::value(SendDataMethod {
opt_relayed_contact_method: None,
contact_method: NodeContactMethod::Existing,
connection_descriptor, connection_descriptor,
))); }));
} }
Some(data) => { Some(data) => {
// Couldn't send data to existing connection // Couldn't send data to existing connection
@ -49,16 +51,16 @@ impl NetworkManager {
// No existing connection was found or usable, so we proceed to see how to make a new one // No existing connection was found or usable, so we proceed to see how to make a new one
// Get the best way to contact this node // Get the best way to contact this node
let contact_method = this.get_node_contact_method(destination_node_ref.clone())?; let possibly_relayed_contact_method = this.get_node_contact_method(destination_node_ref.clone())?;
// If we need to relay, do it // If we need to relay, do it
let (contact_method, target_node_ref, relayed) = match contact_method { let (contact_method, target_node_ref, opt_relayed_contact_method) = match possibly_relayed_contact_method.clone() {
NodeContactMethod::OutboundRelay(relay_nr) NodeContactMethod::OutboundRelay(relay_nr)
| NodeContactMethod::InboundRelay(relay_nr) => { | NodeContactMethod::InboundRelay(relay_nr) => {
let cm = this.get_node_contact_method(relay_nr.clone())?; let cm = this.get_node_contact_method(relay_nr.clone())?;
(cm, relay_nr, true) (cm, relay_nr, Some(possibly_relayed_contact_method))
} }
cm => (cm, destination_node_ref.clone(), false), cm => (cm, destination_node_ref.clone(), None),
}; };
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
@ -68,7 +70,7 @@ impl NetworkManager {
); );
// Try the contact method // Try the contact method
let sdk = match contact_method { let mut send_data_method = match contact_method {
NodeContactMethod::OutboundRelay(relay_nr) => { NodeContactMethod::OutboundRelay(relay_nr) => {
// Relay loop or multiple relays // Relay loop or multiple relays
bail!( bail!(
@ -117,11 +119,9 @@ impl NetworkManager {
) )
} }
}; };
send_data_method.opt_relayed_contact_method = opt_relayed_contact_method;
if relayed { Ok(NetworkResult::value(send_data_method))
return Ok(NetworkResult::value(SendDataKind::Indirect));
}
Ok(NetworkResult::value(sdk))
} }
.instrument(trace_span!("send_data")), .instrument(trace_span!("send_data")),
) )
@ -132,7 +132,7 @@ impl NetworkManager {
&self, &self,
target_node_ref: NodeRef, target_node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last connection we've seen this peer on // First try to send data to the last connection we've seen this peer on
let Some(connection_descriptor) = target_node_ref.last_connection() else { let Some(connection_descriptor) = target_node_ref.last_connection() else {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
@ -154,9 +154,11 @@ impl NetworkManager {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataKind::Existing( Ok(NetworkResult::value(SendDataMethod{
connection_descriptor, contact_method: NodeContactMethod::Existing,
))) opt_relayed_contact_method: None,
connection_descriptor
}))
} }
/// Send data using NodeContactMethod::Unreachable /// Send data using NodeContactMethod::Unreachable
@ -164,7 +166,7 @@ impl NetworkManager {
&self, &self,
target_node_ref: NodeRef, target_node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// Try to send data to the last socket we've seen this peer on // Try to send data to the last socket we've seen this peer on
let Some(connection_descriptor) = target_node_ref.last_connection() else { let Some(connection_descriptor) = target_node_ref.last_connection() else {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
@ -186,9 +188,11 @@ impl NetworkManager {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataKind::Existing( Ok(NetworkResult::value(SendDataMethod {
connection_descriptor, connection_descriptor,
))) contact_method: NodeContactMethod::Existing,
opt_relayed_contact_method: None,
}))
} }
/// Send data using NodeContactMethod::SignalReverse /// Send data using NodeContactMethod::SignalReverse
@ -197,7 +201,7 @@ impl NetworkManager {
relay_nr: NodeRef, relay_nr: NodeRef,
target_node_ref: NodeRef, target_node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { let data = if let Some(connection_descriptor) = target_node_ref.last_connection() {
match self match self
@ -210,9 +214,11 @@ impl NetworkManager {
target_node_ref target_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp()); .set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing( return Ok(NetworkResult::value(SendDataMethod{
connection_descriptor, contact_method: NodeContactMethod::Existing,
))); opt_relayed_contact_method: None,
connection_descriptor
}));
} }
Some(data) => { Some(data) => {
// Couldn't send data to existing connection // Couldn't send data to existing connection
@ -226,12 +232,14 @@ impl NetworkManager {
}; };
let connection_descriptor = network_result_try!( let connection_descriptor = network_result_try!(
self.do_reverse_connect(relay_nr, target_node_ref, data) self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data)
.await? .await?
); );
Ok(NetworkResult::value(SendDataKind::Direct( Ok(NetworkResult::value(SendDataMethod {
connection_descriptor, connection_descriptor,
))) contact_method: NodeContactMethod::SignalReverse(relay_nr, target_node_ref),
opt_relayed_contact_method: None,
}))
} }
/// Send data using NodeContactMethod::SignalHolePunch /// Send data using NodeContactMethod::SignalHolePunch
@ -240,7 +248,7 @@ impl NetworkManager {
relay_nr: NodeRef, relay_nr: NodeRef,
target_node_ref: NodeRef, target_node_ref: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { let data = if let Some(connection_descriptor) = target_node_ref.last_connection() {
match self match self
@ -253,9 +261,11 @@ impl NetworkManager {
target_node_ref target_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp()); .set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing( return Ok(NetworkResult::value(SendDataMethod{
connection_descriptor, contact_method: NodeContactMethod::Existing,
))); opt_relayed_contact_method: None,
connection_descriptor
}));
} }
Some(data) => { Some(data) => {
// Couldn't send data to existing connection // Couldn't send data to existing connection
@ -269,10 +279,12 @@ impl NetworkManager {
}; };
let connection_descriptor = let connection_descriptor =
network_result_try!(self.do_hole_punch(relay_nr, target_node_ref, data).await?); network_result_try!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data).await?);
Ok(NetworkResult::value(SendDataKind::Direct( Ok(NetworkResult::value(SendDataMethod {
connection_descriptor, connection_descriptor,
))) contact_method: NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref),
opt_relayed_contact_method: None,
}))
} }
/// Send data using NodeContactMethod::Direct /// Send data using NodeContactMethod::Direct
@ -281,7 +293,7 @@ impl NetworkManager {
node_ref: NodeRef, node_ref: NodeRef,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataKind>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// Since we have the best dial info already, we can find a connection to use by protocol type // Since we have the best dial info already, we can find a connection to use by protocol type
let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter())); let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter()));
@ -302,9 +314,11 @@ impl NetworkManager {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataKind::Existing( return Ok(NetworkResult::value(SendDataMethod{
connection_descriptor, contact_method: NodeContactMethod::Existing,
))); opt_relayed_contact_method: None,
connection_descriptor
}));
} }
Some(d) => { Some(d) => {
// Connection couldn't send, kill it // Connection couldn't send, kill it
@ -318,14 +332,16 @@ impl NetworkManager {
// New direct connection was necessary for this dial info // New direct connection was necessary for this dial info
let connection_descriptor = let connection_descriptor =
network_result_try!(self.net().send_data_to_dial_info(dial_info, data).await?); network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?);
// If we connected to this node directly, save off the last connection so we can use it again // If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataKind::Direct( Ok(NetworkResult::value(SendDataMethod {
connection_descriptor, connection_descriptor,
))) contact_method: NodeContactMethod::Direct(dial_info),
opt_relayed_contact_method: None,
}))
} }
/// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access /// Figure out how to reach a node from our own node over the best routing domain and reference the nodes we want to access

View File

@ -1,476 +0,0 @@
use crate::*;
use core::fmt;
use crypto::*;
use futures_util::stream::{FuturesUnordered, StreamExt};
use network_manager::*;
use routing_table::*;
use stop_token::future::FutureExt;
#[derive(Clone, Debug)]
pub enum ReceiptEvent {
ReturnedOutOfBand,
ReturnedInBand { inbound_noderef: NodeRef },
ReturnedSafety,
ReturnedPrivate { private_route: PublicKey },
Expired,
Cancelled,
}
#[derive(Clone, Debug)]
pub enum ReceiptReturned {
OutOfBand,
InBand { inbound_noderef: NodeRef },
Safety,
Private { private_route: PublicKey },
}
pub trait ReceiptCallback: Send + 'static {
fn call(
&self,
event: ReceiptEvent,
receipt: Receipt,
returns_so_far: u32,
expected_returns: u32,
) -> SendPinBoxFuture<()>;
}
impl<F, T> ReceiptCallback for T
where
T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + Send + 'static,
F: Future<Output = ()> + Send + 'static,
{
fn call(
&self,
event: ReceiptEvent,
receipt: Receipt,
returns_so_far: u32,
expected_returns: u32,
) -> SendPinBoxFuture<()> {
Box::pin(self(event, receipt, returns_so_far, expected_returns))
}
}
type ReceiptCallbackType = Box<dyn ReceiptCallback>;
type ReceiptSingleShotType = SingleShotEventual<ReceiptEvent>;
enum ReceiptRecordCallbackType {
Normal(ReceiptCallbackType),
SingleShot(Option<ReceiptSingleShotType>),
}
impl fmt::Debug for ReceiptRecordCallbackType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ReceiptRecordCallbackType::{}",
match self {
Self::Normal(_) => "Normal".to_owned(),
Self::SingleShot(_) => "SingleShot".to_owned(),
}
)
}
}
pub struct ReceiptRecord {
expiration_ts: Timestamp,
receipt: Receipt,
expected_returns: u32,
returns_so_far: u32,
receipt_callback: ReceiptRecordCallbackType,
}
impl fmt::Debug for ReceiptRecord {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReceiptRecord")
.field("expiration_ts", &self.expiration_ts)
.field("receipt", &self.receipt)
.field("expected_returns", &self.expected_returns)
.field("returns_so_far", &self.returns_so_far)
.field("receipt_callback", &self.receipt_callback)
.finish()
}
}
impl ReceiptRecord {
pub fn new(
receipt: Receipt,
expiration_ts: Timestamp,
expected_returns: u32,
receipt_callback: impl ReceiptCallback,
) -> Self {
Self {
expiration_ts,
receipt,
expected_returns,
returns_so_far: 0u32,
receipt_callback: ReceiptRecordCallbackType::Normal(Box::new(receipt_callback)),
}
}
pub fn new_single_shot(
receipt: Receipt,
expiration_ts: Timestamp,
eventual: ReceiptSingleShotType,
) -> Self {
Self {
expiration_ts,
receipt,
returns_so_far: 0u32,
expected_returns: 1u32,
receipt_callback: ReceiptRecordCallbackType::SingleShot(Some(eventual)),
}
}
}
/* XXX: may be useful for O(1) timestamp expiration
#[derive(Clone, Debug)]
struct ReceiptRecordTimestampSort {
expiration_ts: Timestamp,
record: Arc<Mutex<ReceiptRecord>>,
}
impl PartialEq for ReceiptRecordTimestampSort {
fn eq(&self, other: &ReceiptRecordTimestampSort) -> bool {
self.expiration_ts == other.expiration_ts
}
}
impl Eq for ReceiptRecordTimestampSort {}
impl Ord for ReceiptRecordTimestampSort {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.expiration_ts.cmp(&other.expiration_ts).reverse()
}
}
impl PartialOrd for ReceiptRecordTimestampSort {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(&other))
}
}
*/
///////////////////////////////////
pub struct ReceiptManagerInner {
network_manager: NetworkManager,
records_by_nonce: BTreeMap<Nonce, Arc<Mutex<ReceiptRecord>>>,
next_oldest_ts: Option<Timestamp>,
stop_source: Option<StopSource>,
timeout_task: MustJoinSingleFuture<()>,
}
#[derive(Clone)]
pub struct ReceiptManager {
inner: Arc<Mutex<ReceiptManagerInner>>,
}
impl ReceiptManager {
fn new_inner(network_manager: NetworkManager) -> ReceiptManagerInner {
ReceiptManagerInner {
network_manager,
records_by_nonce: BTreeMap::new(),
next_oldest_ts: None,
stop_source: None,
timeout_task: MustJoinSingleFuture::new(),
}
}
pub fn new(network_manager: NetworkManager) -> Self {
Self {
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
}
}
pub fn network_manager(&self) -> NetworkManager {
self.inner.lock().network_manager.clone()
}
pub async fn startup(&self) -> EyreResult<()> {
trace!("startup receipt manager");
// Retrieve config
{
// let config = self.core().config();
// let c = config.get();
let mut inner = self.inner.lock();
inner.stop_source = Some(StopSource::new());
}
Ok(())
}
fn perform_callback(
evt: ReceiptEvent,
record_mut: &mut ReceiptRecord,
) -> Option<SendPinBoxFuture<()>> {
match &mut record_mut.receipt_callback {
ReceiptRecordCallbackType::Normal(callback) => Some(callback.call(
evt,
record_mut.receipt.clone(),
record_mut.returns_so_far,
record_mut.expected_returns,
)),
ReceiptRecordCallbackType::SingleShot(eventual) => {
// resolve this eventual with the receiptevent
// don't need to wait for the instance to receive it
// because this can only happen once
if let Some(eventual) = eventual.take() {
eventual.resolve(evt);
}
None
}
}
}
#[instrument(level = "trace", skip(self))]
pub async fn timeout_task_routine(self, now: Timestamp, stop_token: StopToken) {
// Go through all receipts and build a list of expired nonces
let mut new_next_oldest_ts: Option<Timestamp> = None;
let mut expired_records = Vec::new();
{
let mut inner = self.inner.lock();
let mut expired_nonces = Vec::new();
for (k, v) in &inner.records_by_nonce {
let receipt_inner = v.lock();
if receipt_inner.expiration_ts <= now {
// Expire this receipt
expired_nonces.push(*k);
} else if new_next_oldest_ts.is_none()
|| receipt_inner.expiration_ts < new_next_oldest_ts.unwrap()
{
// Mark the next oldest timestamp we would need to take action on as we go through everything
new_next_oldest_ts = Some(receipt_inner.expiration_ts);
}
}
if expired_nonces.is_empty() {
return;
}
// Now remove the expired receipts
for e in expired_nonces {
let expired_record = inner.records_by_nonce.remove(&e).expect("key should exist");
expired_records.push(expired_record);
}
// Update the next oldest timestamp
inner.next_oldest_ts = new_next_oldest_ts;
}
let mut callbacks = FuturesUnordered::new();
for expired_record in expired_records {
let mut expired_record_mut = expired_record.lock();
if let Some(callback) =
Self::perform_callback(ReceiptEvent::Expired, &mut expired_record_mut)
{
callbacks.push(callback.instrument(Span::current()))
}
}
// Wait on all the multi-call callbacks
loop {
if let Ok(None) | Err(_) = callbacks.next().timeout_at(stop_token.clone()).await {
break;
}
}
}
pub async fn tick(&self) -> EyreResult<()> {
let (next_oldest_ts, timeout_task, stop_token) = {
let inner = self.inner.lock();
let stop_token = match inner.stop_source.as_ref() {
Some(ss) => ss.token(),
None => {
// Do nothing if we're shutting down
return Ok(());
}
};
(inner.next_oldest_ts, inner.timeout_task.clone(), stop_token)
};
let now = get_aligned_timestamp();
// If we have at least one timestamp to expire, lets do it
if let Some(next_oldest_ts) = next_oldest_ts {
if now >= next_oldest_ts {
// Single-spawn the timeout task routine
let _ = timeout_task
.single_spawn(self.clone().timeout_task_routine(now, stop_token))
.await;
}
}
Ok(())
}
pub async fn shutdown(&self) {
debug!("starting receipt manager shutdown");
let network_manager = self.network_manager();
// Stop all tasks
let timeout_task = {
let mut inner = self.inner.lock();
// Drop the stop
drop(inner.stop_source.take());
inner.timeout_task.clone()
};
// Wait for everything to stop
debug!("waiting for timeout task to stop");
if timeout_task.join().await.is_err() {
panic!("joining timeout task failed");
}
*self.inner.lock() = Self::new_inner(network_manager);
debug!("finished receipt manager shutdown");
}
pub fn record_receipt(
&self,
receipt: Receipt,
expiration: Timestamp,
expected_returns: u32,
callback: impl ReceiptCallback,
) {
let receipt_nonce = receipt.get_nonce();
log_rpc!(debug "== New Multiple Receipt ({}) {} ", expected_returns, receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::new(
receipt,
expiration,
expected_returns,
callback,
)));
let mut inner = self.inner.lock();
inner.records_by_nonce.insert(receipt_nonce, record);
Self::update_next_oldest_timestamp(&mut inner);
}
pub fn record_single_shot_receipt(
&self,
receipt: Receipt,
expiration: Timestamp,
eventual: ReceiptSingleShotType,
) {
let receipt_nonce = receipt.get_nonce();
log_rpc!(debug "== New SingleShot Receipt {}", receipt_nonce.encode());
let record = Arc::new(Mutex::new(ReceiptRecord::new_single_shot(
receipt, expiration, eventual,
)));
let mut inner = self.inner.lock();
inner.records_by_nonce.insert(receipt_nonce, record);
Self::update_next_oldest_timestamp(&mut inner);
}
fn update_next_oldest_timestamp(inner: &mut ReceiptManagerInner) {
// Update the next oldest timestamp
let mut new_next_oldest_ts: Option<Timestamp> = None;
for v in inner.records_by_nonce.values() {
let receipt_inner = v.lock();
if new_next_oldest_ts.is_none()
|| receipt_inner.expiration_ts < new_next_oldest_ts.unwrap()
{
// Mark the next oldest timestamp we would need to take action on as we go through everything
new_next_oldest_ts = Some(receipt_inner.expiration_ts);
}
}
inner.next_oldest_ts = new_next_oldest_ts;
}
pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> {
log_rpc!(debug "== Cancel Receipt {}", nonce.encode());
// Remove the record
let record = {
let mut inner = self.inner.lock();
let record = match inner.records_by_nonce.remove(nonce) {
Some(r) => r,
None => {
bail!("receipt not recorded");
}
};
Self::update_next_oldest_timestamp(&mut inner);
record
};
// Generate a cancelled callback
let callback_future = {
let mut record_mut = record.lock();
Self::perform_callback(ReceiptEvent::Cancelled, &mut record_mut)
};
// Issue the callback
if let Some(callback_future) = callback_future {
callback_future.await;
}
Ok(())
}
pub async fn handle_receipt(
&self,
receipt: Receipt,
receipt_returned: ReceiptReturned,
) -> NetworkResult<()> {
let receipt_nonce = receipt.get_nonce();
let extra_data = receipt.get_extra_data();
log_rpc!(debug "<<== RECEIPT {} <- {}{}",
receipt_nonce.encode(),
match receipt_returned {
ReceiptReturned::OutOfBand => "OutOfBand".to_owned(),
ReceiptReturned::InBand { ref inbound_noderef } => format!("InBand({})", inbound_noderef),
ReceiptReturned::Safety => "Safety".to_owned(),
ReceiptReturned::Private { ref private_route } => format!("Private({})", private_route),
},
if extra_data.is_empty() {
"".to_owned()
} else {
format!("[{} extra]", extra_data.len())
}
);
// Increment return count
let (callback_future, stop_token) = {
// Look up the receipt record from the nonce
let mut inner = self.inner.lock();
let stop_token = match inner.stop_source.as_ref() {
Some(ss) => ss.token(),
None => {
// If we're stopping do nothing here
return NetworkResult::value(());
}
};
let record = match inner.records_by_nonce.get(&receipt_nonce) {
Some(r) => r.clone(),
None => {
return NetworkResult::invalid_message("receipt not recorded");
}
};
// Generate the callback future
let mut record_mut = record.lock();
record_mut.returns_so_far += 1;
// Get the receipt event to return
let receipt_event = match receipt_returned {
ReceiptReturned::OutOfBand => ReceiptEvent::ReturnedOutOfBand,
ReceiptReturned::Safety => ReceiptEvent::ReturnedSafety,
ReceiptReturned::InBand { inbound_noderef } => {
ReceiptEvent::ReturnedInBand { inbound_noderef }
}
ReceiptReturned::Private { private_route } => {
ReceiptEvent::ReturnedPrivate { private_route }
}
};
let callback_future = Self::perform_callback(receipt_event, &mut record_mut);
// Remove the record if we're done
if record_mut.returns_so_far == record_mut.expected_returns {
inner.records_by_nonce.remove(&receipt_nonce);
Self::update_next_oldest_timestamp(&mut inner);
}
(callback_future, stop_token)
};
// Issue the callback
if let Some(callback_future) = callback_future {
let _ = callback_future.timeout_at(stop_token).await;
}
NetworkResult::value(())
}
}

View File

@ -29,18 +29,18 @@ const NEVER_REACHED_PING_COUNT: u32 = 3;
// Do not change order here, it will mess up other sorts // Do not change order here, it will mess up other sorts
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum BucketEntryState { pub(crate) enum BucketEntryState {
Dead, Dead,
Unreliable, Unreliable,
Reliable, Reliable,
} }
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct LastConnectionKey(ProtocolType, AddressType); pub(crate) struct LastConnectionKey(ProtocolType, AddressType);
/// Bucket entry information specific to the LocalNetwork RoutingDomain /// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryPublicInternet { pub(crate) struct BucketEntryPublicInternet {
/// The PublicInternet node info /// The PublicInternet node info
signed_node_info: Option<Box<SignedNodeInfo>>, signed_node_info: Option<Box<SignedNodeInfo>>,
/// The last node info timestamp of ours that this entry has seen /// The last node info timestamp of ours that this entry has seen
@ -51,7 +51,7 @@ pub struct BucketEntryPublicInternet {
/// Bucket entry information specific to the LocalNetwork RoutingDomain /// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryLocalNetwork { pub(crate) struct BucketEntryLocalNetwork {
/// The LocalNetwork node info /// The LocalNetwork node info
signed_node_info: Option<Box<SignedNodeInfo>>, signed_node_info: Option<Box<SignedNodeInfo>>,
/// The last node info timestamp of ours that this entry has seen /// The last node info timestamp of ours that this entry has seen
@ -62,7 +62,7 @@ pub struct BucketEntryLocalNetwork {
/// The data associated with each bucket entry /// The data associated with each bucket entry
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct BucketEntryInner { pub(crate) struct BucketEntryInner {
/// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field /// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field
validated_node_ids: TypedKeyGroup, validated_node_ids: TypedKeyGroup,
/// The node ids claimed by the remote node that use cryptography versions we do not support /// The node ids claimed by the remote node that use cryptography versions we do not support
@ -828,7 +828,7 @@ impl BucketEntryInner {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct BucketEntry { pub(crate) struct BucketEntry {
pub(super) ref_count: AtomicU32, pub(super) ref_count: AtomicU32,
inner: RwLock<BucketEntryInner>, inner: RwLock<BucketEntryInner>,
} }

View File

@ -57,8 +57,8 @@ const CACHE_VALIDITY_KEY: &[u8] = b"cache_validity_key";
// Critical sections // Critical sections
const LOCK_TAG_TICK: &str = "TICK"; const LOCK_TAG_TICK: &str = "TICK";
pub type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>; type LowLevelProtocolPorts = BTreeSet<(LowLevelProtocolType, AddressType, u16)>;
pub type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>; type ProtocolToPortMapping = BTreeMap<(ProtocolType, AddressType), (LowLevelProtocolType, u16)>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LowLevelPortInfo { pub struct LowLevelPortInfo {
pub low_level_protocol_ports: LowLevelProtocolPorts, pub low_level_protocol_ports: LowLevelProtocolPorts,
@ -66,11 +66,12 @@ pub struct LowLevelPortInfo {
} }
pub type RoutingTableEntryFilter<'t> = pub type RoutingTableEntryFilter<'t> =
Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>; Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>;
pub type SerializedBuckets = Vec<Vec<u8>>;
pub type SerializedBucketMap = BTreeMap<CryptoKind, SerializedBuckets>; type SerializedBuckets = Vec<Vec<u8>>;
type SerializedBucketMap = BTreeMap<CryptoKind, SerializedBuckets>;
#[derive(Clone, Debug, Default, Eq, PartialEq)] #[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct RoutingTableHealth { pub(crate) struct RoutingTableHealth {
/// Number of reliable (long-term responsive) entries in the routing table /// Number of reliable (long-term responsive) entries in the routing table
pub reliable_entry_count: usize, pub reliable_entry_count: usize,
/// Number of unreliable (occasionally unresponsive) entries in the routing table /// Number of unreliable (occasionally unresponsive) entries in the routing table
@ -87,7 +88,12 @@ pub struct RoutingTableHealth {
pub type BucketIndex = (CryptoKind, usize); pub type BucketIndex = (CryptoKind, usize);
pub struct RoutingTableUnlockedInner { #[derive(Debug, Clone, Copy)]
pub(crate) struct RecentPeersEntry {
pub last_connection: ConnectionDescriptor,
}
pub(crate) struct RoutingTableUnlockedInner {
// Accessors // Accessors
config: VeilidConfig, config: VeilidConfig,
network_manager: NetworkManager, network_manager: NetworkManager,
@ -192,7 +198,7 @@ impl RoutingTableUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RoutingTable { pub(crate) struct RoutingTable {
inner: Arc<RwLock<RoutingTableInner>>, inner: Arc<RwLock<RoutingTableInner>>,
unlocked_inner: Arc<RoutingTableUnlockedInner>, unlocked_inner: Arc<RoutingTableUnlockedInner>,
} }
@ -788,7 +794,7 @@ impl RoutingTable {
/// Only one protocol per low level protocol/port combination is required /// Only one protocol per low level protocol/port combination is required
/// For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required /// For example, if WS/WSS and TCP protocols are on the same low-level TCP port, only TCP keepalives will be required
/// and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too. /// and we do not need to do WS/WSS keepalive as well. If they are on different ports, then we will need WS/WSS keepalives too.
pub fn get_low_level_port_info(&self) -> LowLevelPortInfo { fn get_low_level_port_info(&self) -> LowLevelPortInfo {
let mut low_level_protocol_ports = let mut low_level_protocol_ports =
BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new(); BTreeSet::<(LowLevelProtocolType, AddressType, u16)>::new();
let mut protocol_to_port = let mut protocol_to_port =

View File

@ -4,7 +4,7 @@ use alloc::fmt;
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub struct NodeRefBaseCommon { pub(crate) struct NodeRefBaseCommon {
routing_table: RoutingTable, routing_table: RoutingTable,
entry: Arc<BucketEntry>, entry: Arc<BucketEntry>,
filter: Option<NodeRefFilter>, filter: Option<NodeRefFilter>,
@ -15,7 +15,7 @@ pub struct NodeRefBaseCommon {
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
pub trait NodeRefBase: Sized { pub(crate) trait NodeRefBase: Sized {
// Common field access // Common field access
fn common(&self) -> &NodeRefBaseCommon; fn common(&self) -> &NodeRefBaseCommon;
fn common_mut(&mut self) -> &mut NodeRefBaseCommon; fn common_mut(&mut self) -> &mut NodeRefBaseCommon;
@ -314,6 +314,17 @@ pub trait NodeRefBase: Sized {
}) })
} }
fn protect_last_connection(&self) -> bool {
if let Some(descriptor) = self.last_connection() {
self.routing_table()
.network_manager()
.connection_manager()
.protect_connection(descriptor)
} else {
false
}
}
fn has_any_dial_info(&self) -> bool { fn has_any_dial_info(&self) -> bool {
self.operate(|_rti, e| { self.operate(|_rti, e| {
for rtd in RoutingDomain::all() { for rtd in RoutingDomain::all() {
@ -369,7 +380,7 @@ pub trait NodeRefBase: Sized {
/// Reference to a routing table entry /// Reference to a routing table entry
/// Keeps entry in the routing table until all references are gone /// Keeps entry in the routing table until all references are gone
pub struct NodeRef { pub(crate) struct NodeRef {
common: NodeRefBaseCommon, common: NodeRefBaseCommon,
} }
@ -496,7 +507,7 @@ impl Drop for NodeRef {
/// For internal use inside the RoutingTable module where you have /// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner /// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone /// Keeps entry in the routing table until all references are gone
pub struct NodeRefLocked<'a> { pub(crate) struct NodeRefLocked<'a> {
inner: Mutex<&'a RoutingTableInner>, inner: Mutex<&'a RoutingTableInner>,
nr: NodeRef, nr: NodeRef,
} }
@ -559,7 +570,7 @@ impl<'a> fmt::Debug for NodeRefLocked<'a> {
/// For internal use inside the RoutingTable module where you have /// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner /// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone /// Keeps entry in the routing table until all references are gone
pub struct NodeRefLockedMut<'a> { pub(crate) struct NodeRefLockedMut<'a> {
inner: Mutex<&'a mut RoutingTableInner>, inner: Mutex<&'a mut RoutingTableInner>,
nr: NodeRef, nr: NodeRef,
} }

View File

@ -5,7 +5,7 @@ use super::*;
/// An encrypted private/safety route hop /// An encrypted private/safety route hop
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RouteHopData { pub(crate) struct RouteHopData {
/// The nonce used in the encryption ENC(Xn,DH(PKn,SKapr)) /// The nonce used in the encryption ENC(Xn,DH(PKn,SKapr))
pub nonce: Nonce, pub nonce: Nonce,
/// The encrypted blob /// The encrypted blob
@ -14,7 +14,7 @@ pub struct RouteHopData {
/// How to find a route node /// How to find a route node
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum RouteNode { pub(crate) enum RouteNode {
/// Route node is optimized, no contact method information as this node id has been seen before /// Route node is optimized, no contact method information as this node id has been seen before
NodeId(PublicKey), NodeId(PublicKey),
/// Route node with full contact method information to ensure the peer is reachable /// Route node with full contact method information to ensure the peer is reachable
@ -79,7 +79,7 @@ impl RouteNode {
/// An unencrypted private/safety route hop /// An unencrypted private/safety route hop
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RouteHop { pub(crate) struct RouteHop {
/// The location of the hop /// The location of the hop
pub node: RouteNode, pub node: RouteNode,
/// The encrypted blob to pass to the next hop as its data (None for stubs) /// The encrypted blob to pass to the next hop as its data (None for stubs)
@ -93,7 +93,7 @@ impl RouteHop {
/// The kind of hops a private route can have /// The kind of hops a private route can have
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum PrivateRouteHops { pub(crate) enum PrivateRouteHops {
/// The first hop of a private route, unencrypted, route_hops == total hop count /// The first hop of a private route, unencrypted, route_hops == total hop count
FirstHop(Box<RouteHop>), FirstHop(Box<RouteHop>),
/// Private route internal node. Has > 0 private route hops left but < total hop count /// Private route internal node. Has > 0 private route hops left but < total hop count
@ -113,7 +113,7 @@ impl PrivateRouteHops {
} }
/// A private route for receiver privacy /// A private route for receiver privacy
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PrivateRoute { pub(crate) struct PrivateRoute {
/// The public key used for the entire route /// The public key used for the entire route
pub public_key: TypedKey, pub public_key: TypedKey,
pub hop_count: u8, pub hop_count: u8,
@ -225,7 +225,7 @@ impl fmt::Display for PrivateRoute {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum SafetyRouteHops { pub(crate) enum SafetyRouteHops {
/// Has >= 1 safety route hops /// Has >= 1 safety route hops
Data(RouteHopData), Data(RouteHopData),
/// Has 0 safety route hops /// Has 0 safety route hops
@ -233,7 +233,7 @@ pub enum SafetyRouteHops {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SafetyRoute { pub(crate) struct SafetyRoute {
pub public_key: TypedKey, pub public_key: TypedKey,
pub hop_count: u8, pub hop_count: u8,
pub hops: SafetyRouteHops, pub hops: SafetyRouteHops,

View File

@ -703,7 +703,7 @@ impl RouteSpecStore {
// Test with double-round trip ping to self // Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest).await? { let _res = match rpc_processor.rpc_call_status(dest, true).await? {
NetworkResult::Value(v) => v, NetworkResult::Value(v) => v,
_ => { _ => {
// Did not error, but did not come back, just return false // Did not error, but did not come back, just return false
@ -746,7 +746,7 @@ impl RouteSpecStore {
// Test with double-round trip ping to self // Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor(); let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest).await? { let _res = match rpc_processor.rpc_call_status(dest, true).await? {
NetworkResult::Value(v) => v, NetworkResult::Value(v) => v,
_ => { _ => {
// Did not error, but did not come back, just return false // Did not error, but did not come back, just return false

View File

@ -6,13 +6,8 @@ pub const RECENT_PEERS_TABLE_SIZE: usize = 64;
pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>; pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Copy)]
pub struct RecentPeersEntry {
pub last_connection: ConnectionDescriptor,
}
/// RoutingTable rwlock-internal data /// RoutingTable rwlock-internal data
pub struct RoutingTableInner { pub(crate) struct RoutingTableInner {
/// Extra pointer to unlocked members to simplify access /// Extra pointer to unlocked members to simplify access
pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>, pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>,
/// Routing table buckets that hold references to entries, per crypto kind /// Routing table buckets that hold references to entries, per crypto kind

View File

@ -109,8 +109,10 @@ impl RoutingTable {
unord.push( unord.push(
async move { async move {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered)) let out = rpc
.await .rpc_call_status(Destination::direct(relay_nr_filtered), true)
.await;
out
} }
.instrument(Span::current()) .instrument(Span::current())
.boxed(), .boxed(),
@ -145,7 +147,7 @@ impl RoutingTable {
let rpc = rpc.clone(); let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr); log_rtab!("--> Validator ping to {:?}", nr);
unord.push( unord.push(
async move { rpc.rpc_call_status(Destination::direct(nr)).await } async move { rpc.rpc_call_status(Destination::direct(nr), false).await }
.instrument(Span::current()) .instrument(Span::current())
.boxed(), .boxed(),
); );
@ -173,7 +175,7 @@ impl RoutingTable {
// Just do a single ping with the best protocol for all the nodes // Just do a single ping with the best protocol for all the nodes
unord.push( unord.push(
async move { rpc.rpc_call_status(Destination::direct(nr)).await } async move { rpc.rpc_call_status(Destination::direct(nr), false).await }
.instrument(Span::current()) .instrument(Span::current())
.boxed(), .boxed(),
); );

View File

@ -19,7 +19,6 @@ pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
let network_manager = network_manager::NetworkManager::new( let network_manager = network_manager::NetworkManager::new(
veilid_config.clone(), veilid_config.clone(),
storage_manager, storage_manager,
protected_store.clone(),
table_store.clone(), table_store.clone(),
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store.clone(), block_store.clone(),

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCAnswer { pub(in crate::rpc_processor) struct RPCAnswer {
detail: RPCAnswerDetail, detail: RPCAnswerDetail,
} }
@ -30,7 +30,7 @@ impl RPCAnswer {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCAnswerDetail { pub(in crate::rpc_processor) enum RPCAnswerDetail {
StatusA(Box<RPCOperationStatusA>), StatusA(Box<RPCOperationStatusA>),
FindNodeA(Box<RPCOperationFindNodeA>), FindNodeA(Box<RPCOperationFindNodeA>),
AppCallA(Box<RPCOperationAppCallA>), AppCallA(Box<RPCOperationAppCallA>),

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCOperationKind { pub(in crate::rpc_processor) enum RPCOperationKind {
Question(Box<RPCQuestion>), Question(Box<RPCQuestion>),
Statement(Box<RPCStatement>), Statement(Box<RPCStatement>),
Answer(Box<RPCAnswer>), Answer(Box<RPCAnswer>),
@ -60,7 +60,7 @@ impl RPCOperationKind {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperation { pub(in crate::rpc_processor) struct RPCOperation {
op_id: OperationId, op_id: OperationId,
opt_sender_peer_info: Option<PeerInfo>, opt_sender_peer_info: Option<PeerInfo>,
target_node_info_ts: Timestamp, target_node_info_ts: Timestamp,

View File

@ -4,7 +4,7 @@ const MAX_APP_CALL_Q_MESSAGE_LEN: usize = 32768;
const MAX_APP_CALL_A_MESSAGE_LEN: usize = 32768; const MAX_APP_CALL_A_MESSAGE_LEN: usize = 32768;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationAppCallQ { pub(in crate::rpc_processor) struct RPCOperationAppCallQ {
message: Vec<u8>, message: Vec<u8>,
} }
@ -46,7 +46,7 @@ impl RPCOperationAppCallQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationAppCallA { pub(in crate::rpc_processor) struct RPCOperationAppCallA {
message: Vec<u8>, message: Vec<u8>,
} }

View File

@ -3,7 +3,7 @@ use super::*;
const MAX_APP_MESSAGE_MESSAGE_LEN: usize = 32768; const MAX_APP_MESSAGE_MESSAGE_LEN: usize = 32768;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationAppMessage { pub(in crate::rpc_processor) struct RPCOperationAppMessage {
message: Vec<u8>, message: Vec<u8>,
} }

View File

@ -2,7 +2,7 @@ use super::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationCancelTunnelQ { pub(in crate::rpc_processor) struct RPCOperationCancelTunnelQ {
id: TunnelId, id: TunnelId,
} }
@ -40,7 +40,7 @@ impl RPCOperationCancelTunnelQ {
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCOperationCancelTunnelA { pub(in crate::rpc_processor) enum RPCOperationCancelTunnelA {
Tunnel(TunnelId), Tunnel(TunnelId),
Error(TunnelError), Error(TunnelError),
} }

View File

@ -2,7 +2,7 @@ use super::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationCompleteTunnelQ { pub(in crate::rpc_processor) struct RPCOperationCompleteTunnelQ {
id: TunnelId, id: TunnelId,
local_mode: TunnelMode, local_mode: TunnelMode,
depth: u8, depth: u8,
@ -77,7 +77,7 @@ impl RPCOperationCompleteTunnelQ {
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCOperationCompleteTunnelA { pub(in crate::rpc_processor) enum RPCOperationCompleteTunnelA {
Tunnel(FullTunnel), Tunnel(FullTunnel),
Error(TunnelError), Error(TunnelError),
} }

View File

@ -5,7 +5,7 @@ const MAX_FIND_BLOCK_A_SUPPLIERS_LEN: usize = 10;
const MAX_FIND_BLOCK_A_PEERS_LEN: usize = 10; const MAX_FIND_BLOCK_A_PEERS_LEN: usize = 10;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationFindBlockQ { pub(in crate::rpc_processor) struct RPCOperationFindBlockQ {
block_id: TypedKey, block_id: TypedKey,
} }
@ -45,7 +45,7 @@ impl RPCOperationFindBlockQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationFindBlockA { pub(in crate::rpc_processor) struct RPCOperationFindBlockA {
data: Vec<u8>, data: Vec<u8>,
suppliers: Vec<PeerInfo>, suppliers: Vec<PeerInfo>,
peers: Vec<PeerInfo>, peers: Vec<PeerInfo>,

View File

@ -3,7 +3,7 @@ use super::*;
const MAX_FIND_NODE_A_PEERS_LEN: usize = 20; const MAX_FIND_NODE_A_PEERS_LEN: usize = 20;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationFindNodeQ { pub(in crate::rpc_processor) struct RPCOperationFindNodeQ {
node_id: TypedKey, node_id: TypedKey,
capabilities: Vec<Capability>, capabilities: Vec<Capability>,
} }
@ -74,7 +74,7 @@ impl RPCOperationFindNodeQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationFindNodeA { pub(in crate::rpc_processor) struct RPCOperationFindNodeA {
peers: Vec<PeerInfo>, peers: Vec<PeerInfo>,
} }

View File

@ -4,7 +4,7 @@ use crate::storage_manager::{SignedValueData, SignedValueDescriptor};
const MAX_GET_VALUE_A_PEERS_LEN: usize = 20; const MAX_GET_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)] #[derive(Clone)]
pub struct ValidateGetValueContext { pub(in crate::rpc_processor) struct ValidateGetValueContext {
pub last_descriptor: Option<SignedValueDescriptor>, pub last_descriptor: Option<SignedValueDescriptor>,
pub subkey: ValueSubkey, pub subkey: ValueSubkey,
pub vcrypto: CryptoSystemVersion, pub vcrypto: CryptoSystemVersion,
@ -21,7 +21,7 @@ impl fmt::Debug for ValidateGetValueContext {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationGetValueQ { pub(in crate::rpc_processor) struct RPCOperationGetValueQ {
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
want_descriptor: bool, want_descriptor: bool,
@ -76,7 +76,7 @@ impl RPCOperationGetValueQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationGetValueA { pub(in crate::rpc_processor) struct RPCOperationGetValueA {
value: Option<SignedValueData>, value: Option<SignedValueData>,
peers: Vec<PeerInfo>, peers: Vec<PeerInfo>,
descriptor: Option<SignedValueDescriptor>, descriptor: Option<SignedValueDescriptor>,

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationReturnReceipt { pub(in crate::rpc_processor) struct RPCOperationReturnReceipt {
receipt: Vec<u8>, receipt: Vec<u8>,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone)] #[derive(Clone)]
pub struct RoutedOperation { pub(crate) struct RoutedOperation {
sequencing: Sequencing, sequencing: Sequencing,
signatures: Vec<Signature>, signatures: Vec<Signature>,
nonce: Nonce, nonce: Nonce,
@ -106,7 +106,7 @@ impl RoutedOperation {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationRoute { pub(crate) struct RPCOperationRoute {
safety_route: SafetyRoute, safety_route: SafetyRoute,
operation: RoutedOperation, operation: RoutedOperation,
} }

View File

@ -4,7 +4,7 @@ use crate::storage_manager::{SignedValueData, SignedValueDescriptor};
const MAX_SET_VALUE_A_PEERS_LEN: usize = 20; const MAX_SET_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)] #[derive(Clone)]
pub struct ValidateSetValueContext { pub(in crate::rpc_processor) struct ValidateSetValueContext {
pub descriptor: SignedValueDescriptor, pub descriptor: SignedValueDescriptor,
pub subkey: ValueSubkey, pub subkey: ValueSubkey,
pub vcrypto: CryptoSystemVersion, pub vcrypto: CryptoSystemVersion,
@ -21,7 +21,7 @@ impl fmt::Debug for ValidateSetValueContext {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationSetValueQ { pub(in crate::rpc_processor) struct RPCOperationSetValueQ {
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,
value: SignedValueData, value: SignedValueData,
@ -110,7 +110,7 @@ impl RPCOperationSetValueQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationSetValueA { pub(in crate::rpc_processor) struct RPCOperationSetValueA {
set: bool, set: bool,
value: Option<SignedValueData>, value: Option<SignedValueData>,
peers: Vec<PeerInfo>, peers: Vec<PeerInfo>,

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationSignal { pub(in crate::rpc_processor) struct RPCOperationSignal {
signal_info: SignalInfo, signal_info: SignalInfo,
} }

View File

@ -2,7 +2,7 @@ use super::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationStartTunnelQ { pub(in crate::rpc_processor) struct RPCOperationStartTunnelQ {
id: TunnelId, id: TunnelId,
local_mode: TunnelMode, local_mode: TunnelMode,
depth: u8, depth: u8,
@ -67,7 +67,7 @@ impl RPCOperationStartTunnelQ {
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCOperationStartTunnelA { pub(in crate::rpc_processor) enum RPCOperationStartTunnelA {
Partial(PartialTunnel), Partial(PartialTunnel),
Error(TunnelError), Error(TunnelError),
} }

View File

@ -3,7 +3,7 @@ use super::*;
const MAX_SUPPLY_BLOCK_A_PEERS_LEN: usize = 20; const MAX_SUPPLY_BLOCK_A_PEERS_LEN: usize = 20;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationSupplyBlockQ { pub(in crate::rpc_processor) struct RPCOperationSupplyBlockQ {
block_id: TypedKey, block_id: TypedKey,
} }
@ -43,7 +43,7 @@ impl RPCOperationSupplyBlockQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationSupplyBlockA { pub(in crate::rpc_processor) struct RPCOperationSupplyBlockA {
expiration: u64, expiration: u64,
peers: Vec<PeerInfo>, peers: Vec<PeerInfo>,
} }

View File

@ -4,7 +4,7 @@ const MAX_WATCH_VALUE_Q_SUBKEYS_LEN: usize = 512;
const MAX_WATCH_VALUE_A_PEERS_LEN: usize = 20; const MAX_WATCH_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationWatchValueQ { pub(in crate::rpc_processor) struct RPCOperationWatchValueQ {
key: TypedKey, key: TypedKey,
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
expiration: u64, expiration: u64,
@ -199,7 +199,7 @@ impl RPCOperationWatchValueQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationWatchValueA { pub(in crate::rpc_processor) struct RPCOperationWatchValueA {
expiration: u64, expiration: u64,
peers: Vec<PeerInfo>, peers: Vec<PeerInfo>,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCQuestion { pub(in crate::rpc_processor) struct RPCQuestion {
respond_to: RespondTo, respond_to: RespondTo,
detail: RPCQuestionDetail, detail: RPCQuestionDetail,
} }
@ -42,7 +42,7 @@ impl RPCQuestion {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCQuestionDetail { pub(in crate::rpc_processor) enum RPCQuestionDetail {
StatusQ(Box<RPCOperationStatusQ>), StatusQ(Box<RPCOperationStatusQ>),
FindNodeQ(Box<RPCOperationFindNodeQ>), FindNodeQ(Box<RPCOperationFindNodeQ>),
AppCallQ(Box<RPCOperationAppCallQ>), AppCallQ(Box<RPCOperationAppCallQ>),

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RespondTo { pub(in crate::rpc_processor) enum RespondTo {
Sender, Sender,
PrivateRoute(PrivateRoute), PrivateRoute(PrivateRoute),
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCStatement { pub(in crate::rpc_processor) struct RPCStatement {
detail: RPCStatementDetail, detail: RPCStatementDetail,
} }
@ -33,7 +33,7 @@ impl RPCStatement {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum RPCStatementDetail { pub(in crate::rpc_processor) enum RPCStatementDetail {
ValidateDialInfo(Box<RPCOperationValidateDialInfo>), ValidateDialInfo(Box<RPCOperationValidateDialInfo>),
Route(Box<RPCOperationRoute>), Route(Box<RPCOperationRoute>),
ValueChanged(Box<RPCOperationValueChanged>), ValueChanged(Box<RPCOperationValueChanged>),

View File

@ -2,7 +2,7 @@ use super::*;
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn encode_route_hop_data( pub(crate) fn encode_route_hop_data(
route_hop_data: &RouteHopData, route_hop_data: &RouteHopData,
builder: &mut veilid_capnp::route_hop_data::Builder, builder: &mut veilid_capnp::route_hop_data::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -24,7 +24,7 @@ pub fn encode_route_hop_data(
Ok(()) Ok(())
} }
pub fn decode_route_hop_data( pub(crate) fn decode_route_hop_data(
reader: &veilid_capnp::route_hop_data::Reader, reader: &veilid_capnp::route_hop_data::Reader,
) -> Result<RouteHopData, RPCError> { ) -> Result<RouteHopData, RPCError> {
let nonce = decode_nonce( let nonce = decode_nonce(
@ -45,7 +45,7 @@ pub fn decode_route_hop_data(
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn encode_route_hop( pub(crate) fn encode_route_hop(
route_hop: &RouteHop, route_hop: &RouteHop,
builder: &mut veilid_capnp::route_hop::Builder, builder: &mut veilid_capnp::route_hop::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -67,7 +67,9 @@ pub fn encode_route_hop(
Ok(()) Ok(())
} }
pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result<RouteHop, RPCError> { pub(crate) fn decode_route_hop(
reader: &veilid_capnp::route_hop::Reader,
) -> Result<RouteHop, RPCError> {
let n_reader = reader.reborrow().get_node(); let n_reader = reader.reborrow().get_node();
let node = match n_reader.which().map_err(RPCError::protocol)? { let node = match n_reader.which().map_err(RPCError::protocol)? {
veilid_capnp::route_hop::node::Which::NodeId(ni) => { veilid_capnp::route_hop::node::Which::NodeId(ni) => {
@ -97,7 +99,7 @@ pub fn decode_route_hop(reader: &veilid_capnp::route_hop::Reader) -> Result<Rout
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn encode_private_route( pub(crate) fn encode_private_route(
private_route: &PrivateRoute, private_route: &PrivateRoute,
builder: &mut veilid_capnp::private_route::Builder, builder: &mut veilid_capnp::private_route::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -123,7 +125,7 @@ pub fn encode_private_route(
Ok(()) Ok(())
} }
pub fn decode_private_route( pub(crate) fn decode_private_route(
reader: &veilid_capnp::private_route::Reader, reader: &veilid_capnp::private_route::Reader,
) -> Result<PrivateRoute, RPCError> { ) -> Result<PrivateRoute, RPCError> {
let public_key = decode_typed_key(&reader.get_public_key().map_err( let public_key = decode_typed_key(&reader.get_public_key().map_err(
@ -152,7 +154,7 @@ pub fn decode_private_route(
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub fn encode_safety_route( pub(crate) fn encode_safety_route(
safety_route: &SafetyRoute, safety_route: &SafetyRoute,
builder: &mut veilid_capnp::safety_route::Builder, builder: &mut veilid_capnp::safety_route::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -176,7 +178,7 @@ pub fn encode_safety_route(
Ok(()) Ok(())
} }
pub fn decode_safety_route( pub(crate) fn decode_safety_route(
reader: &veilid_capnp::safety_route::Reader, reader: &veilid_capnp::safety_route::Reader,
) -> Result<SafetyRoute, RPCError> { ) -> Result<SafetyRoute, RPCError> {
let public_key = decode_typed_key( let public_key = decode_typed_key(

View File

@ -42,7 +42,6 @@ use super::*;
use crypto::*; use crypto::*;
use futures_util::StreamExt; use futures_util::StreamExt;
use network_manager::*; use network_manager::*;
use receipt_manager::*;
use routing_table::*; use routing_table::*;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
use storage_manager::*; use storage_manager::*;
@ -186,7 +185,7 @@ struct WaitableReply {
timeout_us: TimestampDuration, timeout_us: TimestampDuration,
node_ref: NodeRef, node_ref: NodeRef,
send_ts: Timestamp, send_ts: Timestamp,
send_data_kind: SendDataKind, send_data_method: SendDataMethod,
safety_route: Option<PublicKey>, safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>, remote_private_route: Option<PublicKey>,
reply_private_route: Option<PublicKey>, reply_private_route: Option<PublicKey>,
@ -1199,7 +1198,7 @@ impl RPCProcessor {
); );
RPCError::network(e) RPCError::network(e)
})?; })?;
let send_data_kind = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { let send_data_method = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up // If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(RPCKind::Question, send_ts, node_ref.clone(), safety_route, remote_private_route);
network_result_raise!(res); network_result_raise!(res);
@ -1222,7 +1221,7 @@ impl RPCProcessor {
timeout_us, timeout_us,
node_ref, node_ref,
send_ts, send_ts,
send_data_kind, send_data_method,
safety_route, safety_route,
remote_private_route, remote_private_route,
reply_private_route, reply_private_route,
@ -1284,7 +1283,7 @@ impl RPCProcessor {
); );
RPCError::network(e) RPCError::network(e)
})?; })?;
let _send_data_kind = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] { let _send_data_method = network_result_value_or_log!( res => [ format!(": node_ref={}, destination_node_ref={}, message.len={}", node_ref, destination_node_ref, message_len) ] {
// If we couldn't send we're still cleaning up // If we couldn't send we're still cleaning up
self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route); self.record_send_failure(RPCKind::Statement, send_ts, node_ref.clone(), safety_route, remote_private_route);
network_result_raise!(res); network_result_raise!(res);

View File

@ -22,6 +22,7 @@ impl RPCProcessor {
pub async fn rpc_call_status( pub async fn rpc_call_status(
self, self,
dest: Destination, dest: Destination,
protect: bool,
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> { ) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() { let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() {
SafetySelection::Unsafe(_) => { SafetySelection::Unsafe(_) => {
@ -110,8 +111,20 @@ impl RPCProcessor {
let waitable_reply = let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?); network_result_try!(self.question(dest.clone(), question, None).await?);
// Optionally protect the connection in the event this for a relay or route keepalive
if protect {
self.network_manager()
.connection_manager()
.protect_connection(
waitable_reply
.send_data_method
.connection_descriptor
.clone(),
);
}
// Note what kind of ping this was and to what peer scope // Note what kind of ping this was and to what peer scope
let send_data_kind = waitable_reply.send_data_kind; let send_data_method = waitable_reply.send_data_method.clone();
// Wait for reply // Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? { let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {
@ -149,33 +162,30 @@ impl RPCProcessor {
} => { } => {
if matches!(safety_selection, SafetySelection::Unsafe(_)) { if matches!(safety_selection, SafetySelection::Unsafe(_)) {
if let Some(sender_info) = sender_info { if let Some(sender_info) = sender_info {
match send_data_kind { if send_data_method.opt_relayed_contact_method.is_none()
SendDataKind::Direct(connection_descriptor) => { && matches!(
// Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as send_data_method.contact_method,
// If this changes, we'd want to know about that to reset the networking stack NodeContactMethod::Direct(_)
match routing_domain { )
RoutingDomain::PublicInternet => self {
.network_manager() // Directly requested status that actually gets sent directly and not over a relay will tell us what our IP address appears as
.report_public_internet_socket_address( // If this changes, we'd want to know about that to reset the networking stack
sender_info.socket_address, match routing_domain {
connection_descriptor, RoutingDomain::PublicInternet => self
target, .network_manager()
), .report_public_internet_socket_address(
RoutingDomain::LocalNetwork => { sender_info.socket_address,
self.network_manager().report_local_network_socket_address( send_data_method.connection_descriptor,
sender_info.socket_address, target,
connection_descriptor, ),
target, RoutingDomain::LocalNetwork => {
) self.network_manager().report_local_network_socket_address(
} sender_info.socket_address,
send_data_method.connection_descriptor,
target,
)
} }
} }
SendDataKind::Indirect => {
// Do nothing in this case, as the socket address returned here would be for any node other than ours
}
SendDataKind::Existing(_) => {
// Do nothing in this case, as an existing connection could not have a different public address or it would have been reset
}
}; };
opt_sender_info = Some(sender_info.clone()); opt_sender_info = Some(sender_info.clone());
} }

View File

@ -764,7 +764,7 @@ impl VeilidAPI {
} }
let netman = self.network_manager()?; let netman = self.network_manager()?;
netman.net().restart_network(); netman.debug_restart_network();
Ok("Network restarted".to_owned()) Ok("Network restarted".to_owned())
} else { } else {
@ -930,7 +930,7 @@ impl VeilidAPI {
// Send a StatusQ // Send a StatusQ
let out = match rpc let out = match rpc
.rpc_call_status(dest) .rpc_call_status(dest, false)
.await .await
.map_err(VeilidAPIError::internal)? .map_err(VeilidAPIError::internal)?
{ {

View File

@ -24,7 +24,6 @@ pub use crypto::*;
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
pub use intf::BlockStore; pub use intf::BlockStore;
pub use intf::ProtectedStore; pub use intf::ProtectedStore;
pub use routing_table::{NodeRef, NodeRefBase};
pub use table_store::{TableDB, TableDBTransaction, TableStore}; pub use table_store::{TableDB, TableDBTransaction, TableStore};
use crate::*; use crate::*;

View File

@ -1,4 +1,5 @@
use super::*; use super::*;
use routing_table::NodeRefBase;
/////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////
@ -18,15 +19,6 @@ pub struct RoutingContextUnlockedInner {
safety_selection: SafetySelection, safety_selection: SafetySelection,
} }
impl Drop for RoutingContextInner {
fn drop(&mut self) {
// self.api
// .borrow_mut()
// .routing_contexts
// //.remove(&self.id);
}
}
/// Routing contexts are the way you specify the communication preferences for Veilid. /// Routing contexts are the way you specify the communication preferences for Veilid.
/// ///
/// By default routing contexts are 'direct' from node to node, offering no privacy. To enable sender /// By default routing contexts are 'direct' from node to node, offering no privacy. To enable sender