0.4.1 changes

This commit is contained in:
Christien Rioux 2024-10-23 23:27:34 +00:00
parent f0716af537
commit 1780ecb684
140 changed files with 1272 additions and 710 deletions

View File

@ -1,3 +1,12 @@
**Changed in Veilid 0.4.1**
- Implement top level event bus to do asynchronous lock-free communication between subsystems
- Fix deadlock in socket address change event
- Fix deadlock in peer info change event
- Fix incorrect node info equivalence check
- Ping relays every second instead of every 10 seconds
- MR !328 'tiny improvements'
**Changed in Veilid 0.4.0** **Changed in Veilid 0.4.0**
- RFC-0001: Constrain DHT Subkey Size, issue #406 - RFC-0001: Constrain DHT Subkey Size, issue #406

View File

@ -15,18 +15,20 @@ struct AttachmentManagerInner {
} }
struct AttachmentManagerUnlockedInner { struct AttachmentManagerUnlockedInner {
_event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
network_manager: NetworkManager, network_manager: NetworkManager,
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct AttachmentManager { pub struct AttachmentManager {
inner: Arc<Mutex<AttachmentManagerInner>>, inner: Arc<Mutex<AttachmentManagerInner>>,
unlocked_inner: Arc<AttachmentManagerUnlockedInner>, unlocked_inner: Arc<AttachmentManagerUnlockedInner>,
} }
impl AttachmentManager { impl AttachmentManager {
fn new_unlocked_inner( fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
table_store: TableStore, table_store: TableStore,
@ -34,8 +36,10 @@ impl AttachmentManager {
crypto: Crypto, crypto: Crypto,
) -> AttachmentManagerUnlockedInner { ) -> AttachmentManagerUnlockedInner {
AttachmentManagerUnlockedInner { AttachmentManagerUnlockedInner {
_event_bus: event_bus.clone(),
config: config.clone(), config: config.clone(),
network_manager: NetworkManager::new( network_manager: NetworkManager::new(
event_bus,
config, config,
storage_manager, storage_manager,
table_store, table_store,
@ -57,6 +61,7 @@ impl AttachmentManager {
} }
} }
pub fn new( pub fn new(
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
table_store: TableStore, table_store: TableStore,
@ -66,6 +71,7 @@ impl AttachmentManager {
Self { Self {
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner( unlocked_inner: Arc::new(Self::new_unlocked_inner(
event_bus,
config, config,
storage_manager, storage_manager,
table_store, table_store,

View File

@ -11,10 +11,11 @@ pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
/// Internal services startup mechanism. /// Internal services startup mechanism.
/// Ensures that everything is started up, and shut down in the right order /// Ensures that everything is started up, and shut down in the right order
/// and provides an atomic state for if the system is properly operational. /// and provides an atomic state for if the system is properly operational.
struct ServicesContext { struct StartupShutdownContext {
pub config: VeilidConfig, pub config: VeilidConfig,
pub update_callback: UpdateCallback, pub update_callback: UpdateCallback,
pub event_bus: Option<EventBus>,
pub protected_store: Option<ProtectedStore>, pub protected_store: Option<ProtectedStore>,
pub table_store: Option<TableStore>, pub table_store: Option<TableStore>,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
@ -24,11 +25,12 @@ struct ServicesContext {
pub storage_manager: Option<StorageManager>, pub storage_manager: Option<StorageManager>,
} }
impl ServicesContext { impl StartupShutdownContext {
pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self { pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self {
Self { Self {
config, config,
update_callback, update_callback,
event_bus: None,
protected_store: None, protected_store: None,
table_store: None, table_store: None,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
@ -39,9 +41,11 @@ impl ServicesContext {
} }
} }
#[allow(clippy::too_many_arguments)]
pub fn new_full( pub fn new_full(
config: VeilidConfig, config: VeilidConfig,
update_callback: UpdateCallback, update_callback: UpdateCallback,
event_bus: EventBus,
protected_store: ProtectedStore, protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore, #[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
@ -52,6 +56,7 @@ impl ServicesContext {
Self { Self {
config, config,
update_callback, update_callback,
event_bus: Some(event_bus),
protected_store: Some(protected_store), protected_store: Some(protected_store),
table_store: Some(table_store), table_store: Some(table_store),
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
@ -75,8 +80,17 @@ impl ServicesContext {
ApiTracingLayer::add_callback(program_name, namespace, self.update_callback.clone()) ApiTracingLayer::add_callback(program_name, namespace, self.update_callback.clone())
.await?; .await?;
// Add the event bus
let event_bus = EventBus::new();
if let Err(e) = event_bus.startup().await {
error!("failed to start up event bus: {}", e);
self.shutdown().await;
return Err(e.into());
}
self.event_bus = Some(event_bus.clone());
// Set up protected store // Set up protected store
let protected_store = ProtectedStore::new(self.config.clone()); let protected_store = ProtectedStore::new(event_bus.clone(), self.config.clone());
if let Err(e) = protected_store.init().await { if let Err(e) = protected_store.init().await {
error!("failed to init protected store: {}", e); error!("failed to init protected store: {}", e);
self.shutdown().await; self.shutdown().await;
@ -85,8 +99,12 @@ impl ServicesContext {
self.protected_store = Some(protected_store.clone()); self.protected_store = Some(protected_store.clone());
// Set up tablestore and crypto system // Set up tablestore and crypto system
let table_store = TableStore::new(self.config.clone(), protected_store.clone()); let table_store = TableStore::new(
let crypto = Crypto::new(self.config.clone(), table_store.clone()); event_bus.clone(),
self.config.clone(),
protected_store.clone(),
);
let crypto = Crypto::new(event_bus.clone(), self.config.clone(), table_store.clone());
table_store.set_crypto(crypto.clone()); table_store.set_crypto(crypto.clone());
// Initialize table store first, so crypto code can load caches // Initialize table store first, so crypto code can load caches
@ -110,7 +128,7 @@ impl ServicesContext {
// Set up block store // Set up block store
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
{ {
let block_store = BlockStore::new(self.config.clone()); let block_store = BlockStore::new(event_bus.clone(), self.config.clone());
if let Err(e) = block_store.init().await { if let Err(e) = block_store.init().await {
error!("failed to init block store: {}", e); error!("failed to init block store: {}", e);
self.shutdown().await; self.shutdown().await;
@ -123,6 +141,7 @@ impl ServicesContext {
let update_callback = self.update_callback.clone(); let update_callback = self.update_callback.clone();
let storage_manager = StorageManager::new( let storage_manager = StorageManager::new(
event_bus.clone(),
self.config.clone(), self.config.clone(),
self.crypto.clone().unwrap(), self.crypto.clone().unwrap(),
self.table_store.clone().unwrap(), self.table_store.clone().unwrap(),
@ -139,6 +158,7 @@ impl ServicesContext {
// Set up attachment manager // Set up attachment manager
let update_callback = self.update_callback.clone(); let update_callback = self.update_callback.clone();
let attachment_manager = AttachmentManager::new( let attachment_manager = AttachmentManager::new(
event_bus.clone(),
self.config.clone(), self.config.clone(),
storage_manager, storage_manager,
table_store, table_store,
@ -180,6 +200,9 @@ impl ServicesContext {
if let Some(protected_store) = &mut self.protected_store { if let Some(protected_store) = &mut self.protected_store {
protected_store.terminate().await; protected_store.terminate().await;
} }
if let Some(event_bus) = &mut self.event_bus {
event_bus.shutdown().await;
}
info!("Veilid API shutdown complete"); info!("Veilid API shutdown complete");
@ -199,9 +222,11 @@ impl ServicesContext {
} }
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
pub(crate) struct VeilidCoreContext { pub struct VeilidCoreContext {
pub config: VeilidConfig, pub config: VeilidConfig,
pub update_callback: UpdateCallback, pub update_callback: UpdateCallback,
// Event bus
pub event_bus: EventBus,
// Services // Services
pub storage_manager: StorageManager, pub storage_manager: StorageManager,
pub protected_store: ProtectedStore, pub protected_store: ProtectedStore,
@ -249,12 +274,13 @@ impl VeilidCoreContext {
} }
} }
let mut sc = ServicesContext::new_empty(config.clone(), update_callback); let mut sc = StartupShutdownContext::new_empty(config.clone(), update_callback);
sc.startup().await.map_err(VeilidAPIError::generic)?; sc.startup().await.map_err(VeilidAPIError::generic)?;
Ok(VeilidCoreContext { Ok(VeilidCoreContext {
config: sc.config, config: sc.config,
update_callback: sc.update_callback, update_callback: sc.update_callback,
event_bus: sc.event_bus.unwrap(),
storage_manager: sc.storage_manager.unwrap(), storage_manager: sc.storage_manager.unwrap(),
protected_store: sc.protected_store.unwrap(), protected_store: sc.protected_store.unwrap(),
table_store: sc.table_store.unwrap(), table_store: sc.table_store.unwrap(),
@ -267,9 +293,10 @@ impl VeilidCoreContext {
#[instrument(level = "trace", target = "core_context", skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
async fn shutdown(self) { async fn shutdown(self) {
let mut sc = ServicesContext::new_full( let mut sc = StartupShutdownContext::new_full(
self.config.clone(), self.config.clone(),
self.update_callback.clone(), self.update_callback.clone(),
self.event_bus,
self.protected_store, self.protected_store,
self.table_store, self.table_store,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
@ -397,7 +424,7 @@ pub async fn api_startup_config(
} }
#[instrument(level = "trace", target = "core_context", skip_all)] #[instrument(level = "trace", target = "core_context", skip_all)]
pub(crate) async fn api_shutdown(context: VeilidCoreContext) { pub async fn api_shutdown(context: VeilidCoreContext) {
let mut initialized_lock = INITIALIZED.lock().await; let mut initialized_lock = INITIALIZED.lock().await;
let init_key = { let init_key = {

View File

@ -79,6 +79,7 @@ struct CryptoInner {
} }
struct CryptoUnlockedInner { struct CryptoUnlockedInner {
_event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
table_store: TableStore, table_store: TableStore,
} }
@ -102,9 +103,10 @@ impl Crypto {
} }
} }
pub fn new(config: VeilidConfig, table_store: TableStore) -> Self { pub fn new(event_bus: EventBus, config: VeilidConfig, table_store: TableStore) -> Self {
let out = Self { let out = Self {
unlocked_inner: Arc::new(CryptoUnlockedInner { unlocked_inner: Arc::new(CryptoUnlockedInner {
_event_bus: event_bus,
config, config,
table_store, table_store,
}), }),

View File

@ -6,6 +6,7 @@ struct BlockStoreInner {
#[derive(Clone)] #[derive(Clone)]
pub struct BlockStore { pub struct BlockStore {
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<BlockStoreInner>>, inner: Arc<Mutex<BlockStoreInner>>,
} }
@ -14,8 +15,9 @@ impl BlockStore {
fn new_inner() -> BlockStoreInner { fn new_inner() -> BlockStoreInner {
BlockStoreInner {} BlockStoreInner {}
} }
pub fn new(config: VeilidConfig) -> Self { pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self { Self {
event_bus,
config, config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }

View File

@ -9,6 +9,7 @@ pub struct ProtectedStoreInner {
#[derive(Clone)] #[derive(Clone)]
pub struct ProtectedStore { pub struct ProtectedStore {
_event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<ProtectedStoreInner>>, inner: Arc<Mutex<ProtectedStoreInner>>,
} }
@ -20,8 +21,9 @@ impl ProtectedStore {
} }
} }
pub fn new(config: VeilidConfig) -> Self { pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self { Self {
_event_bus: event_bus,
config, config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }

View File

@ -6,6 +6,7 @@ struct BlockStoreInner {
#[derive(Clone)] #[derive(Clone)]
pub struct BlockStore { pub struct BlockStore {
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<BlockStoreInner>>, inner: Arc<Mutex<BlockStoreInner>>,
} }
@ -14,8 +15,9 @@ impl BlockStore {
fn new_inner() -> BlockStoreInner { fn new_inner() -> BlockStoreInner {
BlockStoreInner {} BlockStoreInner {}
} }
pub fn new(config: VeilidConfig) -> Self { pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self { Self {
event_bus,
config, config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
} }

View File

@ -5,12 +5,16 @@ use web_sys::*;
#[derive(Clone)] #[derive(Clone)]
pub struct ProtectedStore { pub struct ProtectedStore {
_event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
} }
impl ProtectedStore { impl ProtectedStore {
pub fn new(config: VeilidConfig) -> Self { pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self { config } Self {
_event_bus: event_bus,
config,
}
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]

View File

@ -23,9 +23,9 @@ pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10;
// TimestampDuration::new(3_600_000_000_u64); // 60 minutes // TimestampDuration::new(3_600_000_000_u64); // 60 minutes
/// Address checker config /// Address checker config
pub(crate) struct AddressCheckConfig { pub struct AddressCheckConfig {
pub(crate) detect_address_changes: bool, pub detect_address_changes: bool,
pub(crate) ip6_prefix_size: usize, pub ip6_prefix_size: usize,
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
@ -33,7 +33,7 @@ struct AddressCheckCacheKey(RoutingDomain, ProtocolType, AddressType);
/// Address checker - keep track of how other nodes are seeing our node's address on a per-protocol basis /// Address checker - keep track of how other nodes are seeing our node's address on a per-protocol basis
/// Used to determine if our address has changed and if we should re-publish new PeerInfo /// Used to determine if our address has changed and if we should re-publish new PeerInfo
pub(crate) struct AddressCheck { pub struct AddressCheck {
config: AddressCheckConfig, config: AddressCheckConfig,
net: Network, net: Network,
current_network_class: BTreeMap<RoutingDomain, NetworkClass>, current_network_class: BTreeMap<RoutingDomain, NetworkClass>,

View File

@ -7,7 +7,7 @@ const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10;
const MAX_DIAL_INFO_FAILURES: usize = 65536; const MAX_DIAL_INFO_FAILURES: usize = 65536;
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)] #[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFilterError { pub(crate) enum AddressFilterError {
#[error("Count exceeded")] #[error("Count exceeded")]
CountExceeded, CountExceeded,
#[error("Rate exceeded")] #[error("Rate exceeded")]

View File

@ -1,5 +1,5 @@
use super::*; use super::*;
pub(crate) use connection_table::ConnectionRefKind; use connection_table::ConnectionRefKind;
use connection_table::*; use connection_table::*;
use network_connection::*; use network_connection::*;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
@ -19,7 +19,7 @@ enum ConnectionManagerEvent {
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct ConnectionRefScope { pub struct ConnectionRefScope {
connection_manager: ConnectionManager, connection_manager: ConnectionManager,
id: NetworkConnectionId, id: NetworkConnectionId,
} }
@ -78,7 +78,7 @@ impl core::fmt::Debug for ConnectionManagerArc {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct ConnectionManager { pub struct ConnectionManager {
arc: Arc<ConnectionManagerArc>, arc: Arc<ConnectionManagerArc>,
} }

View File

@ -8,7 +8,7 @@ const PRIORITY_FLOW_PERCENTAGE: usize = 25;
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
#[derive(ThisError, Debug)] #[derive(ThisError, Debug)]
pub(in crate::network_manager) enum ConnectionTableAddError { pub enum ConnectionTableAddError {
#[error("Connection already added to table")] #[error("Connection already added to table")]
AlreadyExists(NetworkConnection), AlreadyExists(NetworkConnection),
#[error("Connection address was filtered")] #[error("Connection address was filtered")]
@ -30,7 +30,7 @@ impl ConnectionTableAddError {
} }
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub(crate) enum ConnectionRefKind { pub enum ConnectionRefKind {
AddRef, AddRef,
RemoveRef, RemoveRef,
} }
@ -49,7 +49,7 @@ struct ConnectionTableInner {
} }
#[derive(Debug)] #[derive(Debug)]
pub(in crate::network_manager) struct ConnectionTable { pub struct ConnectionTable {
inner: Arc<Mutex<ConnectionTableInner>>, inner: Arc<Mutex<ConnectionTableInner>>,
} }

View File

@ -3,7 +3,7 @@ use super::*;
impl NetworkManager { impl NetworkManager {
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", target = "net", skip(self), ret, err)] #[instrument(level = "trace", target = "net", skip(self), ret, err)]
pub(crate) async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> { pub async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
// Get a bunch of nodes with the various // Get a bunch of nodes with the various

View File

@ -23,12 +23,11 @@ pub mod tests;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
pub(crate) use connection_manager::*; pub use connection_manager::*;
pub(crate) use network_connection::*; pub use network_connection::*;
pub(crate) use receipt_manager::*; pub use receipt_manager::*;
pub(crate) use stats::*; pub use stats::*;
pub(crate) use types::*;
pub use types::*;
//////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////
use address_check::*; use address_check::*;
@ -76,7 +75,7 @@ struct ClientAllowlistEntry {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct SendDataMethod { pub struct SendDataMethod {
/// How the data was sent, possibly to a relay /// How the data was sent, possibly to a relay
pub contact_method: NodeContactMethod, pub contact_method: NodeContactMethod,
/// Pre-relayed contact method /// Pre-relayed contact method
@ -87,7 +86,7 @@ pub(crate) struct SendDataMethod {
/// Mechanism required to contact another node /// Mechanism required to contact another node
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) enum NodeContactMethod { pub enum NodeContactMethod {
/// Node is not reachable by any means /// Node is not reachable by any means
Unreachable, Unreachable,
/// Connection should have already existed /// Connection should have already existed
@ -134,6 +133,7 @@ struct NetworkManagerInner {
struct NetworkManagerUnlockedInner { struct NetworkManagerUnlockedInner {
// Handles // Handles
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
table_store: TableStore, table_store: TableStore,
@ -170,6 +170,7 @@ impl NetworkManager {
} }
} }
fn new_unlocked_inner( fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
table_store: TableStore, table_store: TableStore,
@ -178,6 +179,7 @@ impl NetworkManager {
network_key: Option<SharedSecret>, network_key: Option<SharedSecret>,
) -> NetworkManagerUnlockedInner { ) -> NetworkManagerUnlockedInner {
NetworkManagerUnlockedInner { NetworkManagerUnlockedInner {
event_bus,
config: config.clone(), config: config.clone(),
storage_manager, storage_manager,
table_store, table_store,
@ -202,6 +204,7 @@ impl NetworkManager {
} }
pub fn new( pub fn new(
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
table_store: TableStore, table_store: TableStore,
@ -238,6 +241,7 @@ impl NetworkManager {
let this = Self { let this = Self {
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner( unlocked_inner: Arc::new(Self::new_unlocked_inner(
event_bus,
config, config,
storage_manager, storage_manager,
table_store, table_store,
@ -252,6 +256,9 @@ impl NetworkManager {
this this
} }
pub fn event_bus(&self) -> EventBus {
self.unlocked_inner.event_bus.clone()
}
pub fn config(&self) -> VeilidConfig { pub fn config(&self) -> VeilidConfig {
self.unlocked_inner.config.clone() self.unlocked_inner.config.clone()
} }
@ -404,7 +411,7 @@ impl NetworkManager {
.unwrap() .unwrap()
.clone(), .clone(),
); );
let receipt_manager = ReceiptManager::new(self.clone()); let receipt_manager = ReceiptManager::new();
*self.unlocked_inner.components.write() = Some(NetworkComponents { *self.unlocked_inner.components.write() = Some(NetworkComponents {
net: net.clone(), net: net.clone(),
connection_manager: connection_manager.clone(), connection_manager: connection_manager.clone(),
@ -437,6 +444,22 @@ impl NetworkManager {
rpc_processor.startup().await?; rpc_processor.startup().await?;
receipt_manager.startup().await?; receipt_manager.startup().await?;
// Register event handlers
let this = self.clone();
self.event_bus().subscribe(move |evt| {
let this = this.clone();
Box::pin(async move {
this.peer_info_change_event_handler(evt);
})
});
let this = self.clone();
self.event_bus().subscribe(move |evt| {
let this = this.clone();
Box::pin(async move {
this.socket_address_change_event_handler(evt);
})
});
log_net!("NetworkManager::internal_startup end"); log_net!("NetworkManager::internal_startup end");
Ok(StartupDisposition::Success) Ok(StartupDisposition::Success)
@ -1260,32 +1283,25 @@ impl NetworkManager {
} }
// Report peer info changes // Report peer info changes
pub fn report_peer_info_change(&mut self, peer_info: Arc<PeerInfo>) { fn peer_info_change_event_handler(&self, evt: Arc<PeerInfoChangeEvent>) {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() { if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_peer_info_change(peer_info); address_check.report_peer_info_change(evt.peer_info.clone());
} }
} }
// Determine if our IP address has changed // Determine if our IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it // this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers // Wait until we have received confirmation from N different peers
pub fn report_socket_address_change( fn socket_address_change_event_handler(&self, evt: Arc<SocketAddressChangeEvent>) {
&self,
routing_domain: RoutingDomain, // the routing domain this flow is over
socket_address: SocketAddress, // the socket address as seen by the remote peer
old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() { if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_socket_address_change( address_check.report_socket_address_change(
routing_domain, evt.routing_domain,
socket_address, evt.socket_address,
old_socket_address, evt.old_socket_address,
flow, evt.flow,
reporting_peer, evt.reporting_peer.clone(),
); );
} }
} }

View File

@ -7,6 +7,8 @@ mod protocol;
mod start_protocols; mod start_protocols;
mod tasks; mod tasks;
pub(super) use protocol::*;
use super::*; use super::*;
use crate::routing_table::*; use crate::routing_table::*;
use connection_manager::*; use connection_manager::*;
@ -16,7 +18,6 @@ use network_tcp::*;
use protocol::tcp::RawTcpProtocolHandler; use protocol::tcp::RawTcpProtocolHandler;
use protocol::udp::RawUdpProtocolHandler; use protocol::udp::RawUdpProtocolHandler;
use protocol::ws::WebsocketProtocolHandler; use protocol::ws::WebsocketProtocolHandler;
pub(in crate::network_manager) use protocol::*;
use start_protocols::*; use start_protocols::*;
use async_tls::TlsAcceptor; use async_tls::TlsAcceptor;
@ -133,7 +134,7 @@ struct NetworkUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub(in crate::network_manager) struct Network { pub(super) struct Network {
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<NetworkInner>>, inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>, unlocked_inner: Arc<NetworkUnlockedInner>,

View File

@ -6,7 +6,7 @@ use stop_token::future::FutureExt;
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
#[derive(Clone)] #[derive(Clone)]
pub(in crate::network_manager) struct ListenerState { pub struct ListenerState {
pub protocol_accept_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>, pub protocol_accept_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>,
pub tls_protocol_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>, pub tls_protocol_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>,
pub tls_acceptor: Option<TlsAcceptor>, pub tls_acceptor: Option<TlsAcceptor>,

View File

@ -8,7 +8,7 @@ use super::*;
use std::io; use std::io;
#[derive(Debug)] #[derive(Debug)]
pub(in crate::network_manager) enum ProtocolNetworkConnection { pub(crate) enum ProtocolNetworkConnection {
// Dummy(DummyNetworkConnection), // Dummy(DummyNetworkConnection),
RawTcp(tcp::RawTcpNetworkConnection), RawTcp(tcp::RawTcpNetworkConnection),
WsAccepted(ws::WebSocketNetworkConnectionAccepted), WsAccepted(ws::WebSocketNetworkConnectionAccepted),

View File

@ -42,11 +42,16 @@ impl RawTcpNetworkConnection {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("sending too large TCP message"); bail_io_error_other!("sending too large TCP message");
} }
let len = message.len() as u16; let len = message.len() as u16;
let header = [b'V', b'L', len as u8, (len >> 8) as u8]; let header = [b'V', b'L', len as u8, (len >> 8) as u8];
network_result_try!(stream.write_all(&header).await.into_network_result()?); let mut data = Vec::with_capacity(message.len() + 4);
network_result_try!(stream.write_all(&message).await.into_network_result()?); data.extend_from_slice(&header);
data.extend_from_slice(&message);
network_result_try!(stream.write_all(&data).await.into_network_result()?);
stream.flush().await.into_network_result() stream.flush().await.into_network_result()
} }
@ -100,7 +105,7 @@ impl RawTcpNetworkConnection {
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
#[derive(Clone)] #[derive(Clone)]
pub(in crate::network_manager) struct RawTcpProtocolHandler pub struct RawTcpProtocolHandler
where where
Self: ProtocolAcceptHandler, Self: ProtocolAcceptHandler,
{ {

View File

@ -2,7 +2,7 @@ use super::*;
use sockets::*; use sockets::*;
#[derive(Clone)] #[derive(Clone)]
pub(in crate::network_manager) struct RawUdpProtocolHandler { pub struct RawUdpProtocolHandler {
socket: Arc<UdpSocket>, socket: Arc<UdpSocket>,
assembly_buffer: AssemblyBuffer, assembly_buffer: AssemblyBuffer,
address_filter: Option<AddressFilter>, address_filter: Option<AddressFilter>,

View File

@ -181,7 +181,7 @@ struct WebsocketProtocolHandlerArc {
} }
#[derive(Clone)] #[derive(Clone)]
pub(in crate::network_manager) struct WebsocketProtocolHandler pub struct WebsocketProtocolHandler
where where
Self: ProtocolAcceptHandler, Self: ProtocolAcceptHandler,
{ {

View File

@ -5,7 +5,7 @@ mod upnp_task;
use super::*; use super::*;
impl Network { impl Network {
pub(crate) fn setup_tasks(&self) { pub fn setup_tasks(&self) {
// Set update network class tick task // Set update network class tick task
{ {
let this = self.clone(); let this = self.clone();
@ -45,7 +45,7 @@ impl Network {
} }
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)] #[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up"); log_net!(debug "ignoring due to not started up");
return Ok(()); return Ok(());

View File

@ -11,7 +11,7 @@ cfg_if::cfg_if! {
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
// Accept // Accept
pub(in crate::network_manager) trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync { pub(crate) trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
fn on_accept( fn on_accept(
&self, &self,
stream: AsyncPeekStream, stream: AsyncPeekStream,
@ -20,7 +20,7 @@ cfg_if::cfg_if! {
) -> SendPinBoxFuture<io::Result<Option<ProtocolNetworkConnection>>>; ) -> SendPinBoxFuture<io::Result<Option<ProtocolNetworkConnection>>>;
} }
pub(in crate::network_manager) trait ProtocolAcceptHandlerClone { pub(crate) trait ProtocolAcceptHandlerClone {
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>; fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>;
} }
@ -38,7 +38,7 @@ cfg_if::cfg_if! {
} }
} }
pub(in crate::network_manager) type NewProtocolAcceptHandler = pub(crate) type NewProtocolAcceptHandler =
dyn Fn(VeilidConfig, bool) -> Box<dyn ProtocolAcceptHandler> + Send; dyn Fn(VeilidConfig, bool) -> Box<dyn ProtocolAcceptHandler> + Send;
} }
} }
@ -84,7 +84,7 @@ pub struct NetworkConnectionStats {
/// Represents a connection in the connection table for connection-oriented protocols /// Represents a connection in the connection table for connection-oriented protocols
#[derive(Debug)] #[derive(Debug)]
pub(in crate::network_manager) struct NetworkConnection { pub(crate) struct NetworkConnection {
/// A unique id for this connection /// A unique id for this connection
connection_id: NetworkConnectionId, connection_id: NetworkConnectionId,
/// The dial info used to make this connection if it was made with 'connect' /// The dial info used to make this connection if it was made with 'connect'

View File

@ -7,7 +7,7 @@ use routing_table::*;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) enum ReceiptEvent { pub enum ReceiptEvent {
ReturnedOutOfBand, ReturnedOutOfBand,
ReturnedInBand { ReturnedInBand {
inbound_noderef: FilteredNodeRef, inbound_noderef: FilteredNodeRef,
@ -29,7 +29,7 @@ pub(super) enum ReceiptReturned {
Private { private_route: PublicKey }, Private { private_route: PublicKey },
} }
pub(crate) trait ReceiptCallback: Send + 'static { pub trait ReceiptCallback: Send + 'static {
fn call( fn call(
&self, &self,
event: ReceiptEvent, event: ReceiptEvent,
@ -143,7 +143,6 @@ impl PartialOrd for ReceiptRecordTimestampSort {
/////////////////////////////////// ///////////////////////////////////
struct ReceiptManagerInner { struct ReceiptManagerInner {
network_manager: NetworkManager,
records_by_nonce: BTreeMap<Nonce, Arc<Mutex<ReceiptRecord>>>, records_by_nonce: BTreeMap<Nonce, Arc<Mutex<ReceiptRecord>>>,
next_oldest_ts: Option<Timestamp>, next_oldest_ts: Option<Timestamp>,
stop_source: Option<StopSource>, stop_source: Option<StopSource>,
@ -161,9 +160,8 @@ pub(super) struct ReceiptManager {
} }
impl ReceiptManager { impl ReceiptManager {
fn new_inner(network_manager: NetworkManager) -> ReceiptManagerInner { fn new_inner() -> ReceiptManagerInner {
ReceiptManagerInner { ReceiptManagerInner {
network_manager,
records_by_nonce: BTreeMap::new(), records_by_nonce: BTreeMap::new(),
next_oldest_ts: None, next_oldest_ts: None,
stop_source: None, stop_source: None,
@ -171,19 +169,15 @@ impl ReceiptManager {
} }
} }
pub fn new(network_manager: NetworkManager) -> Self { pub fn new() -> Self {
Self { Self {
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(ReceiptManagerUnlockedInner { unlocked_inner: Arc::new(ReceiptManagerUnlockedInner {
startup_lock: StartupLock::new(), startup_lock: StartupLock::new(),
}), }),
} }
} }
pub fn network_manager(&self) -> NetworkManager {
self.inner.lock().network_manager.clone()
}
pub async fn startup(&self) -> EyreResult<()> { pub async fn startup(&self) -> EyreResult<()> {
let guard = self.unlocked_inner.startup_lock.startup()?; let guard = self.unlocked_inner.startup_lock.startup()?;
log_net!(debug "startup receipt manager"); log_net!(debug "startup receipt manager");
@ -322,8 +316,6 @@ impl ReceiptManager {
return; return;
}; };
let network_manager = self.network_manager();
// Stop all tasks // Stop all tasks
let timeout_task = { let timeout_task = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -338,7 +330,7 @@ impl ReceiptManager {
panic!("joining timeout task failed"); panic!("joining timeout task failed");
} }
*self.inner.lock() = Self::new_inner(network_manager); *self.inner.lock() = Self::new_inner();
guard.success(); guard.success();
log_net!(debug "finished receipt manager shutdown"); log_net!(debug "finished receipt manager shutdown");

View File

@ -16,7 +16,7 @@ impl NetworkManager {
/// NodeContactMethod calculation requires first calculating the per-RoutingDomain ContactMethod /// NodeContactMethod calculation requires first calculating the per-RoutingDomain ContactMethod
/// between the source and destination PeerInfo, which is a stateless operation. /// between the source and destination PeerInfo, which is a stateless operation.
#[instrument(level = "trace", target = "net", skip_all, err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub(crate) async fn send_data( pub async fn send_data(
&self, &self,
destination_node_ref: FilteredNodeRef, destination_node_ref: FilteredNodeRef,
data: Vec<u8>, data: Vec<u8>,
@ -34,7 +34,7 @@ impl NetworkManager {
} }
#[instrument(level = "trace", target = "net", skip_all)] #[instrument(level = "trace", target = "net", skip_all)]
pub(crate) fn try_possibly_relayed_contact_method( pub fn try_possibly_relayed_contact_method(
&self, &self,
possibly_relayed_contact_method: NodeContactMethod, possibly_relayed_contact_method: NodeContactMethod,
destination_node_ref: FilteredNodeRef, destination_node_ref: FilteredNodeRef,
@ -400,7 +400,7 @@ impl NetworkManager {
/// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not /// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not
/// allowed to use NodeRefs due to recursive locking /// allowed to use NodeRefs due to recursive locking
#[instrument(level = "trace", target = "net", skip_all, err)] #[instrument(level = "trace", target = "net", skip_all, err)]
pub(crate) fn get_node_contact_method( pub fn get_node_contact_method(
&self, &self,
target_node_ref: FilteredNodeRef, target_node_ref: FilteredNodeRef,
) -> EyreResult<NodeContactMethod> { ) -> EyreResult<NodeContactMethod> {

View File

@ -35,7 +35,7 @@ impl Default for NetworkManagerStats {
impl NetworkManager { impl NetworkManager {
// Callbacks from low level network for statistics gathering // Callbacks from low level network for statistics gathering
pub(crate) fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) { pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
inner inner
.stats .stats
@ -52,7 +52,7 @@ impl NetworkManager {
.add_up(bytes); .add_up(bytes);
} }
pub(crate) fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) { pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
inner inner
.stats .stats

View File

@ -3,7 +3,7 @@ pub mod rolling_transfers;
use super::*; use super::*;
impl NetworkManager { impl NetworkManager {
pub(crate) fn setup_tasks(&self) { pub fn setup_tasks(&self) {
// Set rolling transfers tick task // Set rolling transfers tick task
{ {
let this = self.clone(); let this = self.clone();
@ -60,7 +60,7 @@ impl NetworkManager {
Ok(()) Ok(())
} }
pub(crate) async fn cancel_tasks(&self) { pub async fn cancel_tasks(&self) {
log_net!(debug "stopping rolling transfers task"); log_net!(debug "stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
warn!("rolling_transfers_task not stopped: {}", e); warn!("rolling_transfers_task not stopped: {}", e);

View File

@ -3,7 +3,7 @@ use super::*;
impl NetworkManager { impl NetworkManager {
// Compute transfer statistics for the low level network // Compute transfer statistics for the low level network
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_transfers_task_routine( pub async fn rolling_transfers_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
last_ts: Timestamp, last_ts: Timestamp,

View File

@ -2,7 +2,7 @@ use super::*;
// Ordering here matters, IPV6 is preferred to IPV4 in dial info sorts // Ordering here matters, IPV6 is preferred to IPV4 in dial info sorts
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub enum Address { pub(crate) enum Address {
IPV6(Ipv6Addr), IPV6(Ipv6Addr),
IPV4(Ipv4Addr), IPV4(Ipv4Addr),
} }

View File

@ -4,8 +4,8 @@ use super::*;
#[allow(clippy::derived_hash_with_manual_eq)] #[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)] #[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
#[enumset(repr = "u8")] #[enumset(repr = "u8")]
pub enum AddressType { pub(crate) enum AddressType {
IPV6 = 0, IPV6 = 0,
IPV4 = 1, IPV4 = 1,
} }
pub type AddressTypeSet = EnumSet<AddressType>; pub(crate) type AddressTypeSet = EnumSet<AddressType>;

View File

@ -5,16 +5,16 @@ mod wss;
use super::*; use super::*;
pub use tcp::*; pub(crate) use tcp::*;
pub use udp::*; pub(crate) use udp::*;
pub use ws::*; pub(crate) use ws::*;
pub use wss::*; pub(crate) use wss::*;
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference
// Must match ProtocolType order // Must match ProtocolType order
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind")] #[serde(tag = "kind")]
pub enum DialInfo { pub(crate) enum DialInfo {
UDP(DialInfoUDP), UDP(DialInfoUDP),
TCP(DialInfoTCP), TCP(DialInfoTCP),
WS(DialInfoWS), WS(DialInfoWS),

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoTCP { pub(crate) struct DialInfoTCP {
pub socket_address: SocketAddress, pub socket_address: SocketAddress,
} }

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoUDP { pub(crate) struct DialInfoUDP {
pub socket_address: SocketAddress, pub socket_address: SocketAddress,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoWS { pub(crate) struct DialInfoWS {
pub socket_address: SocketAddress, pub socket_address: SocketAddress,
pub request: String, pub request: String,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoWSS { pub(crate) struct DialInfoWSS {
pub socket_address: SocketAddress, pub socket_address: SocketAddress,
pub request: String, pub request: String,
} }

View File

@ -2,7 +2,7 @@ use super::*;
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum DialInfoClass { pub(crate) enum DialInfoClass {
Direct = 0, // D = Directly reachable with public IP and no firewall, with statically configured port Direct = 0, // D = Directly reachable with public IP and no firewall, with statically configured port
Mapped = 1, // M = Directly reachable with via portmap behind any NAT or firewalled with dynamically negotiated port Mapped = 1, // M = Directly reachable with via portmap behind any NAT or firewalled with dynamically negotiated port
FullConeNAT = 2, // F = Directly reachable device without portmap behind full-cone NAT (or manually mapped firewall port with no configuration change) FullConeNAT = 2, // F = Directly reachable device without portmap behind full-cone NAT (or manually mapped firewall port with no configuration change)

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct DialInfoFilter { pub(crate) struct DialInfoFilter {
pub protocol_type_set: ProtocolTypeSet, pub protocol_type_set: ProtocolTypeSet,
pub address_type_set: AddressTypeSet, pub address_type_set: AddressTypeSet,
} }

View File

@ -0,0 +1,13 @@
use super::*;
pub(crate) struct PeerInfoChangeEvent {
pub peer_info: Arc<PeerInfo>,
}
pub(crate) struct SocketAddressChangeEvent {
pub routing_domain: RoutingDomain, // the routing domain this flow is over
pub socket_address: SocketAddress, // the socket address as seen by the remote peer
pub old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
pub flow: Flow, // the flow used
pub reporting_peer: NodeRef, // the peer's noderef reporting the socket address
}

View File

@ -12,7 +12,7 @@ use super::*;
/// ///
#[derive(Copy, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Flow { pub(crate) struct Flow {
remote: PeerAddress, remote: PeerAddress,
local: Option<SocketAddress>, local: Option<SocketAddress>,
} }
@ -75,7 +75,7 @@ impl MatchesDialInfoFilter for Flow {
/// The NetworkConnectionId associated with each flow may represent a low level network connection /// The NetworkConnectionId associated with each flow may represent a low level network connection
/// and will be unique with high probability per low-level connection /// and will be unique with high probability per low-level connection
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct UniqueFlow { pub(crate) struct UniqueFlow {
pub flow: Flow, pub flow: Flow,
pub connection_id: Option<NetworkConnectionId>, pub connection_id: Option<NetworkConnectionId>,
} }
@ -95,4 +95,4 @@ impl fmt::Display for UniqueFlow {
} }
} }
pub type NetworkConnectionId = AlignedU64; pub(crate) type NetworkConnectionId = AlignedU64;

View File

@ -5,7 +5,7 @@ use super::*;
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference
// Must match DialInfo order // Must match DialInfo order
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum LowLevelProtocolType { pub(crate) enum LowLevelProtocolType {
UDP = 0, UDP = 0,
TCP = 1, TCP = 1,
} }

View File

@ -3,6 +3,7 @@ mod address_type;
mod dial_info; mod dial_info;
mod dial_info_class; mod dial_info_class;
mod dial_info_filter; mod dial_info_filter;
mod events;
mod flow; mod flow;
mod low_level_protocol_type; mod low_level_protocol_type;
mod network_class; mod network_class;
@ -15,17 +16,18 @@ mod socket_address;
use super::*; use super::*;
pub use address::*; pub(crate) use address::*;
pub use address_type::*; pub(crate) use address_type::*;
pub use dial_info::*; pub(crate) use dial_info::*;
pub use dial_info_class::*; pub(crate) use dial_info_class::*;
pub use dial_info_filter::*; pub(crate) use dial_info_filter::*;
pub use flow::*; pub(crate) use events::*;
pub use low_level_protocol_type::*; pub(crate) use flow::*;
pub use network_class::*; pub(crate) use low_level_protocol_type::*;
pub use peer_address::*; pub(crate) use network_class::*;
pub use protocol_type::*; pub(crate) use peer_address::*;
pub use punishment::*; pub(crate) use protocol_type::*;
pub use relay_kind::*; pub(crate) use punishment::*;
pub use signal_info::*; pub(crate) use relay_kind::*;
pub use socket_address::*; pub(crate) use signal_info::*;
pub(crate) use socket_address::*;

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum NetworkClass { pub(crate) enum NetworkClass {
InboundCapable = 0, // I = Inbound capable without relay, may require signal InboundCapable = 0, // I = Inbound capable without relay, may require signal
OutboundOnly = 1, // O = Outbound only, inbound relay required except with reverse connect signal OutboundOnly = 1, // O = Outbound only, inbound relay required except with reverse connect signal
WebApp = 2, // W = PWA, outbound relay is required in most cases WebApp = 2, // W = PWA, outbound relay is required in most cases

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct PeerAddress { pub(crate) struct PeerAddress {
protocol_type: ProtocolType, protocol_type: ProtocolType,
#[serde(with = "as_human_string")] #[serde(with = "as_human_string")]
socket_address: SocketAddress, socket_address: SocketAddress,

View File

@ -6,7 +6,7 @@ use super::*;
#[allow(clippy::derived_hash_with_manual_eq)] #[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, EnumSetType, Serialize, Deserialize)] #[derive(Debug, PartialOrd, Ord, Hash, EnumSetType, Serialize, Deserialize)]
#[enumset(repr = "u8")] #[enumset(repr = "u8")]
pub enum ProtocolType { pub(crate) enum ProtocolType {
UDP = 0, UDP = 0,
TCP = 1, TCP = 1,
WS = 2, WS = 2,

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PunishmentReason { pub(crate) enum PunishmentReason {
// IP-level punishments // IP-level punishments
FailedToDecryptEnvelopeBody, FailedToDecryptEnvelopeBody,
FailedToDecodeEnvelope, FailedToDecodeEnvelope,
@ -17,7 +17,7 @@ pub enum PunishmentReason {
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Punishment { pub(crate) struct Punishment {
pub reason: PunishmentReason, pub reason: PunishmentReason,
pub timestamp: Timestamp, pub timestamp: Timestamp,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum RelayKind { pub(crate) enum RelayKind {
Inbound = 0, Inbound = 0,
Outbound = 1, Outbound = 1,
} }

View File

@ -2,7 +2,7 @@ use super::*;
/// Parameter for Signal operation /// Parameter for Signal operation
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum SignalInfo { pub(crate) enum SignalInfo {
/// UDP Hole Punch Request /// UDP Hole Punch Request
HolePunch { HolePunch {
/// /// Receipt to be returned after the hole punch /// /// Receipt to be returned after the hole punch

View File

@ -3,7 +3,7 @@ use super::*;
#[derive( #[derive(
Copy, Default, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize, Copy, Default, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize,
)] )]
pub struct SocketAddress { pub(crate) struct SocketAddress {
address: Address, address: Address,
port: u16, port: u16,
} }

View File

@ -75,7 +75,7 @@ struct NetworkUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub(in crate::network_manager) struct Network { pub(super) struct Network {
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<NetworkInner>>, inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>, unlocked_inner: Arc<NetworkUnlockedInner>,
@ -510,7 +510,7 @@ impl Network {
////////////////////////////////////////// //////////////////////////////////////////
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)] #[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up"); log_net!(debug "ignoring due to not started up");
return Ok(()); return Ok(());

View File

@ -2,7 +2,7 @@ use super::*;
use routing_table::tasks::bootstrap::BOOTSTRAP_TXT_VERSION_0; use routing_table::tasks::bootstrap::BOOTSTRAP_TXT_VERSION_0;
impl RoutingTable { impl RoutingTable {
pub(crate) async fn debug_info_txtrecord(&self) -> String { pub async fn debug_info_txtrecord(&self) -> String {
let mut out = String::new(); let mut out = String::new();
let gdis = self.dial_info_details(RoutingDomain::PublicInternet); let gdis = self.dial_info_details(RoutingDomain::PublicInternet);
@ -55,7 +55,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_nodeid(&self) -> String { pub fn debug_info_nodeid(&self) -> String {
let mut out = String::new(); let mut out = String::new();
for nid in self.unlocked_inner.node_ids().iter() { for nid in self.unlocked_inner.node_ids().iter() {
out += &format!("{}\n", nid); out += &format!("{}\n", nid);
@ -63,7 +63,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_nodeinfo(&self) -> String { pub fn debug_info_nodeinfo(&self) -> String {
let mut out = String::new(); let mut out = String::new();
let inner = self.inner.read(); let inner = self.inner.read();
out += &format!("Node Ids: {}\n", self.unlocked_inner.node_ids()); out += &format!("Node Ids: {}\n", self.unlocked_inner.node_ids());
@ -76,7 +76,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_dialinfo(&self) -> String { pub fn debug_info_dialinfo(&self) -> String {
let ldis = self.dial_info_details(RoutingDomain::LocalNetwork); let ldis = self.dial_info_details(RoutingDomain::LocalNetwork);
let gdis = self.dial_info_details(RoutingDomain::PublicInternet); let gdis = self.dial_info_details(RoutingDomain::PublicInternet);
let mut out = String::new(); let mut out = String::new();
@ -92,11 +92,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_peerinfo( pub fn debug_info_peerinfo(&self, routing_domain: RoutingDomain, published: bool) -> String {
&self,
routing_domain: RoutingDomain,
published: bool,
) -> String {
let mut out = String::new(); let mut out = String::new();
if published { if published {
let pistr = if let Some(pi) = self.get_published_peer_info(routing_domain) { let pistr = if let Some(pi) = self.get_published_peer_info(routing_domain) {
@ -189,7 +185,7 @@ impl RoutingTable {
) )
} }
pub(crate) fn debug_info_entries( pub fn debug_info_entries(
&self, &self,
min_state: BucketEntryState, min_state: BucketEntryState,
capabilities: Vec<FourCC>, capabilities: Vec<FourCC>,
@ -272,7 +268,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_entries_fastest( pub fn debug_info_entries_fastest(
&self, &self,
min_state: BucketEntryState, min_state: BucketEntryState,
capabilities: Vec<FourCC>, capabilities: Vec<FourCC>,
@ -354,7 +350,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String { pub fn debug_info_entry(&self, node_ref: NodeRef) -> String {
let cur_ts = Timestamp::now(); let cur_ts = Timestamp::now();
let mut out = String::new(); let mut out = String::new();
@ -369,7 +365,7 @@ impl RoutingTable {
out out
} }
pub(crate) fn debug_info_buckets(&self, min_state: BucketEntryState) -> String { pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.read(); let inner = self.inner.read();
let inner = &*inner; let inner = &*inner;
let cur_ts = Timestamp::now(); let cur_ts = Timestamp::now();

View File

@ -165,7 +165,7 @@ impl RoutingTable {
/// Determine if set of peers is closer to key_near than key_far is to key_near /// Determine if set of peers is closer to key_near than key_far is to key_near
#[instrument(level = "trace", target = "rtab", skip_all, err)] #[instrument(level = "trace", target = "rtab", skip_all, err)]
pub(crate) fn verify_peers_closer( pub fn verify_peers_closer(
vcrypto: CryptoSystemVersion, vcrypto: CryptoSystemVersion,
key_far: TypedKey, key_far: TypedKey,
key_near: TypedKey, key_near: TypedKey,

View File

@ -12,6 +12,14 @@ mod types;
pub mod tests; pub mod tests;
pub(crate) use bucket_entry::*;
pub(crate) use node_ref::*;
pub(crate) use privacy::*;
pub(crate) use route_spec_store::*;
pub(crate) use routing_table_inner::*;
pub(crate) use stats_accounting::*;
pub use types::*;
use super::*; use super::*;
use crate::crypto::*; use crate::crypto::*;
@ -21,15 +29,6 @@ use crate::rpc_processor::*;
use bucket::*; use bucket::*;
use hashlink::LruCache; use hashlink::LruCache;
pub(crate) use bucket_entry::*;
pub(crate) use node_ref::*;
pub(crate) use privacy::*;
pub(crate) use route_spec_store::*;
pub(crate) use routing_table_inner::*;
pub(crate) use stats_accounting::*;
pub use types::*;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
/// How many nodes in our routing table we require for a functional PublicInternet RoutingDomain /// How many nodes in our routing table we require for a functional PublicInternet RoutingDomain
@ -61,14 +60,14 @@ pub struct LowLevelPortInfo {
pub low_level_protocol_ports: LowLevelProtocolPorts, pub low_level_protocol_ports: LowLevelProtocolPorts,
pub protocol_to_port: ProtocolToPortMapping, pub protocol_to_port: ProtocolToPortMapping,
} }
pub(crate) type RoutingTableEntryFilter<'t> = pub type RoutingTableEntryFilter<'t> =
Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>; Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>;
type SerializedBuckets = Vec<Vec<u8>>; type SerializedBuckets = Vec<Vec<u8>>;
type SerializedBucketMap = BTreeMap<CryptoKind, SerializedBuckets>; type SerializedBucketMap = BTreeMap<CryptoKind, SerializedBuckets>;
#[derive(Clone, Debug, Default, Eq, PartialEq)] #[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct RoutingTableHealth { pub struct RoutingTableHealth {
/// Number of reliable (long-term responsive) entries in the routing table /// Number of reliable (long-term responsive) entries in the routing table
pub reliable_entry_count: usize, pub reliable_entry_count: usize,
/// Number of unreliable (occasionally unresponsive) entries in the routing table /// Number of unreliable (occasionally unresponsive) entries in the routing table
@ -86,12 +85,13 @@ pub(crate) struct RoutingTableHealth {
pub type BucketIndex = (CryptoKind, usize); pub type BucketIndex = (CryptoKind, usize);
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(crate) struct RecentPeersEntry { pub struct RecentPeersEntry {
pub last_connection: Flow, pub last_connection: Flow,
} }
pub(crate) struct RoutingTableUnlockedInner { pub(crate) struct RoutingTableUnlockedInner {
// Accessors // Accessors
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
network_manager: NetworkManager, network_manager: NetworkManager,
@ -214,12 +214,14 @@ pub(crate) struct RoutingTable {
impl RoutingTable { impl RoutingTable {
fn new_unlocked_inner( fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig, config: VeilidConfig,
network_manager: NetworkManager, network_manager: NetworkManager,
) -> RoutingTableUnlockedInner { ) -> RoutingTableUnlockedInner {
let c = config.get(); let c = config.get();
RoutingTableUnlockedInner { RoutingTableUnlockedInner {
event_bus,
config: config.clone(), config: config.clone(),
network_manager, network_manager,
node_id: c.network.routing_table.node_id.clone(), node_id: c.network.routing_table.node_id.clone(),
@ -268,8 +270,9 @@ impl RoutingTable {
} }
} }
pub fn new(network_manager: NetworkManager) -> Self { pub fn new(network_manager: NetworkManager) -> Self {
let event_bus = network_manager.event_bus();
let config = network_manager.config(); let config = network_manager.config();
let unlocked_inner = Arc::new(Self::new_unlocked_inner(config, network_manager)); let unlocked_inner = Arc::new(Self::new_unlocked_inner(event_bus, config, network_manager));
let inner = Arc::new(RwLock::new(RoutingTableInner::new(unlocked_inner.clone()))); let inner = Arc::new(RwLock::new(RoutingTableInner::new(unlocked_inner.clone())));
let this = Self { let this = Self {
inner, inner,

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub struct FilteredNodeRef { pub(crate) struct FilteredNodeRef {
routing_table: RoutingTable, routing_table: RoutingTable,
entry: Arc<BucketEntry>, entry: Arc<BucketEntry>,
filter: NodeRefFilter, filter: NodeRefFilter,

View File

@ -6,16 +6,16 @@ mod traits;
use super::*; use super::*;
pub use filtered_node_ref::*; pub(crate) use filtered_node_ref::*;
pub use node_ref_filter::*; pub(crate) use node_ref_filter::*;
pub use node_ref_lock::*; pub(crate) use node_ref_lock::*;
pub use node_ref_lock_mut::*; pub(crate) use node_ref_lock_mut::*;
pub use traits::*; pub(crate) use traits::*;
/////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////
// Default NodeRef // Default NodeRef
pub struct NodeRef { pub(crate) struct NodeRef {
routing_table: RoutingTable, routing_table: RoutingTable,
entry: Arc<BucketEntry>, entry: Arc<BucketEntry>,
#[cfg(feature = "tracking")] #[cfg(feature = "tracking")]

View File

@ -1,13 +1,13 @@
use super::*; use super::*;
pub type LockedNodeRef<'a> = NodeRefLock<'a, NodeRef>; pub(crate) type LockedNodeRef<'a> = NodeRefLock<'a, NodeRef>;
pub type LockedFilteredNodeRef<'a> = NodeRefLock<'a, FilteredNodeRef>; pub(crate) type LockedFilteredNodeRef<'a> = NodeRefLock<'a, FilteredNodeRef>;
/// Locked reference to a routing table entry /// Locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have /// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner /// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone /// Keeps entry in the routing table until all references are gone
pub struct NodeRefLock< pub(crate) struct NodeRefLock<
'a, 'a,
N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone,
> { > {

View File

@ -1,13 +1,13 @@
use super::*; use super::*;
pub type LockedMutNodeRef<'a> = NodeRefLockMut<'a, NodeRef>; pub(crate) type LockedMutNodeRef<'a> = NodeRefLockMut<'a, NodeRef>;
pub type LockedMutFilteredNodeRef<'a> = NodeRefLockMut<'a, FilteredNodeRef>; pub(crate) type LockedMutFilteredNodeRef<'a> = NodeRefLockMut<'a, FilteredNodeRef>;
/// Mutable locked reference to a routing table entry /// Mutable locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have /// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner /// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone /// Keeps entry in the routing table until all references are gone
pub struct NodeRefLockMut< pub(crate) struct NodeRefLockMut<
'a, 'a,
N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone,
> { > {

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
// Field accessors // Field accessors
pub trait NodeRefAccessorsTrait { pub(crate) trait NodeRefAccessorsTrait {
fn routing_table(&self) -> RoutingTable; fn routing_table(&self) -> RoutingTable;
fn entry(&self) -> Arc<BucketEntry>; fn entry(&self) -> Arc<BucketEntry>;
fn sequencing(&self) -> Sequencing; fn sequencing(&self) -> Sequencing;
@ -14,7 +14,7 @@ pub trait NodeRefAccessorsTrait {
} }
// Operate on entry // Operate on entry
pub trait NodeRefOperateTrait { pub(crate) trait NodeRefOperateTrait {
fn operate<T, F>(&self, f: F) -> T fn operate<T, F>(&self, f: F) -> T
where where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T; F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T;
@ -24,7 +24,7 @@ pub trait NodeRefOperateTrait {
} }
// Common Operations // Common Operations
pub trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait { pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
fn same_entry<T: NodeRefAccessorsTrait>(&self, other: &T) -> bool { fn same_entry<T: NodeRefAccessorsTrait>(&self, other: &T) -> bool {
Arc::ptr_eq(&self.entry(), &other.entry()) Arc::ptr_eq(&self.entry(), &other.entry())
} }

View File

@ -15,7 +15,7 @@ use route_spec_store_cache::*;
use route_spec_store_content::*; use route_spec_store_content::*;
pub(crate) use route_spec_store_cache::CompiledRoute; pub(crate) use route_spec_store_cache::CompiledRoute;
pub(crate) use route_stats::*; pub use route_stats::*;
/// The size of the remote private route cache /// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;

View File

@ -2,7 +2,7 @@ use super::*;
/// What remote private routes have seen /// What remote private routes have seen
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub(crate) struct RemotePrivateRouteInfo { pub struct RemotePrivateRouteInfo {
/// The private routes themselves /// The private routes themselves
private_routes: Vec<PrivateRoute>, private_routes: Vec<PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use /// Did this remote private route see our node info due to no safety route in use

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct RouteSpecDetail { pub struct RouteSpecDetail {
/// Crypto kind /// Crypto kind
pub crypto_kind: CryptoKind, pub crypto_kind: CryptoKind,
/// Secret key /// Secret key
@ -11,7 +11,7 @@ pub(crate) struct RouteSpecDetail {
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct RouteSetSpecDetail { pub struct RouteSetSpecDetail {
/// Route set per crypto kind /// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>, route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs /// Route noderefs

View File

@ -9,7 +9,7 @@ struct CompiledRouteCacheKey {
/// Compiled route (safety route + private route) /// Compiled route (safety route + private route)
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) struct CompiledRoute { pub struct CompiledRoute {
/// The safety route attached to the private route /// The safety route attached to the private route
pub safety_route: SafetyRoute, pub safety_route: SafetyRoute,
/// The secret used to encrypt the message payload /// The secret used to encrypt the message payload
@ -21,7 +21,7 @@ pub(crate) struct CompiledRoute {
/// Ephemeral data used to help the RouteSpecStore operate efficiently /// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug)] #[derive(Debug)]
pub(super) struct RouteSpecStoreCache { pub struct RouteSpecStoreCache {
/// How many times nodes have been used /// How many times nodes have been used
used_nodes: HashMap<PublicKey, usize>, used_nodes: HashMap<PublicKey, usize>,
/// How many times nodes have been used at the terminal point of a route /// How many times nodes have been used at the terminal point of a route

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone, Debug, Default, Serialize, Deserialize)] #[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub(crate) struct RouteStats { pub struct RouteStats {
/// Consecutive failed to send count /// Consecutive failed to send count
#[serde(skip)] #[serde(skip)]
pub failed_to_send: u32, pub failed_to_send: u32,

View File

@ -14,7 +14,7 @@ pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
/// RoutingTable rwlock-internal data /// RoutingTable rwlock-internal data
pub(crate) struct RoutingTableInner { pub struct RoutingTableInner {
/// Extra pointer to unlocked members to simplify access /// Extra pointer to unlocked members to simplify access
pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>, pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>,
/// Routing table buckets that hold references to entries, per crypto kind /// Routing table buckets that hold references to entries, per crypto kind
@ -1388,7 +1388,7 @@ impl RoutingTableInner {
} }
#[instrument(level = "trace", skip_all)] #[instrument(level = "trace", skip_all)]
pub(crate) fn make_closest_noderef_sort( pub fn make_closest_noderef_sort(
crypto: Crypto, crypto: Crypto,
node_id: TypedKey, node_id: TypedKey,
) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering { ) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering {
@ -1417,7 +1417,7 @@ pub(crate) fn make_closest_noderef_sort(
} }
} }
pub(crate) fn make_closest_node_id_sort( pub fn make_closest_node_id_sort(
crypto: Crypto, crypto: Crypto,
node_id: TypedKey, node_id: TypedKey,
) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering { ) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering {

View File

@ -144,9 +144,13 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
pi pi
}; };
rti.unlocked_inner if let Err(e) = rti
.network_manager() .unlocked_inner
.report_peer_info_change(peer_info); .event_bus
.post(PeerInfoChangeEvent { peer_info })
{
log_rtab!(debug "Failed to post event: {}", e);
}
true true
} }
@ -162,7 +166,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
let can_contain_address = self.can_contain_address(address); let can_contain_address = self.can_contain_address(address);
if !can_contain_address { if !can_contain_address {
log_rtab!(debug "[LocalNetwork] can not add dial info to this routing domain: {:?}", dial_info); log_network_result!(debug "[LocalNetwork] can not add dial info to this routing domain: {:?}", dial_info);
return false; return false;
} }
if !dial_info.is_valid() { if !dial_info.is_valid() {

View File

@ -9,7 +9,7 @@ pub use local_network::*;
pub use public_internet::*; pub use public_internet::*;
/// General trait for all routing domains /// General trait for all routing domains
pub(crate) trait RoutingDomainDetail { pub trait RoutingDomainDetail {
// Common accessors // Common accessors
#[expect(dead_code)] #[expect(dead_code)]
fn routing_domain(&self) -> RoutingDomain; fn routing_domain(&self) -> RoutingDomain;

View File

@ -122,9 +122,13 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
pi pi
}; };
rti.unlocked_inner if let Err(e) = rti
.network_manager() .unlocked_inner
.report_peer_info_change(peer_info); .event_bus
.post(PeerInfoChangeEvent { peer_info })
{
log_rtab!(debug "Failed to post event: {}", e);
}
true true
} }
@ -140,7 +144,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
let can_contain_address = self.can_contain_address(address); let can_contain_address = self.can_contain_address(address);
if !can_contain_address { if !can_contain_address {
log_rtab!(debug "[PublicInternet] can not add dial info to this routing domain: {:?}", dial_info); log_network_result!(debug "[PublicInternet] can not add dial info to this routing domain: {:?}", dial_info);
return false; return false;
} }
if !dial_info.is_valid() { if !dial_info.is_valid() {

View File

@ -117,7 +117,7 @@ impl RoutingTable {
// Bootstrap lookup process // Bootstrap lookup process
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret, err)]
pub(crate) async fn resolve_bootstrap( pub async fn resolve_bootstrap(
&self, &self,
bootstrap: Vec<String>, bootstrap: Vec<String>,
) -> EyreResult<Vec<BootstrapRecord>> { ) -> EyreResult<Vec<BootstrapRecord>> {
@ -254,7 +254,7 @@ impl RoutingTable {
} }
//#[instrument(level = "trace", skip(self), err)] //#[instrument(level = "trace", skip(self), err)]
pub(crate) fn bootstrap_with_peer( pub fn bootstrap_with_peer(
self, self,
crypto_kinds: Vec<CryptoKind>, crypto_kinds: Vec<CryptoKind>,
pi: Arc<PeerInfo>, pi: Arc<PeerInfo>,
@ -324,7 +324,7 @@ impl RoutingTable {
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_with_peer_list( pub async fn bootstrap_with_peer_list(
self, self,
peers: Vec<Arc<PeerInfo>>, peers: Vec<Arc<PeerInfo>>,
stop_token: StopToken, stop_token: StopToken,
@ -364,7 +364,7 @@ impl RoutingTable {
} }
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> { pub async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
let bootstrap = self let bootstrap = self
.unlocked_inner .unlocked_inner
.with_config(|c| c.network.routing_table.bootstrap.clone()); .with_config(|c| c.network.routing_table.bootstrap.clone());

View File

@ -10,10 +10,7 @@ impl RoutingTable {
/// Ask our closest peers to give us more peers close to ourselves. This will /// Ask our closest peers to give us more peers close to ourselves. This will
/// assist with the DHT and other algorithms that utilize the distance metric. /// assist with the DHT and other algorithms that utilize the distance metric.
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn closest_peers_refresh_task_routine( pub async fn closest_peers_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
self,
stop_token: StopToken,
) -> EyreResult<()> {
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
for crypto_kind in VALID_CRYPTO_KINDS { for crypto_kind in VALID_CRYPTO_KINDS {

View File

@ -10,7 +10,7 @@ impl RoutingTable {
// Kick the queued buckets in the routing table to free dead nodes if necessary // Kick the queued buckets in the routing table to free dead nodes if necessary
// Attempts to keep the size of the routing table down to the bucket depth // Attempts to keep the size of the routing table down to the bucket depth
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn kick_buckets_task_routine( pub async fn kick_buckets_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,

View File

@ -10,7 +10,7 @@ pub mod update_statistics;
use super::*; use super::*;
impl RoutingTable { impl RoutingTable {
pub(crate) fn setup_tasks(&self) { pub fn setup_tasks(&self) {
// Set rolling transfers tick task // Set rolling transfers tick task
{ {
let this = self.clone(); let this = self.clone();
@ -287,12 +287,12 @@ impl RoutingTable {
Ok(()) Ok(())
} }
pub(crate) async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> { pub async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
let critical_sections = self.inner.read().critical_sections.clone(); let critical_sections = self.inner.read().critical_sections.clone();
critical_sections.lock_tag(LOCK_TAG_TICK).await critical_sections.lock_tag(LOCK_TAG_TICK).await
} }
pub(crate) async fn cancel_tasks(&self) { pub async fn cancel_tasks(&self) {
// Cancel all tasks being ticked // Cancel all tasks being ticked
log_rtab!(debug "stopping rolling transfers task"); log_rtab!(debug "stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {

View File

@ -12,10 +12,7 @@ impl RoutingTable {
// nodes for their PublicInternet peers, which is a very fast way to get // nodes for their PublicInternet peers, which is a very fast way to get
// a new node online. // a new node online.
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn peer_minimum_refresh_task_routine( pub async fn peer_minimum_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
self,
stop_token: StopToken,
) -> EyreResult<()> {
// Get counts by crypto kind // Get counts by crypto kind
let entry_count = self.inner.read().cached_entry_counts(); let entry_count = self.inner.read().cached_entry_counts();

View File

@ -2,10 +2,10 @@ use super::*;
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo /// Keepalive pings are done occasionally to ensure holepunched public dialinfo
/// remains valid, as well as to make sure we remain in any relay node's routing table /// remains valid, as well as to make sure we remain in any relay node's routing table
const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 1;
/// Keepalive pings are done for active watch nodes to make sure they are still there /// Keepalive pings are done for active watch nodes to make sure they are still there
const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10; const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 1;
/// Ping queue processing depth per validator /// Ping queue processing depth per validator
const MAX_PARALLEL_PINGS: usize = 8; const MAX_PARALLEL_PINGS: usize = 8;
@ -17,7 +17,7 @@ type PingValidatorFuture = SendPinBoxFuture<Result<(), RPCError>>;
impl RoutingTable { impl RoutingTable {
// Task routine for PublicInternet status pings // Task routine for PublicInternet status pings
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_public_internet_task_routine( pub async fn ping_validator_public_internet_task_routine(
self, self,
stop_token: StopToken, stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,
@ -36,7 +36,7 @@ impl RoutingTable {
// Task routine for LocalNetwork status pings // Task routine for LocalNetwork status pings
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_local_network_task_routine( pub async fn ping_validator_local_network_task_routine(
self, self,
stop_token: StopToken, stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,
@ -55,7 +55,7 @@ impl RoutingTable {
// Task routine for PublicInternet relay keepalive pings // Task routine for PublicInternet relay keepalive pings
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_public_internet_relay_task_routine( pub async fn ping_validator_public_internet_relay_task_routine(
self, self,
stop_token: StopToken, stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,
@ -74,7 +74,7 @@ impl RoutingTable {
// Task routine for active watch keepalive pings // Task routine for active watch keepalive pings
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_active_watch_task_routine( pub async fn ping_validator_active_watch_task_routine(
self, self,
stop_token: StopToken, stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,

View File

@ -168,7 +168,7 @@ impl RoutingTable {
/// Keep private routes assigned and accessible /// Keep private routes assigned and accessible
#[instrument(level = "trace", skip(self, stop_token), err)] #[instrument(level = "trace", skip(self, stop_token), err)]
pub(crate) async fn private_route_management_task_routine( pub async fn private_route_management_task_routine(
self, self,
stop_token: StopToken, stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,

View File

@ -53,7 +53,7 @@ impl RoutingTable {
// Keep relays assigned and accessible // Keep relays assigned and accessible
#[instrument(level = "trace", skip_all, err)] #[instrument(level = "trace", skip_all, err)]
pub(crate) async fn relay_management_task_routine( pub async fn relay_management_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,

View File

@ -3,7 +3,7 @@ use super::*;
impl RoutingTable { impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is // Compute transfer statistics to determine how 'fast' a node is
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_transfers_task_routine( pub async fn rolling_transfers_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
last_ts: Timestamp, last_ts: Timestamp,
@ -35,7 +35,7 @@ impl RoutingTable {
// Update state statistics in PeerStats // Update state statistics in PeerStats
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn update_state_stats_task_routine( pub async fn update_state_stats_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,
@ -56,7 +56,7 @@ impl RoutingTable {
// Update rolling answers in PeerStats // Update rolling answers in PeerStats
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_answers_task_routine( pub async fn rolling_answers_task_routine(
self, self,
_stop_token: StopToken, _stop_token: StopToken,
_last_ts: Timestamp, _last_ts: Timestamp,

View File

@ -3,13 +3,23 @@ use super::*;
pub mod test_serialize_routing_table; pub mod test_serialize_routing_table;
pub(crate) fn mock_routing_table() -> routing_table::RoutingTable { pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
let event_bus = EventBus::new();
let veilid_config = VeilidConfig::new(); let veilid_config = VeilidConfig::new();
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
let block_store = BlockStore::new(veilid_config.clone()); let block_store = BlockStore::new(event_bus.clone(), veilid_config.clone());
let protected_store = ProtectedStore::new(veilid_config.clone()); let protected_store = ProtectedStore::new(event_bus.clone(), veilid_config.clone());
let table_store = TableStore::new(veilid_config.clone(), protected_store.clone()); let table_store = TableStore::new(
let crypto = Crypto::new(veilid_config.clone(), table_store.clone()); event_bus.clone(),
veilid_config.clone(),
protected_store.clone(),
);
let crypto = Crypto::new(
event_bus.clone(),
veilid_config.clone(),
table_store.clone(),
);
let storage_manager = storage_manager::StorageManager::new( let storage_manager = storage_manager::StorageManager::new(
event_bus.clone(),
veilid_config.clone(), veilid_config.clone(),
crypto.clone(), crypto.clone(),
table_store.clone(), table_store.clone(),
@ -17,6 +27,7 @@ pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
block_store.clone(), block_store.clone(),
); );
let network_manager = network_manager::NetworkManager::new( let network_manager = network_manager::NetworkManager::new(
event_bus.clone(),
veilid_config.clone(), veilid_config.clone(),
storage_manager, storage_manager,
table_store.clone(), table_store.clone(),

View File

@ -2,7 +2,7 @@ use super::*;
/// Mechanism required to contact another node /// Mechanism required to contact another node
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub(crate) enum ContactMethod { pub enum ContactMethod {
/// Node is not reachable by any means /// Node is not reachable by any means
Unreachable, Unreachable,
/// Connection should have already existed /// Connection should have already existed

View File

@ -11,7 +11,7 @@ mod signed_relayed_node_info;
use super::*; use super::*;
pub(crate) use contact_method::*; pub use contact_method::*;
pub use dial_info_detail::*; pub use dial_info_detail::*;
pub use direction::*; pub use direction::*;
pub use node_info::*; pub use node_info::*;

View File

@ -122,10 +122,10 @@ impl SignedNodeInfo {
match self { match self {
SignedNodeInfo::Direct(d) => match other { SignedNodeInfo::Direct(d) => match other {
SignedNodeInfo::Direct(pd) => d.equivalent(pd), SignedNodeInfo::Direct(pd) => d.equivalent(pd),
SignedNodeInfo::Relayed(_) => true, SignedNodeInfo::Relayed(_) => false,
}, },
SignedNodeInfo::Relayed(r) => match other { SignedNodeInfo::Relayed(r) => match other {
SignedNodeInfo::Direct(_) => true, SignedNodeInfo::Direct(_) => false,
SignedNodeInfo::Relayed(pr) => r.equivalent(pr), SignedNodeInfo::Relayed(pr) => r.equivalent(pr),
}, },
} }

View File

@ -0,0 +1,24 @@
use super::*;
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
/// Hpw long it took to get this answer
pub _latency: TimestampDuration,
/// The private route requested to receive the reply
pub reply_private_route: Option<PublicKey>,
/// The answer itself
pub answer: T,
}
impl<T> Answer<T> {
pub fn new(
latency: TimestampDuration,
reply_private_route: Option<PublicKey>,
answer: T,
) -> Self {
Self {
_latency: latency,
reply_private_route,
answer,
}
}
}

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
use core::convert::TryInto; use core::convert::TryInto;
pub(crate) fn encode_address( pub fn encode_address(
address: &Address, address: &Address,
builder: &mut veilid_capnp::address::Builder, builder: &mut veilid_capnp::address::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -37,7 +37,7 @@ pub(crate) fn encode_address(
Ok(()) Ok(())
} }
pub(crate) fn decode_address(reader: &veilid_capnp::address::Reader) -> Result<Address, RPCError> { pub fn decode_address(reader: &veilid_capnp::address::Reader) -> Result<Address, RPCError> {
match reader.reborrow().which() { match reader.reborrow().which() {
Ok(veilid_capnp::address::Which::Ipv4(Ok(v4))) => { Ok(veilid_capnp::address::Which::Ipv4(Ok(v4))) => {
let v4b = v4.get_addr().to_be_bytes(); let v4b = v4.get_addr().to_be_bytes();

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub(crate) fn encode_address_type_set( pub fn encode_address_type_set(
address_type_set: &AddressTypeSet, address_type_set: &AddressTypeSet,
builder: &mut veilid_capnp::address_type_set::Builder, builder: &mut veilid_capnp::address_type_set::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -10,7 +10,7 @@ pub(crate) fn encode_address_type_set(
Ok(()) Ok(())
} }
pub(crate) fn decode_address_type_set( pub fn decode_address_type_set(
reader: &veilid_capnp::address_type_set::Reader, reader: &veilid_capnp::address_type_set::Reader,
) -> Result<AddressTypeSet, RPCError> { ) -> Result<AddressTypeSet, RPCError> {
let mut out = AddressTypeSet::new(); let mut out = AddressTypeSet::new();

View File

@ -1,9 +1,7 @@
use super::*; use super::*;
use core::convert::TryInto; use core::convert::TryInto;
pub(crate) fn decode_dial_info( pub fn decode_dial_info(reader: &veilid_capnp::dial_info::Reader) -> Result<DialInfo, RPCError> {
reader: &veilid_capnp::dial_info::Reader,
) -> Result<DialInfo, RPCError> {
match reader match reader
.reborrow() .reborrow()
.which() .which()
@ -62,7 +60,7 @@ pub(crate) fn decode_dial_info(
} }
} }
pub(crate) fn encode_dial_info( pub fn encode_dial_info(
dial_info: &DialInfo, dial_info: &DialInfo,
builder: &mut veilid_capnp::dial_info::Builder, builder: &mut veilid_capnp::dial_info::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {

View File

@ -1,8 +1,6 @@
use super::*; use super::*;
pub(crate) fn encode_dial_info_class( pub fn encode_dial_info_class(dial_info_class: DialInfoClass) -> veilid_capnp::DialInfoClass {
dial_info_class: DialInfoClass,
) -> veilid_capnp::DialInfoClass {
match dial_info_class { match dial_info_class {
DialInfoClass::Direct => veilid_capnp::DialInfoClass::Direct, DialInfoClass::Direct => veilid_capnp::DialInfoClass::Direct,
DialInfoClass::Mapped => veilid_capnp::DialInfoClass::Mapped, DialInfoClass::Mapped => veilid_capnp::DialInfoClass::Mapped,
@ -13,9 +11,7 @@ pub(crate) fn encode_dial_info_class(
} }
} }
pub(crate) fn decode_dial_info_class( pub fn decode_dial_info_class(dial_info_class: veilid_capnp::DialInfoClass) -> DialInfoClass {
dial_info_class: veilid_capnp::DialInfoClass,
) -> DialInfoClass {
match dial_info_class { match dial_info_class {
veilid_capnp::DialInfoClass::Direct => DialInfoClass::Direct, veilid_capnp::DialInfoClass::Direct => DialInfoClass::Direct,
veilid_capnp::DialInfoClass::Mapped => DialInfoClass::Mapped, veilid_capnp::DialInfoClass::Mapped => DialInfoClass::Mapped,

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub(crate) fn encode_dial_info_detail( pub fn encode_dial_info_detail(
dial_info_detail: &DialInfoDetail, dial_info_detail: &DialInfoDetail,
builder: &mut veilid_capnp::dial_info_detail::Builder, builder: &mut veilid_capnp::dial_info_detail::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -11,7 +11,7 @@ pub(crate) fn encode_dial_info_detail(
Ok(()) Ok(())
} }
pub(crate) fn decode_dial_info_detail( pub fn decode_dial_info_detail(
reader: &veilid_capnp::dial_info_detail::Reader, reader: &veilid_capnp::dial_info_detail::Reader,
) -> Result<DialInfoDetail, RPCError> { ) -> Result<DialInfoDetail, RPCError> {
let dial_info = decode_dial_info( let dial_info = decode_dial_info(

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
use core::convert::TryInto; use core::convert::TryInto;
pub(crate) fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey { pub fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey {
let u0 = public_key.get_u0().to_be_bytes(); let u0 = public_key.get_u0().to_be_bytes();
let u1 = public_key.get_u1().to_be_bytes(); let u1 = public_key.get_u1().to_be_bytes();
let u2 = public_key.get_u2().to_be_bytes(); let u2 = public_key.get_u2().to_be_bytes();
@ -16,7 +16,7 @@ pub(crate) fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> Public
PublicKey::new(x) PublicKey::new(x)
} }
pub(crate) fn encode_key256(key: &PublicKey, builder: &mut veilid_capnp::key256::Builder) { pub fn encode_key256(key: &PublicKey, builder: &mut veilid_capnp::key256::Builder) {
builder.set_u0(u64::from_be_bytes( builder.set_u0(u64::from_be_bytes(
key.bytes[0..8] key.bytes[0..8]
.try_into() .try_into()

View File

@ -27,23 +27,21 @@ mod tunnel;
mod typed_key; mod typed_key;
mod typed_signature; mod typed_signature;
pub(crate) use operations::MAX_INSPECT_VALUE_A_SEQS_LEN; pub use address::*;
pub(in crate::rpc_processor) use operations::*; pub use address_type_set::*;
pub use dial_info::*;
pub(crate) use address::*; pub use dial_info_class::*;
pub(crate) use address_type_set::*; pub use dial_info_detail::*;
pub(crate) use dial_info::*; pub use key256::*;
pub(crate) use dial_info_class::*; pub use network_class::*;
pub(crate) use dial_info_detail::*; pub use node_info::*;
pub(crate) use key256::*; pub use node_status::*;
pub(crate) use network_class::*; pub use nonce::*;
pub(crate) use node_info::*; pub use operations::*;
pub(crate) use node_status::*; pub use peer_info::*;
pub(crate) use nonce::*; pub use private_safety_route::*;
pub(crate) use peer_info::*; pub use protocol_type_set::*;
pub(crate) use private_safety_route::*; pub use sender_info::*;
pub(crate) use protocol_type_set::*;
pub(crate) use sender_info::*;
pub use sequencing::*; pub use sequencing::*;
pub use signal_info::*; pub use signal_info::*;
pub use signature512::*; pub use signature512::*;
@ -62,20 +60,30 @@ use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names)] #[allow(clippy::enum_variant_names)]
pub(in crate::rpc_processor) enum QuestionContext { pub enum QuestionContext {
GetValue(ValidateGetValueContext), GetValue(ValidateGetValueContext),
SetValue(ValidateSetValueContext), SetValue(ValidateSetValueContext),
InspectValue(ValidateInspectValueContext), InspectValue(ValidateInspectValueContext),
} }
#[derive(Clone)] #[derive(Clone)]
pub(in crate::rpc_processor) struct RPCValidateContext { pub struct RPCValidateContext {
pub crypto: Crypto, pub crypto: Crypto,
// pub rpc_processor: RPCProcessor, // pub rpc_processor: RPCProcessor,
pub question_context: Option<QuestionContext>, pub question_context: Option<QuestionContext>,
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct RPCDecodeContext { pub struct RPCDecodeContext {
pub routing_domain: RoutingDomain, pub routing_domain: RoutingDomain,
} }
#[instrument(level = "trace", target = "rpc", skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &builder).map_err(RPCError::protocol)?;
Ok(buffer)
}

View File

@ -30,6 +30,8 @@ mod operation_complete_tunnel;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
mod operation_start_tunnel; mod operation_start_tunnel;
pub use operation_inspect_value::MAX_INSPECT_VALUE_A_SEQS_LEN;
pub(in crate::rpc_processor) use answer::*; pub(in crate::rpc_processor) use answer::*;
pub(in crate::rpc_processor) use operation::*; pub(in crate::rpc_processor) use operation::*;
pub(in crate::rpc_processor) use operation_app_call::*; pub(in crate::rpc_processor) use operation_app_call::*;
@ -62,5 +64,3 @@ pub(in crate::rpc_processor) use operation_complete_tunnel::*;
pub(in crate::rpc_processor) use operation_start_tunnel::*; pub(in crate::rpc_processor) use operation_start_tunnel::*;
use super::*; use super::*;
pub(crate) use operation_inspect_value::MAX_INSPECT_VALUE_A_SEQS_LEN;

View File

@ -43,7 +43,7 @@ impl RPCOperationCancelTunnelQ {
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationCancelTunnelA { pub enum RPCOperationCancelTunnelA {
Tunnel(TunnelId), Tunnel(TunnelId),
Error(TunnelError), Error(TunnelError),
} }

View File

@ -80,7 +80,7 @@ impl RPCOperationCompleteTunnelQ {
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationCompleteTunnelA { pub enum RPCOperationCompleteTunnelA {
Tunnel(FullTunnel), Tunnel(FullTunnel),
Error(TunnelError), Error(TunnelError),
} }

View File

@ -2,7 +2,7 @@ use super::*;
use crate::storage_manager::SignedValueDescriptor; use crate::storage_manager::SignedValueDescriptor;
const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512; const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
pub(crate) const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512; pub const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20; const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)] #[derive(Clone)]

View File

@ -2,7 +2,7 @@ use super::*;
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_route_hop_data( pub fn encode_route_hop_data(
route_hop_data: &RouteHopData, route_hop_data: &RouteHopData,
builder: &mut veilid_capnp::route_hop_data::Builder, builder: &mut veilid_capnp::route_hop_data::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -24,7 +24,7 @@ pub(crate) fn encode_route_hop_data(
Ok(()) Ok(())
} }
pub(crate) fn decode_route_hop_data( pub fn decode_route_hop_data(
reader: &veilid_capnp::route_hop_data::Reader, reader: &veilid_capnp::route_hop_data::Reader,
) -> Result<RouteHopData, RPCError> { ) -> Result<RouteHopData, RPCError> {
let nonce = decode_nonce( let nonce = decode_nonce(
@ -45,7 +45,7 @@ pub(crate) fn decode_route_hop_data(
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_route_hop( pub fn encode_route_hop(
route_hop: &RouteHop, route_hop: &RouteHop,
builder: &mut veilid_capnp::route_hop::Builder, builder: &mut veilid_capnp::route_hop::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -67,7 +67,7 @@ pub(crate) fn encode_route_hop(
Ok(()) Ok(())
} }
pub(crate) fn decode_route_hop( pub fn decode_route_hop(
decode_context: &RPCDecodeContext, decode_context: &RPCDecodeContext,
reader: &veilid_capnp::route_hop::Reader, reader: &veilid_capnp::route_hop::Reader,
) -> Result<RouteHop, RPCError> { ) -> Result<RouteHop, RPCError> {
@ -100,7 +100,7 @@ pub(crate) fn decode_route_hop(
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_private_route( pub fn encode_private_route(
private_route: &PrivateRoute, private_route: &PrivateRoute,
builder: &mut veilid_capnp::private_route::Builder, builder: &mut veilid_capnp::private_route::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -126,7 +126,7 @@ pub(crate) fn encode_private_route(
Ok(()) Ok(())
} }
pub(crate) fn decode_private_route( pub fn decode_private_route(
decode_context: &RPCDecodeContext, decode_context: &RPCDecodeContext,
reader: &veilid_capnp::private_route::Reader, reader: &veilid_capnp::private_route::Reader,
) -> Result<PrivateRoute, RPCError> { ) -> Result<PrivateRoute, RPCError> {
@ -156,7 +156,7 @@ pub(crate) fn decode_private_route(
//////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_safety_route( pub fn encode_safety_route(
safety_route: &SafetyRoute, safety_route: &SafetyRoute,
builder: &mut veilid_capnp::safety_route::Builder, builder: &mut veilid_capnp::safety_route::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -180,7 +180,7 @@ pub(crate) fn encode_safety_route(
Ok(()) Ok(())
} }
pub(crate) fn decode_safety_route( pub fn decode_safety_route(
decode_context: &RPCDecodeContext, decode_context: &RPCDecodeContext,
reader: &veilid_capnp::safety_route::Reader, reader: &veilid_capnp::safety_route::Reader,
) -> Result<SafetyRoute, RPCError> { ) -> Result<SafetyRoute, RPCError> {

View File

@ -30,7 +30,7 @@ pub(crate) enum Destination {
/// Routing configuration for destination /// Routing configuration for destination
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UnsafeRoutingInfo { pub(crate) struct UnsafeRoutingInfo {
pub opt_node: Option<NodeRef>, pub opt_node: Option<NodeRef>,
pub opt_relay: Option<NodeRef>, pub opt_relay: Option<NodeRef>,
pub opt_routing_domain: Option<RoutingDomain>, pub opt_routing_domain: Option<RoutingDomain>,
@ -450,7 +450,7 @@ impl RPCProcessor {
/// Convert the 'RespondTo' into a 'Destination' for a response /// Convert the 'RespondTo' into a 'Destination' for a response
pub(super) fn get_respond_to_destination( pub(super) fn get_respond_to_destination(
&self, &self,
request: &RPCMessage, request: &Message,
) -> NetworkResult<Destination> { ) -> NetworkResult<Destination> {
// Get the question 'respond to' // Get the question 'respond to'
let respond_to = match request.operation.kind() { let respond_to = match request.operation.kind() {
@ -487,7 +487,7 @@ impl RPCProcessor {
NetworkResult::value(Destination::direct(peer_noderef)) NetworkResult::value(Destination::direct(peer_noderef))
} else { } else {
// Look up the sender node, we should have added it via senderNodeInfo before getting here. // Look up the sender node, we should have added it via senderNodeInfo before getting here.
let res = match self.routing_table.lookup_node_ref(sender_node_id) { let res = match self.routing_table().lookup_node_ref(sender_node_id) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
return NetworkResult::invalid_message(format!( return NetworkResult::invalid_message(format!(

View File

@ -2,7 +2,7 @@ use super::*;
#[derive(ThisError, Debug, Clone, PartialOrd, PartialEq, Eq, Ord)] #[derive(ThisError, Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
#[must_use] #[must_use]
pub enum RPCError { pub(crate) enum RPCError {
#[error("[RPCError: Unimplemented({0})]")] #[error("[RPCError: Unimplemented({0})]")]
Unimplemented(String), Unimplemented(String),
#[error("[RPCError: InvalidFormat({0})]")] #[error("[RPCError: InvalidFormat({0})]")]
@ -20,6 +20,7 @@ pub enum RPCError {
} }
impl RPCError { impl RPCError {
#[expect(dead_code)]
pub fn unimplemented<X: ToString>(x: X) -> Self { pub fn unimplemented<X: ToString>(x: X) -> Self {
Self::Unimplemented(x.to_string()) Self::Unimplemented(x.to_string())
} }
@ -47,9 +48,11 @@ impl RPCError {
pub fn network<X: ToString>(x: X) -> Self { pub fn network<X: ToString>(x: X) -> Self {
Self::Network(x.to_string()) Self::Network(x.to_string())
} }
#[expect(dead_code)]
pub fn map_network<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self { pub fn map_network<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self {
move |x| Self::Network(format!("{}: {}", message.to_string(), x.to_string())) move |x| Self::Network(format!("{}: {}", message.to_string(), x.to_string()))
} }
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub fn try_again<X: ToString>(x: X) -> Self { pub fn try_again<X: ToString>(x: X) -> Self {
Self::TryAgain(x.to_string()) Self::TryAgain(x.to_string())
} }
@ -59,6 +62,7 @@ impl RPCError {
pub fn ignore<X: ToString>(x: X) -> Self { pub fn ignore<X: ToString>(x: X) -> Self {
Self::Ignore(x.to_string()) Self::Ignore(x.to_string())
} }
#[expect(dead_code)]
pub fn map_ignore<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self { pub fn map_ignore<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self {
move |x| Self::Ignore(format!("{}: {}", message.to_string(), x.to_string())) move |x| Self::Ignore(format!("{}: {}", message.to_string(), x.to_string()))
} }
@ -81,7 +85,7 @@ impl From<RPCError> for VeilidAPIError {
} }
} }
pub(crate) type RPCNetworkResult<T> = Result<NetworkResult<T>, RPCError>; pub type RPCNetworkResult<T> = Result<NetworkResult<T>, RPCError>;
pub(crate) trait ToRPCNetworkResult<T> { pub(crate) trait ToRPCNetworkResult<T> {
fn to_rpc_network_result(self) -> RPCNetworkResult<T>; fn to_rpc_network_result(self) -> RPCNetworkResult<T>;

View File

@ -9,7 +9,7 @@ where
} }
#[derive(Debug, Copy, Clone)] #[derive(Debug, Copy, Clone)]
pub(crate) enum FanoutResultKind { pub enum FanoutResultKind {
Partial, Partial,
Timeout, Timeout,
Finished, Finished,
@ -22,12 +22,12 @@ impl FanoutResultKind {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct FanoutResult { pub struct FanoutResult {
pub kind: FanoutResultKind, pub kind: FanoutResultKind,
pub value_nodes: Vec<NodeRef>, pub value_nodes: Vec<NodeRef>,
} }
pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String { pub fn debug_fanout_result(result: &FanoutResult) -> String {
let kc = match result.kind { let kc = match result.kind {
FanoutResultKind::Partial => "P", FanoutResultKind::Partial => "P",
FanoutResultKind::Timeout => "T", FanoutResultKind::Timeout => "T",
@ -37,7 +37,7 @@ pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String {
format!("{}:{}", kc, result.value_nodes.len()) format!("{}:{}", kc, result.value_nodes.len())
} }
pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String { pub fn debug_fanout_results(results: &[FanoutResult]) -> String {
let mut col = 0; let mut col = 0;
let mut out = String::new(); let mut out = String::new();
let mut left = results.len(); let mut left = results.len();
@ -59,18 +59,18 @@ pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String {
} }
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct FanoutCallOutput { pub struct FanoutCallOutput {
pub peer_info_list: Vec<Arc<PeerInfo>>, pub peer_info_list: Vec<Arc<PeerInfo>>,
} }
pub(crate) type FanoutCallResult = RPCNetworkResult<FanoutCallOutput>; pub type FanoutCallResult = RPCNetworkResult<FanoutCallOutput>;
pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>; pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter { pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
Arc::new(|_, _| true) Arc::new(|_, _| true)
} }
pub(crate) fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter { pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
Arc::new(move |_, ni| ni.has_all_capabilities(&caps)) Arc::new(move |_, ni| ni.has_all_capabilities(&caps))
} }

View File

@ -5,7 +5,7 @@
use super::*; use super::*;
#[derive(Debug)] #[derive(Debug)]
pub(in crate::rpc_processor) struct FanoutQueue { pub struct FanoutQueue {
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
current_nodes: VecDeque<NodeRef>, current_nodes: VecDeque<NodeRef>,
returned_nodes: HashSet<TypedKey>, returned_nodes: HashSet<TypedKey>,

View File

@ -0,0 +1,8 @@
mod fanout_call;
mod fanout_queue;
pub(crate) use fanout_call::*;
use super::*;
use fanout_queue::*;

View File

@ -0,0 +1,35 @@
use super::*;
#[derive(Debug)]
pub(in crate::rpc_processor) struct MessageData {
pub contents: Vec<u8>, // rpc messages must be a canonicalized single segment
}
impl MessageData {
pub fn new(contents: Vec<u8>) -> Self {
Self { contents }
}
pub fn get_reader(
&self,
) -> Result<capnp::message::Reader<capnp::serialize::OwnedSegments>, RPCError> {
capnp::serialize_packed::read_message(
self.contents.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::protocol)
}
}
#[derive(Debug)]
pub(in crate::rpc_processor) struct MessageEncoded {
pub header: MessageHeader,
pub data: MessageData,
}
#[derive(Debug)]
pub(in crate::rpc_processor) struct Message {
pub header: MessageHeader,
pub operation: RPCOperation,
pub opt_sender_nr: Option<NodeRef>,
}

View File

@ -0,0 +1,89 @@
use super::*;
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCMessageHeaderDetailDirect {
/// The decoded header of the envelope
pub envelope: Envelope,
/// The noderef of the peer that sent the message (not the original sender).
/// Ensures node doesn't get evicted from routing table until we're done with it
/// Should be filtered to the routing domain of the peer that we received from
pub peer_noderef: FilteredNodeRef,
/// The flow from the peer sent the message (not the original sender)
pub flow: Flow,
/// The routing domain of the peer that we received from
pub routing_domain: RoutingDomain,
}
/// Header details for rpc messages received over only a safety route but not a private route
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCMessageHeaderDetailSafetyRouted {
/// Direct header
pub direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used
pub remote_safety_route: PublicKey,
/// The sequencing used for this route
pub sequencing: Sequencing,
}
/// Header details for rpc messages received over a private route
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCMessageHeaderDetailPrivateRouted {
/// Direct header
pub direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used (or possibly node id the case of no safety route)
pub remote_safety_route: PublicKey,
/// The private route we received the rpc over
pub private_route: PublicKey,
// The safety spec for replying to this private routed rpc
pub safety_spec: SafetySpec,
}
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCMessageHeaderDetail {
Direct(RPCMessageHeaderDetailDirect),
SafetyRouted(RPCMessageHeaderDetailSafetyRouted),
PrivateRouted(RPCMessageHeaderDetailPrivateRouted),
}
/// The decoded header of an RPC message
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct MessageHeader {
/// Time the message was received, not sent
pub timestamp: Timestamp,
/// The length in bytes of the rpc message body
pub body_len: ByteCount,
/// The header detail depending on which way the message was received
pub detail: RPCMessageHeaderDetail,
}
impl MessageHeader {
/// The crypto kind used on the RPC
pub fn crypto_kind(&self) -> CryptoKind {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_crypto_kind(),
}
}
// pub fn direct_peer_noderef(&self) -> NodeRef {
// match &self.detail {
// RPCMessageHeaderDetail::Direct(d) => d.peer_noderef.clone(),
// RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.peer_noderef.clone(),
// RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.peer_noderef.clone(),
// }
// }
pub fn routing_domain(&self) -> RoutingDomain {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.routing_domain,
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.routing_domain,
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.routing_domain,
}
}
pub fn direct_sender_node_id(&self) -> TypedKey {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_sender_typed_id(),
}
}
}

View File

@ -1,11 +1,14 @@
mod answer;
mod coders; mod coders;
mod destination; mod destination;
mod fanout_call; mod error;
mod fanout_queue; mod fanout;
mod message;
mod message_header;
mod operation_waiter; mod operation_waiter;
mod rendered_operation;
mod rpc_app_call; mod rpc_app_call;
mod rpc_app_message; mod rpc_app_message;
mod rpc_error;
mod rpc_find_node; mod rpc_find_node;
mod rpc_get_value; mod rpc_get_value;
mod rpc_inspect_value; mod rpc_inspect_value;
@ -17,6 +20,8 @@ mod rpc_status;
mod rpc_validate_dial_info; mod rpc_validate_dial_info;
mod rpc_value_changed; mod rpc_value_changed;
mod rpc_watch_value; mod rpc_watch_value;
mod sender_info;
mod sender_peer_info;
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
mod rpc_find_block; mod rpc_find_block;
@ -30,160 +35,38 @@ mod rpc_complete_tunnel;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
mod rpc_start_tunnel; mod rpc_start_tunnel;
pub(crate) use coders::*; pub(crate) use answer::*;
pub(crate) use coders::{
builder_to_vec, decode_private_route, encode_node_info, encode_private_route, encode_route_hop,
encode_signed_direct_node_info, encode_typed_key, RPCDecodeContext,
MAX_INSPECT_VALUE_A_SEQS_LEN,
};
pub(crate) use destination::*; pub(crate) use destination::*;
pub(crate) use fanout_call::*; pub(crate) use error::*;
pub(crate) use operation_waiter::*; pub(crate) use fanout::*;
pub(crate) use rpc_error::*; pub(crate) use sender_info::*;
pub(crate) use rpc_status::*;
use super::*; use super::*;
use futures_util::StreamExt;
use stop_token::future::FutureExt as _;
use coders::*;
use message::*;
use message_header::*;
use operation_waiter::*;
use rendered_operation::*;
use sender_peer_info::*;
use crypto::*; use crypto::*;
use fanout_queue::*;
use futures_util::StreamExt;
use network_manager::*; use network_manager::*;
use routing_table::*; use routing_table::*;
use stop_token::future::FutureExt;
use storage_manager::*; use storage_manager::*;
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailDirect {
/// The decoded header of the envelope
envelope: Envelope,
/// The noderef of the peer that sent the message (not the original sender).
/// Ensures node doesn't get evicted from routing table until we're done with it
/// Should be filtered to the routing domain of the peer that we received from
peer_noderef: FilteredNodeRef,
/// The flow from the peer sent the message (not the original sender)
flow: Flow,
/// The routing domain of the peer that we received from
routing_domain: RoutingDomain,
}
/// Header details for rpc messages received over only a safety route but not a private route
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailSafetyRouted {
/// Direct header
direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used
remote_safety_route: PublicKey,
/// The sequencing used for this route
sequencing: Sequencing,
}
/// Header details for rpc messages received over a private route
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailPrivateRouted {
/// Direct header
direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used (or possibly node id the case of no safety route)
remote_safety_route: PublicKey,
/// The private route we received the rpc over
private_route: PublicKey,
// The safety spec for replying to this private routed rpc
safety_spec: SafetySpec,
}
#[derive(Debug, Clone)]
enum RPCMessageHeaderDetail {
Direct(RPCMessageHeaderDetailDirect),
SafetyRouted(RPCMessageHeaderDetailSafetyRouted),
PrivateRouted(RPCMessageHeaderDetailPrivateRouted),
}
/// The decoded header of an RPC message
#[derive(Debug, Clone)]
struct RPCMessageHeader {
/// Time the message was received, not sent
timestamp: Timestamp,
/// The length in bytes of the rpc message body
body_len: ByteCount,
/// The header detail depending on which way the message was received
detail: RPCMessageHeaderDetail,
}
impl RPCMessageHeader {
/// The crypto kind used on the RPC
pub fn crypto_kind(&self) -> CryptoKind {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_crypto_kind(),
}
}
// pub fn direct_peer_noderef(&self) -> NodeRef {
// match &self.detail {
// RPCMessageHeaderDetail::Direct(d) => d.peer_noderef.clone(),
// RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.peer_noderef.clone(),
// RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.peer_noderef.clone(),
// }
// }
pub fn routing_domain(&self) -> RoutingDomain {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.routing_domain,
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.routing_domain,
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.routing_domain,
}
}
pub fn direct_sender_node_id(&self) -> TypedKey {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_sender_typed_id(),
}
}
}
#[derive(Debug)]
pub struct RPCMessageData {
contents: Vec<u8>, // rpc messages must be a canonicalized single segment
}
impl RPCMessageData {
pub fn new(contents: Vec<u8>) -> Self {
Self { contents }
}
pub fn get_reader(
&self,
) -> Result<capnp::message::Reader<capnp::serialize::OwnedSegments>, RPCError> {
capnp::serialize_packed::read_message(
self.contents.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::protocol)
}
}
#[derive(Debug)]
struct RPCMessageEncoded {
header: RPCMessageHeader,
data: RPCMessageData,
}
#[derive(Debug)]
pub(crate) struct RPCMessage {
header: RPCMessageHeader,
operation: RPCOperation,
opt_sender_nr: Option<NodeRef>,
}
#[instrument(level = "trace", target = "rpc", skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &builder).map_err(RPCError::protocol)?;
Ok(buffer)
}
#[derive(Debug)] #[derive(Debug)]
struct WaitableReply { struct WaitableReply {
handle: OperationWaitHandle<RPCMessage, Option<QuestionContext>>, handle: OperationWaitHandle<Message, Option<QuestionContext>>,
timeout_us: TimestampDuration, timeout_us: TimestampDuration,
node_ref: NodeRef, node_ref: NodeRef,
send_ts: Timestamp, send_ts: Timestamp,
@ -196,84 +79,6 @@ struct WaitableReply {
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
/// Hpw long it took to get this answer
pub _latency: TimestampDuration,
/// The private route requested to receive the reply
pub reply_private_route: Option<PublicKey>,
/// The answer itself
pub answer: T,
}
impl<T> Answer<T> {
pub fn new(
latency: TimestampDuration,
reply_private_route: Option<PublicKey>,
answer: T,
) -> Self {
Self {
_latency: latency,
reply_private_route,
answer,
}
}
}
/// An operation that has been fully prepared for envelope
struct RenderedOperation {
/// The rendered operation bytes
message: Vec<u8>,
/// Destination node we're sending to
destination_node_ref: NodeRef,
/// Node to send envelope to (may not be destination node in case of relay)
node_ref: FilteredNodeRef,
/// Total safety + private route hop count + 1 hop for the initial send
hop_count: usize,
/// The safety route used to send the message
safety_route: Option<PublicKey>,
/// The private route used to send the message
remote_private_route: Option<PublicKey>,
/// The private route requested to receive the reply
reply_private_route: Option<PublicKey>,
}
impl fmt::Debug for RenderedOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RenderedOperation")
.field("message(len)", &self.message.len())
.field("destination_node_ref", &self.destination_node_ref)
.field("node_ref", &self.node_ref)
.field("hop_count", &self.hop_count)
.field("safety_route", &self.safety_route)
.field("remote_private_route", &self.remote_private_route)
.field("reply_private_route", &self.reply_private_route)
.finish()
}
}
/// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)]
pub struct SenderPeerInfo {
/// The current peer info of the sender if required
opt_peer_info: Option<Arc<PeerInfo>>,
/// The last timestamp of the target's node info to assist remote node with sending its latest node info
target_node_info_ts: Timestamp,
}
impl SenderPeerInfo {
pub fn new_no_peer_info(target_node_info_ts: Timestamp) -> Self {
Self {
opt_peer_info: None,
target_node_info_ts,
}
}
pub fn new(peer_info: Arc<PeerInfo>, target_node_info_ts: Timestamp) -> Self {
Self {
opt_peer_info: Some(peer_info),
target_node_info_ts,
}
}
}
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
enum RPCKind { enum RPCKind {
Question, Question,
@ -284,31 +89,25 @@ enum RPCKind {
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
struct RPCProcessorInner { struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Span, RPCMessageEncoded)>>, send_channel: Option<flume::Sender<(Span, MessageEncoded)>>,
stop_source: Option<StopSource>, stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>, worker_join_handles: Vec<MustJoinHandle<()>>,
} }
struct RPCProcessorUnlockedInner { struct RPCProcessorUnlockedInner {
network_manager: NetworkManager,
timeout_us: TimestampDuration, timeout_us: TimestampDuration,
queue_size: u32, queue_size: u32,
concurrency: u32, concurrency: u32,
max_route_hop_count: usize, max_route_hop_count: usize,
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
validate_dial_info_receipt_time_ms: u32,
update_callback: UpdateCallback, update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<RPCMessage, Option<QuestionContext>>, waiting_rpc_table: OperationWaiter<Message, Option<QuestionContext>>,
waiting_app_call_table: OperationWaiter<Vec<u8>, ()>, waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
startup_lock: StartupLock, startup_lock: StartupLock,
} }
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct RPCProcessor { pub(crate) struct RPCProcessor {
crypto: Crypto,
config: VeilidConfig,
network_manager: NetworkManager,
storage_manager: StorageManager,
routing_table: RoutingTable,
inner: Arc<Mutex<RPCProcessorInner>>, inner: Arc<Mutex<RPCProcessorInner>>,
unlocked_inner: Arc<RPCProcessorUnlockedInner>, unlocked_inner: Arc<RPCProcessorUnlockedInner>,
} }
@ -322,10 +121,12 @@ impl RPCProcessor {
} }
} }
fn new_unlocked_inner( fn new_unlocked_inner(
config: VeilidConfig, network_manager: NetworkManager,
update_callback: UpdateCallback, update_callback: UpdateCallback,
) -> RPCProcessorUnlockedInner { ) -> RPCProcessorUnlockedInner {
// make local copy of node id for easy access // make local copy of node id for easy access
let (concurrency, queue_size, max_route_hop_count, timeout_us) = {
let config = network_manager.config();
let c = config.get(); let c = config.get();
// set up channel // set up channel
@ -342,14 +143,15 @@ impl RPCProcessor {
// Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay // Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay
concurrency *= 16; concurrency *= 16;
} }
let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms; (concurrency, queue_size, max_route_hop_count, timeout_us)
};
RPCProcessorUnlockedInner { RPCProcessorUnlockedInner {
network_manager,
timeout_us, timeout_us,
queue_size, queue_size,
concurrency, concurrency,
max_route_hop_count, max_route_hop_count,
validate_dial_info_receipt_time_ms,
update_callback, update_callback,
waiting_rpc_table: OperationWaiter::new(), waiting_rpc_table: OperationWaiter::new(),
waiting_app_call_table: OperationWaiter::new(), waiting_app_call_table: OperationWaiter::new(),
@ -357,28 +159,36 @@ impl RPCProcessor {
} }
} }
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self { pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
let config = network_manager.config();
Self { Self {
crypto: network_manager.crypto(),
config: config.clone(),
network_manager: network_manager.clone(),
routing_table: network_manager.routing_table(),
storage_manager: network_manager.storage_manager(),
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)), unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager, update_callback)),
} }
} }
pub fn network_manager(&self) -> NetworkManager { pub fn network_manager(&self) -> NetworkManager {
self.network_manager.clone() self.unlocked_inner.network_manager.clone()
}
pub fn crypto(&self) -> Crypto {
self.unlocked_inner.network_manager.crypto()
}
pub fn event_bus(&self) -> EventBus {
self.unlocked_inner.network_manager.event_bus()
} }
pub fn routing_table(&self) -> RoutingTable { pub fn routing_table(&self) -> RoutingTable {
self.routing_table.clone() self.unlocked_inner.network_manager.routing_table()
} }
pub fn storage_manager(&self) -> StorageManager { pub fn storage_manager(&self) -> StorageManager {
self.storage_manager.clone() self.unlocked_inner.network_manager.storage_manager()
}
pub fn with_config<R, F: FnOnce(&VeilidConfigInner) -> R>(&self, func: F) -> R {
let config = self.unlocked_inner.network_manager.config();
let c = config.get();
func(&c)
} }
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
@ -411,7 +221,7 @@ impl RPCProcessor {
} }
// Inform storage manager we are up // Inform storage manager we are up
self.storage_manager self.storage_manager()
.set_rpc_processor(Some(self.clone())) .set_rpc_processor(Some(self.clone()))
.await; .await;
@ -428,7 +238,7 @@ impl RPCProcessor {
}; };
// Stop storage manager from using us // Stop storage manager from using us
self.storage_manager.set_rpc_processor(None).await; self.storage_manager().set_rpc_processor(None).await;
// Stop the rpc workers // Stop the rpc workers
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
@ -458,7 +268,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
/// Get waiting app call id for debugging purposes /// Get waiting app call id for debugging purposes
pub(crate) fn get_app_call_ids(&self) -> Vec<OperationId> { pub fn get_app_call_ids(&self) -> Vec<OperationId> {
self.unlocked_inner self.unlocked_inner
.waiting_app_call_table .waiting_app_call_table
.get_operation_ids() .get_operation_ids()
@ -490,7 +300,7 @@ impl RPCProcessor {
let Some(peer_info) = sender_peer_info.opt_peer_info.clone() else { let Some(peer_info) = sender_peer_info.opt_peer_info.clone() else {
return Ok(NetworkResult::value(None)); return Ok(NetworkResult::value(None));
}; };
let address_filter = self.network_manager.address_filter(); let address_filter = self.network_manager().address_filter();
// Ensure the sender peer info is for the actual sender specified in the envelope // Ensure the sender peer info is for the actual sender specified in the envelope
if !peer_info.node_ids().contains(&sender_node_id) { if !peer_info.node_ids().contains(&sender_node_id) {
@ -641,15 +451,14 @@ impl RPCProcessor {
} }
// If nobody knows where this node is, ask the DHT for it // If nobody knows where this node is, ask the DHT for it
let (node_count, _consensus_count, fanout, timeout) = { let (node_count, _consensus_count, fanout, timeout) = this.with_config(|c| {
let c = this.config.get();
( (
c.network.dht.max_find_node_count as usize, c.network.dht.max_find_node_count as usize,
c.network.dht.resolve_node_count as usize, c.network.dht.resolve_node_count as usize,
c.network.dht.resolve_node_fanout as usize, c.network.dht.resolve_node_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)), TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)),
) )
}; });
// Search routing domains for peer // Search routing domains for peer
// xxx: Eventually add other routing domains here // xxx: Eventually add other routing domains here
@ -682,7 +491,7 @@ impl RPCProcessor {
&self, &self,
waitable_reply: WaitableReply, waitable_reply: WaitableReply,
debug_string: String, debug_string: String,
) -> Result<TimeoutOr<(RPCMessage, TimestampDuration)>, RPCError> { ) -> Result<TimeoutOr<(Message, TimestampDuration)>, RPCError> {
let id = waitable_reply.handle.id(); let id = waitable_reply.handle.id();
let out = self let out = self
.unlocked_inner .unlocked_inner
@ -735,7 +544,7 @@ impl RPCProcessor {
} }
RPCMessageHeaderDetail::SafetyRouted(sr) => { RPCMessageHeaderDetail::SafetyRouted(sr) => {
let node_id = self let node_id = self
.routing_table .routing_table()
.node_id(sr.direct.envelope.get_crypto_kind()); .node_id(sr.direct.envelope.get_crypto_kind());
if node_id.value != reply_private_route { if node_id.value != reply_private_route {
return Err(RPCError::protocol( return Err(RPCError::protocol(
@ -775,7 +584,7 @@ impl RPCProcessor {
let pr_hop_count = remote_private_route.hop_count; let pr_hop_count = remote_private_route.hop_count;
let pr_pubkey = remote_private_route.public_key.value; let pr_pubkey = remote_private_route.public_key.value;
let crypto_kind = remote_private_route.crypto_kind(); let crypto_kind = remote_private_route.crypto_kind();
let Some(vcrypto) = self.crypto.get(crypto_kind) else { let Some(vcrypto) = self.crypto().get(crypto_kind) else {
return Err(RPCError::internal( return Err(RPCError::internal(
"crypto not available for selected private route", "crypto not available for selected private route",
)); ));
@ -984,7 +793,7 @@ impl RPCProcessor {
opt_node, opt_node,
opt_relay: _, opt_relay: _,
opt_routing_domain, opt_routing_domain,
}) = dest.get_unsafe_routing_info(self.routing_table.clone()) }) = dest.get_unsafe_routing_info(self.routing_table())
else { else {
return SenderPeerInfo::default(); return SenderPeerInfo::default();
}; };
@ -1040,12 +849,12 @@ impl RPCProcessor {
// If safety route was in use, record failure to send there // If safety route was in use, record failure to send there
if let Some(sr_pubkey) = &safety_route { if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| s.record_send_failed()); rss.with_route_stats_mut(send_ts, sr_pubkey, |s| s.record_send_failed());
} else { } else {
// If no safety route was in use, then it's the private route's fault if we have one // If no safety route was in use, then it's the private route's fault if we have one
if let Some(pr_pubkey) = &remote_private_route { if let Some(pr_pubkey) = &remote_private_route {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| s.record_send_failed()); rss.with_route_stats_mut(send_ts, pr_pubkey, |s| s.record_send_failed());
} }
} }
@ -1071,11 +880,10 @@ impl RPCProcessor {
return; return;
} }
// Get route spec store // Get route spec store
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
// If safety route was used, record question lost there // If safety route was used, record question lost there
if let Some(sr_pubkey) = &safety_route { if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| { rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
s.record_lost_answer(); s.record_lost_answer();
}); });
@ -1119,7 +927,7 @@ impl RPCProcessor {
} }
// Get route spec store // Get route spec store
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
// If safety route was used, record send there // If safety route was used, record send there
if let Some(sr_pubkey) = &safety_route { if let Some(sr_pubkey) = &safety_route {
@ -1130,7 +938,6 @@ impl RPCProcessor {
// If remote private route was used, record send there // If remote private route was used, record send there
if let Some(pr_pubkey) = &remote_private_route { if let Some(pr_pubkey) = &remote_private_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| { rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
s.record_sent(send_ts, bytes); s.record_sent(send_ts, bytes);
}); });
@ -1157,7 +964,7 @@ impl RPCProcessor {
return; return;
} }
// Get route spec store // Get route spec store
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
// Get latency for all local routes // Get latency for all local routes
let mut total_local_latency = TimestampDuration::new(0u64); let mut total_local_latency = TimestampDuration::new(0u64);
@ -1211,7 +1018,7 @@ impl RPCProcessor {
// This is fine because if we sent with a local safety route, // This is fine because if we sent with a local safety route,
// then we must have received with a local private route too, per the design rules // then we must have received with a local private route too, per the design rules
if let Some(sr_pubkey) = &safety_route { if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| { rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
s.record_latency(total_latency / 2u64); s.record_latency(total_latency / 2u64);
}); });
@ -1226,7 +1033,7 @@ impl RPCProcessor {
/// Record question or statement received from node or route /// Record question or statement received from node or route
#[instrument(level = "trace", target = "rpc", skip_all)] #[instrument(level = "trace", target = "rpc", skip_all)]
fn record_question_received(&self, msg: &RPCMessage) { fn record_question_received(&self, msg: &Message) {
let recv_ts = msg.header.timestamp; let recv_ts = msg.header.timestamp;
let bytes = msg.header.body_len; let bytes = msg.header.body_len;
@ -1240,7 +1047,7 @@ impl RPCProcessor {
} }
// Process messages that arrived with no private route (private route stub) // Process messages that arrived with no private route (private route stub)
RPCMessageHeaderDetail::SafetyRouted(d) => { RPCMessageHeaderDetail::SafetyRouted(d) => {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
// This may record nothing if the remote safety route is not also // This may record nothing if the remote safety route is not also
// a remote private route that been imported, but that's okay // a remote private route that been imported, but that's okay
@ -1250,7 +1057,7 @@ impl RPCProcessor {
} }
// Process messages that arrived to our private route // Process messages that arrived to our private route
RPCMessageHeaderDetail::PrivateRouted(d) => { RPCMessageHeaderDetail::PrivateRouted(d) => {
let rss = self.routing_table.route_spec_store(); let rss = self.routing_table().route_spec_store();
// This may record nothing if the remote safety route is not also // This may record nothing if the remote safety route is not also
// a remote private route that been imported, but that's okay // a remote private route that been imported, but that's okay
@ -1439,7 +1246,7 @@ impl RPCProcessor {
/// Issue a reply over the network, possibly using an anonymized route /// Issue a reply over the network, possibly using an anonymized route
/// The request must want a response, or this routine fails /// The request must want a response, or this routine fails
#[instrument(level = "trace", target = "rpc", skip_all)] #[instrument(level = "trace", target = "rpc", skip_all)]
async fn answer(&self, request: RPCMessage, answer: RPCAnswer) -> RPCNetworkResult<()> { async fn answer(&self, request: Message, answer: RPCAnswer) -> RPCNetworkResult<()> {
// Extract destination from respond_to // Extract destination from respond_to
let dest = network_result_try!(self.get_respond_to_destination(&request)); let dest = network_result_try!(self.get_respond_to_destination(&request));
@ -1511,10 +1318,7 @@ impl RPCProcessor {
/// This performs a capnp decode on the data, and if it passes the capnp schema /// This performs a capnp decode on the data, and if it passes the capnp schema
/// it performs the cryptographic validation required to pass the operation up for processing /// it performs the cryptographic validation required to pass the operation up for processing
#[instrument(level = "trace", target = "rpc", skip_all)] #[instrument(level = "trace", target = "rpc", skip_all)]
fn decode_rpc_operation( fn decode_rpc_operation(&self, encoded_msg: &MessageEncoded) -> Result<RPCOperation, RPCError> {
&self,
encoded_msg: &RPCMessageEncoded,
) -> Result<RPCOperation, RPCError> {
let reader = encoded_msg.data.get_reader()?; let reader = encoded_msg.data.get_reader()?;
let op_reader = reader let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
@ -1556,7 +1360,7 @@ impl RPCProcessor {
// Validate the RPC operation // Validate the RPC operation
let validate_context = RPCValidateContext { let validate_context = RPCValidateContext {
crypto: self.crypto.clone(), crypto: self.crypto(),
// rpc_processor: self.clone(), // rpc_processor: self.clone(),
question_context, question_context,
}; };
@ -1567,8 +1371,8 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", target = "rpc", skip_all)] #[instrument(level = "trace", target = "rpc", skip_all)]
async fn process_rpc_message(&self, encoded_msg: RPCMessageEncoded) -> RPCNetworkResult<()> { async fn process_rpc_message(&self, encoded_msg: MessageEncoded) -> RPCNetworkResult<()> {
let address_filter = self.network_manager.address_filter(); let address_filter = self.network_manager().address_filter();
// Decode operation appropriately based on header detail // Decode operation appropriately based on header detail
let msg = match &encoded_msg.header.detail { let msg = match &encoded_msg.header.detail {
@ -1639,7 +1443,7 @@ impl RPCProcessor {
} }
// Make the RPC message // Make the RPC message
RPCMessage { Message {
header: encoded_msg.header, header: encoded_msg.header,
operation, operation,
opt_sender_nr, opt_sender_nr,
@ -1660,7 +1464,7 @@ impl RPCProcessor {
}; };
// Make the RPC message // Make the RPC message
RPCMessage { Message {
header: encoded_msg.header, header: encoded_msg.header,
operation, operation,
opt_sender_nr: None, opt_sender_nr: None,
@ -1758,7 +1562,7 @@ impl RPCProcessor {
async fn rpc_worker( async fn rpc_worker(
self, self,
stop_token: StopToken, stop_token: StopToken,
receiver: flume::Receiver<(Span, RPCMessageEncoded)>, receiver: flume::Receiver<(Span, MessageEncoded)>,
) { ) {
while let Ok(Ok((prev_span, msg))) = while let Ok(Ok((prev_span, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await receiver.recv_async().timeout_at(stop_token.clone()).await
@ -1801,7 +1605,7 @@ impl RPCProcessor {
bail!("routing domain should match peer noderef filter"); bail!("routing domain should match peer noderef filter");
} }
let header = RPCMessageHeader { let header = MessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope, envelope,
peer_noderef, peer_noderef,
@ -1812,9 +1616,9 @@ impl RPCProcessor {
body_len: ByteCount::new(body.len() as u64), body_len: ByteCount::new(body.len() as u64),
}; };
let msg = RPCMessageEncoded { let msg = MessageEncoded {
header, header,
data: RPCMessageData { contents: body }, data: MessageData { contents: body },
}; };
let send_channel = { let send_channel = {
@ -1838,7 +1642,7 @@ impl RPCProcessor {
sequencing: Sequencing, sequencing: Sequencing,
body: Vec<u8>, body: Vec<u8>,
) -> EyreResult<()> { ) -> EyreResult<()> {
let header = RPCMessageHeader { let header = MessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted { detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
direct, direct,
remote_safety_route, remote_safety_route,
@ -1848,9 +1652,9 @@ impl RPCProcessor {
body_len: (body.len() as u64).into(), body_len: (body.len() as u64).into(),
}; };
let msg = RPCMessageEncoded { let msg = MessageEncoded {
header, header,
data: RPCMessageData { contents: body }, data: MessageData { contents: body },
}; };
let send_channel = { let send_channel = {
let inner = self.inner.lock(); let inner = self.inner.lock();
@ -1874,7 +1678,7 @@ impl RPCProcessor {
safety_spec: SafetySpec, safety_spec: SafetySpec,
body: Vec<u8>, body: Vec<u8>,
) -> EyreResult<()> { ) -> EyreResult<()> {
let header = RPCMessageHeader { let header = MessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted { detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted {
direct, direct,
remote_safety_route, remote_safety_route,
@ -1885,9 +1689,9 @@ impl RPCProcessor {
body_len: (body.len() as u64).into(), body_len: (body.len() as u64).into(),
}; };
let msg = RPCMessageEncoded { let msg = MessageEncoded {
header, header,
data: RPCMessageData { contents: body }, data: MessageData { contents: body },
}; };
let send_channel = { let send_channel = {

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug)] #[derive(Debug)]
pub struct OperationWaitHandle<T, C> pub(super) struct OperationWaitHandle<T, C>
where where
T: Unpin, T: Unpin,
C: Unpin + Clone, C: Unpin + Clone,
@ -34,7 +34,7 @@ where
} }
#[derive(Debug)] #[derive(Debug)]
pub struct OperationWaitingOp<T, C> struct OperationWaitingOp<T, C>
where where
T: Unpin, T: Unpin,
C: Unpin + Clone, C: Unpin + Clone,
@ -45,7 +45,7 @@ where
} }
#[derive(Debug)] #[derive(Debug)]
pub struct OperationWaiterInner<T, C> struct OperationWaiterInner<T, C>
where where
T: Unpin, T: Unpin,
C: Unpin + Clone, C: Unpin + Clone,
@ -54,7 +54,7 @@ where
} }
#[derive(Debug)] #[derive(Debug)]
pub struct OperationWaiter<T, C> pub(super) struct OperationWaiter<T, C>
where where
T: Unpin, T: Unpin,
C: Unpin + Clone, C: Unpin + Clone,

Some files were not shown because too many files have changed in this diff Show More