Merge branch 'deadlock-fix' into 'main'

0.4.1 changes

See merge request veilid/veilid!329
This commit is contained in:
Christien Rioux 2024-10-23 23:27:34 +00:00
commit 5dffb30603
140 changed files with 1272 additions and 710 deletions

View File

@ -1,3 +1,12 @@
**Changed in Veilid 0.4.1**
- Implement top level event bus to do asynchronous lock-free communication between subsystems
- Fix deadlock in socket address change event
- Fix deadlock in peer info change event
- Fix incorrect node info equivalence check
- Ping relays every second instead of every 10 seconds
- MR !328 'tiny improvements'
**Changed in Veilid 0.4.0**
- RFC-0001: Constrain DHT Subkey Size, issue #406

View File

@ -15,18 +15,20 @@ struct AttachmentManagerInner {
}
struct AttachmentManagerUnlockedInner {
_event_bus: EventBus,
config: VeilidConfig,
network_manager: NetworkManager,
}
#[derive(Clone)]
pub(crate) struct AttachmentManager {
pub struct AttachmentManager {
inner: Arc<Mutex<AttachmentManagerInner>>,
unlocked_inner: Arc<AttachmentManagerUnlockedInner>,
}
impl AttachmentManager {
fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
@ -34,8 +36,10 @@ impl AttachmentManager {
crypto: Crypto,
) -> AttachmentManagerUnlockedInner {
AttachmentManagerUnlockedInner {
_event_bus: event_bus.clone(),
config: config.clone(),
network_manager: NetworkManager::new(
event_bus,
config,
storage_manager,
table_store,
@ -57,6 +61,7 @@ impl AttachmentManager {
}
}
pub fn new(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
@ -66,6 +71,7 @@ impl AttachmentManager {
Self {
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(
event_bus,
config,
storage_manager,
table_store,

View File

@ -11,10 +11,11 @@ pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
/// Internal services startup mechanism.
/// Ensures that everything is started up, and shut down in the right order
/// and provides an atomic state for if the system is properly operational.
struct ServicesContext {
struct StartupShutdownContext {
pub config: VeilidConfig,
pub update_callback: UpdateCallback,
pub event_bus: Option<EventBus>,
pub protected_store: Option<ProtectedStore>,
pub table_store: Option<TableStore>,
#[cfg(feature = "unstable-blockstore")]
@ -24,11 +25,12 @@ struct ServicesContext {
pub storage_manager: Option<StorageManager>,
}
impl ServicesContext {
impl StartupShutdownContext {
pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self {
Self {
config,
update_callback,
event_bus: None,
protected_store: None,
table_store: None,
#[cfg(feature = "unstable-blockstore")]
@ -39,9 +41,11 @@ impl ServicesContext {
}
}
#[allow(clippy::too_many_arguments)]
pub fn new_full(
config: VeilidConfig,
update_callback: UpdateCallback,
event_bus: EventBus,
protected_store: ProtectedStore,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
@ -52,6 +56,7 @@ impl ServicesContext {
Self {
config,
update_callback,
event_bus: Some(event_bus),
protected_store: Some(protected_store),
table_store: Some(table_store),
#[cfg(feature = "unstable-blockstore")]
@ -75,8 +80,17 @@ impl ServicesContext {
ApiTracingLayer::add_callback(program_name, namespace, self.update_callback.clone())
.await?;
// Add the event bus
let event_bus = EventBus::new();
if let Err(e) = event_bus.startup().await {
error!("failed to start up event bus: {}", e);
self.shutdown().await;
return Err(e.into());
}
self.event_bus = Some(event_bus.clone());
// Set up protected store
let protected_store = ProtectedStore::new(self.config.clone());
let protected_store = ProtectedStore::new(event_bus.clone(), self.config.clone());
if let Err(e) = protected_store.init().await {
error!("failed to init protected store: {}", e);
self.shutdown().await;
@ -85,8 +99,12 @@ impl ServicesContext {
self.protected_store = Some(protected_store.clone());
// Set up tablestore and crypto system
let table_store = TableStore::new(self.config.clone(), protected_store.clone());
let crypto = Crypto::new(self.config.clone(), table_store.clone());
let table_store = TableStore::new(
event_bus.clone(),
self.config.clone(),
protected_store.clone(),
);
let crypto = Crypto::new(event_bus.clone(), self.config.clone(), table_store.clone());
table_store.set_crypto(crypto.clone());
// Initialize table store first, so crypto code can load caches
@ -110,7 +128,7 @@ impl ServicesContext {
// Set up block store
#[cfg(feature = "unstable-blockstore")]
{
let block_store = BlockStore::new(self.config.clone());
let block_store = BlockStore::new(event_bus.clone(), self.config.clone());
if let Err(e) = block_store.init().await {
error!("failed to init block store: {}", e);
self.shutdown().await;
@ -123,6 +141,7 @@ impl ServicesContext {
let update_callback = self.update_callback.clone();
let storage_manager = StorageManager::new(
event_bus.clone(),
self.config.clone(),
self.crypto.clone().unwrap(),
self.table_store.clone().unwrap(),
@ -139,6 +158,7 @@ impl ServicesContext {
// Set up attachment manager
let update_callback = self.update_callback.clone();
let attachment_manager = AttachmentManager::new(
event_bus.clone(),
self.config.clone(),
storage_manager,
table_store,
@ -180,6 +200,9 @@ impl ServicesContext {
if let Some(protected_store) = &mut self.protected_store {
protected_store.terminate().await;
}
if let Some(event_bus) = &mut self.event_bus {
event_bus.shutdown().await;
}
info!("Veilid API shutdown complete");
@ -199,9 +222,11 @@ impl ServicesContext {
}
/////////////////////////////////////////////////////////////////////////////
pub(crate) struct VeilidCoreContext {
pub struct VeilidCoreContext {
pub config: VeilidConfig,
pub update_callback: UpdateCallback,
// Event bus
pub event_bus: EventBus,
// Services
pub storage_manager: StorageManager,
pub protected_store: ProtectedStore,
@ -249,12 +274,13 @@ impl VeilidCoreContext {
}
}
let mut sc = ServicesContext::new_empty(config.clone(), update_callback);
let mut sc = StartupShutdownContext::new_empty(config.clone(), update_callback);
sc.startup().await.map_err(VeilidAPIError::generic)?;
Ok(VeilidCoreContext {
config: sc.config,
update_callback: sc.update_callback,
event_bus: sc.event_bus.unwrap(),
storage_manager: sc.storage_manager.unwrap(),
protected_store: sc.protected_store.unwrap(),
table_store: sc.table_store.unwrap(),
@ -267,9 +293,10 @@ impl VeilidCoreContext {
#[instrument(level = "trace", target = "core_context", skip_all)]
async fn shutdown(self) {
let mut sc = ServicesContext::new_full(
let mut sc = StartupShutdownContext::new_full(
self.config.clone(),
self.update_callback.clone(),
self.event_bus,
self.protected_store,
self.table_store,
#[cfg(feature = "unstable-blockstore")]
@ -397,7 +424,7 @@ pub async fn api_startup_config(
}
#[instrument(level = "trace", target = "core_context", skip_all)]
pub(crate) async fn api_shutdown(context: VeilidCoreContext) {
pub async fn api_shutdown(context: VeilidCoreContext) {
let mut initialized_lock = INITIALIZED.lock().await;
let init_key = {

View File

@ -79,6 +79,7 @@ struct CryptoInner {
}
struct CryptoUnlockedInner {
_event_bus: EventBus,
config: VeilidConfig,
table_store: TableStore,
}
@ -102,9 +103,10 @@ impl Crypto {
}
}
pub fn new(config: VeilidConfig, table_store: TableStore) -> Self {
pub fn new(event_bus: EventBus, config: VeilidConfig, table_store: TableStore) -> Self {
let out = Self {
unlocked_inner: Arc::new(CryptoUnlockedInner {
_event_bus: event_bus,
config,
table_store,
}),

View File

@ -6,6 +6,7 @@ struct BlockStoreInner {
#[derive(Clone)]
pub struct BlockStore {
event_bus: EventBus,
config: VeilidConfig,
inner: Arc<Mutex<BlockStoreInner>>,
}
@ -14,8 +15,9 @@ impl BlockStore {
fn new_inner() -> BlockStoreInner {
BlockStoreInner {}
}
pub fn new(config: VeilidConfig) -> Self {
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self {
event_bus,
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
}

View File

@ -9,6 +9,7 @@ pub struct ProtectedStoreInner {
#[derive(Clone)]
pub struct ProtectedStore {
_event_bus: EventBus,
config: VeilidConfig,
inner: Arc<Mutex<ProtectedStoreInner>>,
}
@ -20,8 +21,9 @@ impl ProtectedStore {
}
}
pub fn new(config: VeilidConfig) -> Self {
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self {
_event_bus: event_bus,
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
}

View File

@ -6,6 +6,7 @@ struct BlockStoreInner {
#[derive(Clone)]
pub struct BlockStore {
event_bus: EventBus,
config: VeilidConfig,
inner: Arc<Mutex<BlockStoreInner>>,
}
@ -14,8 +15,9 @@ impl BlockStore {
fn new_inner() -> BlockStoreInner {
BlockStoreInner {}
}
pub fn new(config: VeilidConfig) -> Self {
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self {
event_bus,
config,
inner: Arc::new(Mutex::new(Self::new_inner())),
}

View File

@ -5,12 +5,16 @@ use web_sys::*;
#[derive(Clone)]
pub struct ProtectedStore {
_event_bus: EventBus,
config: VeilidConfig,
}
impl ProtectedStore {
pub fn new(config: VeilidConfig) -> Self {
Self { config }
pub fn new(event_bus: EventBus, config: VeilidConfig) -> Self {
Self {
_event_bus: event_bus,
config,
}
}
#[instrument(level = "trace", skip(self), err)]

View File

@ -23,9 +23,9 @@ pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10;
// TimestampDuration::new(3_600_000_000_u64); // 60 minutes
/// Address checker config
pub(crate) struct AddressCheckConfig {
pub(crate) detect_address_changes: bool,
pub(crate) ip6_prefix_size: usize,
pub struct AddressCheckConfig {
pub detect_address_changes: bool,
pub ip6_prefix_size: usize,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
@ -33,7 +33,7 @@ struct AddressCheckCacheKey(RoutingDomain, ProtocolType, AddressType);
/// Address checker - keep track of how other nodes are seeing our node's address on a per-protocol basis
/// Used to determine if our address has changed and if we should re-publish new PeerInfo
pub(crate) struct AddressCheck {
pub struct AddressCheck {
config: AddressCheckConfig,
net: Network,
current_network_class: BTreeMap<RoutingDomain, NetworkClass>,

View File

@ -7,7 +7,7 @@ const DIAL_INFO_FAILURE_DURATION_MIN: usize = 10;
const MAX_DIAL_INFO_FAILURES: usize = 65536;
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFilterError {
pub(crate) enum AddressFilterError {
#[error("Count exceeded")]
CountExceeded,
#[error("Rate exceeded")]

View File

@ -1,5 +1,5 @@
use super::*;
pub(crate) use connection_table::ConnectionRefKind;
use connection_table::ConnectionRefKind;
use connection_table::*;
use network_connection::*;
use stop_token::future::FutureExt;
@ -19,7 +19,7 @@ enum ConnectionManagerEvent {
}
#[derive(Debug)]
pub(crate) struct ConnectionRefScope {
pub struct ConnectionRefScope {
connection_manager: ConnectionManager,
id: NetworkConnectionId,
}
@ -78,7 +78,7 @@ impl core::fmt::Debug for ConnectionManagerArc {
}
#[derive(Debug, Clone)]
pub(crate) struct ConnectionManager {
pub struct ConnectionManager {
arc: Arc<ConnectionManagerArc>,
}

View File

@ -8,7 +8,7 @@ const PRIORITY_FLOW_PERCENTAGE: usize = 25;
///////////////////////////////////////////////////////////////////////////////
#[derive(ThisError, Debug)]
pub(in crate::network_manager) enum ConnectionTableAddError {
pub enum ConnectionTableAddError {
#[error("Connection already added to table")]
AlreadyExists(NetworkConnection),
#[error("Connection address was filtered")]
@ -30,7 +30,7 @@ impl ConnectionTableAddError {
}
#[derive(Clone, Copy, Debug)]
pub(crate) enum ConnectionRefKind {
pub enum ConnectionRefKind {
AddRef,
RemoveRef,
}
@ -49,7 +49,7 @@ struct ConnectionTableInner {
}
#[derive(Debug)]
pub(in crate::network_manager) struct ConnectionTable {
pub struct ConnectionTable {
inner: Arc<Mutex<ConnectionTableInner>>,
}

View File

@ -3,7 +3,7 @@ use super::*;
impl NetworkManager {
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", target = "net", skip(self), ret, err)]
pub(crate) async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> {
pub async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> {
let routing_table = self.routing_table();
// Get a bunch of nodes with the various

View File

@ -23,12 +23,11 @@ pub mod tests;
////////////////////////////////////////////////////////////////////////////////////////
pub(crate) use connection_manager::*;
pub(crate) use network_connection::*;
pub(crate) use receipt_manager::*;
pub(crate) use stats::*;
pub use types::*;
pub use connection_manager::*;
pub use network_connection::*;
pub use receipt_manager::*;
pub use stats::*;
pub(crate) use types::*;
////////////////////////////////////////////////////////////////////////////////////////
use address_check::*;
@ -76,7 +75,7 @@ struct ClientAllowlistEntry {
}
#[derive(Clone, Debug)]
pub(crate) struct SendDataMethod {
pub struct SendDataMethod {
/// How the data was sent, possibly to a relay
pub contact_method: NodeContactMethod,
/// Pre-relayed contact method
@ -87,7 +86,7 @@ pub(crate) struct SendDataMethod {
/// Mechanism required to contact another node
#[derive(Clone, Debug)]
pub(crate) enum NodeContactMethod {
pub enum NodeContactMethod {
/// Node is not reachable by any means
Unreachable,
/// Connection should have already existed
@ -134,6 +133,7 @@ struct NetworkManagerInner {
struct NetworkManagerUnlockedInner {
// Handles
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
@ -170,6 +170,7 @@ impl NetworkManager {
}
}
fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
@ -178,6 +179,7 @@ impl NetworkManager {
network_key: Option<SharedSecret>,
) -> NetworkManagerUnlockedInner {
NetworkManagerUnlockedInner {
event_bus,
config: config.clone(),
storage_manager,
table_store,
@ -202,6 +204,7 @@ impl NetworkManager {
}
pub fn new(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
@ -238,6 +241,7 @@ impl NetworkManager {
let this = Self {
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(
event_bus,
config,
storage_manager,
table_store,
@ -252,6 +256,9 @@ impl NetworkManager {
this
}
pub fn event_bus(&self) -> EventBus {
self.unlocked_inner.event_bus.clone()
}
pub fn config(&self) -> VeilidConfig {
self.unlocked_inner.config.clone()
}
@ -404,7 +411,7 @@ impl NetworkManager {
.unwrap()
.clone(),
);
let receipt_manager = ReceiptManager::new(self.clone());
let receipt_manager = ReceiptManager::new();
*self.unlocked_inner.components.write() = Some(NetworkComponents {
net: net.clone(),
connection_manager: connection_manager.clone(),
@ -437,6 +444,22 @@ impl NetworkManager {
rpc_processor.startup().await?;
receipt_manager.startup().await?;
// Register event handlers
let this = self.clone();
self.event_bus().subscribe(move |evt| {
let this = this.clone();
Box::pin(async move {
this.peer_info_change_event_handler(evt);
})
});
let this = self.clone();
self.event_bus().subscribe(move |evt| {
let this = this.clone();
Box::pin(async move {
this.socket_address_change_event_handler(evt);
})
});
log_net!("NetworkManager::internal_startup end");
Ok(StartupDisposition::Success)
@ -1260,32 +1283,25 @@ impl NetworkManager {
}
// Report peer info changes
pub fn report_peer_info_change(&mut self, peer_info: Arc<PeerInfo>) {
fn peer_info_change_event_handler(&self, evt: Arc<PeerInfoChangeEvent>) {
let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_peer_info_change(peer_info);
address_check.report_peer_info_change(evt.peer_info.clone());
}
}
// Determine if our IP address has changed
// this means we should recreate our public dial info if it is not static and rediscover it
// Wait until we have received confirmation from N different peers
pub fn report_socket_address_change(
&self,
routing_domain: RoutingDomain, // the routing domain this flow is over
socket_address: SocketAddress, // the socket address as seen by the remote peer
old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) {
fn socket_address_change_event_handler(&self, evt: Arc<SocketAddressChangeEvent>) {
let mut inner = self.inner.lock();
if let Some(address_check) = inner.address_check.as_mut() {
address_check.report_socket_address_change(
routing_domain,
socket_address,
old_socket_address,
flow,
reporting_peer,
evt.routing_domain,
evt.socket_address,
evt.old_socket_address,
evt.flow,
evt.reporting_peer.clone(),
);
}
}

View File

@ -7,6 +7,8 @@ mod protocol;
mod start_protocols;
mod tasks;
pub(super) use protocol::*;
use super::*;
use crate::routing_table::*;
use connection_manager::*;
@ -16,7 +18,6 @@ use network_tcp::*;
use protocol::tcp::RawTcpProtocolHandler;
use protocol::udp::RawUdpProtocolHandler;
use protocol::ws::WebsocketProtocolHandler;
pub(in crate::network_manager) use protocol::*;
use start_protocols::*;
use async_tls::TlsAcceptor;
@ -133,7 +134,7 @@ struct NetworkUnlockedInner {
}
#[derive(Clone)]
pub(in crate::network_manager) struct Network {
pub(super) struct Network {
config: VeilidConfig,
inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>,

View File

@ -6,7 +6,7 @@ use stop_token::future::FutureExt;
/////////////////////////////////////////////////////////////////
#[derive(Clone)]
pub(in crate::network_manager) struct ListenerState {
pub struct ListenerState {
pub protocol_accept_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>,
pub tls_protocol_handlers: Vec<Box<dyn ProtocolAcceptHandler + 'static>>,
pub tls_acceptor: Option<TlsAcceptor>,

View File

@ -8,7 +8,7 @@ use super::*;
use std::io;
#[derive(Debug)]
pub(in crate::network_manager) enum ProtocolNetworkConnection {
pub(crate) enum ProtocolNetworkConnection {
// Dummy(DummyNetworkConnection),
RawTcp(tcp::RawTcpNetworkConnection),
WsAccepted(ws::WebSocketNetworkConnectionAccepted),

View File

@ -42,11 +42,16 @@ impl RawTcpNetworkConnection {
if message.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("sending too large TCP message");
}
let len = message.len() as u16;
let header = [b'V', b'L', len as u8, (len >> 8) as u8];
network_result_try!(stream.write_all(&header).await.into_network_result()?);
network_result_try!(stream.write_all(&message).await.into_network_result()?);
let mut data = Vec::with_capacity(message.len() + 4);
data.extend_from_slice(&header);
data.extend_from_slice(&message);
network_result_try!(stream.write_all(&data).await.into_network_result()?);
stream.flush().await.into_network_result()
}
@ -100,7 +105,7 @@ impl RawTcpNetworkConnection {
///////////////////////////////////////////////////////////
#[derive(Clone)]
pub(in crate::network_manager) struct RawTcpProtocolHandler
pub struct RawTcpProtocolHandler
where
Self: ProtocolAcceptHandler,
{

View File

@ -2,7 +2,7 @@ use super::*;
use sockets::*;
#[derive(Clone)]
pub(in crate::network_manager) struct RawUdpProtocolHandler {
pub struct RawUdpProtocolHandler {
socket: Arc<UdpSocket>,
assembly_buffer: AssemblyBuffer,
address_filter: Option<AddressFilter>,

View File

@ -181,7 +181,7 @@ struct WebsocketProtocolHandlerArc {
}
#[derive(Clone)]
pub(in crate::network_manager) struct WebsocketProtocolHandler
pub struct WebsocketProtocolHandler
where
Self: ProtocolAcceptHandler,
{

View File

@ -5,7 +5,7 @@ mod upnp_task;
use super::*;
impl Network {
pub(crate) fn setup_tasks(&self) {
pub fn setup_tasks(&self) {
// Set update network class tick task
{
let this = self.clone();
@ -45,7 +45,7 @@ impl Network {
}
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> {
pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return Ok(());

View File

@ -11,7 +11,7 @@ cfg_if::cfg_if! {
///////////////////////////////////////////////////////////
// Accept
pub(in crate::network_manager) trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
pub(crate) trait ProtocolAcceptHandler: ProtocolAcceptHandlerClone + Send + Sync {
fn on_accept(
&self,
stream: AsyncPeekStream,
@ -20,7 +20,7 @@ cfg_if::cfg_if! {
) -> SendPinBoxFuture<io::Result<Option<ProtocolNetworkConnection>>>;
}
pub(in crate::network_manager) trait ProtocolAcceptHandlerClone {
pub(crate) trait ProtocolAcceptHandlerClone {
fn clone_box(&self) -> Box<dyn ProtocolAcceptHandler>;
}
@ -38,7 +38,7 @@ cfg_if::cfg_if! {
}
}
pub(in crate::network_manager) type NewProtocolAcceptHandler =
pub(crate) type NewProtocolAcceptHandler =
dyn Fn(VeilidConfig, bool) -> Box<dyn ProtocolAcceptHandler> + Send;
}
}
@ -84,7 +84,7 @@ pub struct NetworkConnectionStats {
/// Represents a connection in the connection table for connection-oriented protocols
#[derive(Debug)]
pub(in crate::network_manager) struct NetworkConnection {
pub(crate) struct NetworkConnection {
/// A unique id for this connection
connection_id: NetworkConnectionId,
/// The dial info used to make this connection if it was made with 'connect'

View File

@ -7,7 +7,7 @@ use routing_table::*;
use stop_token::future::FutureExt;
#[derive(Clone, Debug)]
pub(crate) enum ReceiptEvent {
pub enum ReceiptEvent {
ReturnedOutOfBand,
ReturnedInBand {
inbound_noderef: FilteredNodeRef,
@ -29,7 +29,7 @@ pub(super) enum ReceiptReturned {
Private { private_route: PublicKey },
}
pub(crate) trait ReceiptCallback: Send + 'static {
pub trait ReceiptCallback: Send + 'static {
fn call(
&self,
event: ReceiptEvent,
@ -143,7 +143,6 @@ impl PartialOrd for ReceiptRecordTimestampSort {
///////////////////////////////////
struct ReceiptManagerInner {
network_manager: NetworkManager,
records_by_nonce: BTreeMap<Nonce, Arc<Mutex<ReceiptRecord>>>,
next_oldest_ts: Option<Timestamp>,
stop_source: Option<StopSource>,
@ -161,9 +160,8 @@ pub(super) struct ReceiptManager {
}
impl ReceiptManager {
fn new_inner(network_manager: NetworkManager) -> ReceiptManagerInner {
fn new_inner() -> ReceiptManagerInner {
ReceiptManagerInner {
network_manager,
records_by_nonce: BTreeMap::new(),
next_oldest_ts: None,
stop_source: None,
@ -171,19 +169,15 @@ impl ReceiptManager {
}
}
pub fn new(network_manager: NetworkManager) -> Self {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(Self::new_inner(network_manager))),
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(ReceiptManagerUnlockedInner {
startup_lock: StartupLock::new(),
}),
}
}
pub fn network_manager(&self) -> NetworkManager {
self.inner.lock().network_manager.clone()
}
pub async fn startup(&self) -> EyreResult<()> {
let guard = self.unlocked_inner.startup_lock.startup()?;
log_net!(debug "startup receipt manager");
@ -322,8 +316,6 @@ impl ReceiptManager {
return;
};
let network_manager = self.network_manager();
// Stop all tasks
let timeout_task = {
let mut inner = self.inner.lock();
@ -338,7 +330,7 @@ impl ReceiptManager {
panic!("joining timeout task failed");
}
*self.inner.lock() = Self::new_inner(network_manager);
*self.inner.lock() = Self::new_inner();
guard.success();
log_net!(debug "finished receipt manager shutdown");

View File

@ -16,7 +16,7 @@ impl NetworkManager {
/// NodeContactMethod calculation requires first calculating the per-RoutingDomain ContactMethod
/// between the source and destination PeerInfo, which is a stateless operation.
#[instrument(level = "trace", target = "net", skip_all, err)]
pub(crate) async fn send_data(
pub async fn send_data(
&self,
destination_node_ref: FilteredNodeRef,
data: Vec<u8>,
@ -34,7 +34,7 @@ impl NetworkManager {
}
#[instrument(level = "trace", target = "net", skip_all)]
pub(crate) fn try_possibly_relayed_contact_method(
pub fn try_possibly_relayed_contact_method(
&self,
possibly_relayed_contact_method: NodeContactMethod,
destination_node_ref: FilteredNodeRef,
@ -400,7 +400,7 @@ impl NetworkManager {
/// Uses NodeRefs to ensure nodes are referenced, this is not a part of 'RoutingTable' because RoutingTable is not
/// allowed to use NodeRefs due to recursive locking
#[instrument(level = "trace", target = "net", skip_all, err)]
pub(crate) fn get_node_contact_method(
pub fn get_node_contact_method(
&self,
target_node_ref: FilteredNodeRef,
) -> EyreResult<NodeContactMethod> {

View File

@ -35,7 +35,7 @@ impl Default for NetworkManagerStats {
impl NetworkManager {
// Callbacks from low level network for statistics gathering
pub(crate) fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
pub fn stats_packet_sent(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats
@ -52,7 +52,7 @@ impl NetworkManager {
.add_up(bytes);
}
pub(crate) fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
pub fn stats_packet_rcvd(&self, addr: IpAddr, bytes: ByteCount) {
let inner = &mut *self.inner.lock();
inner
.stats

View File

@ -3,7 +3,7 @@ pub mod rolling_transfers;
use super::*;
impl NetworkManager {
pub(crate) fn setup_tasks(&self) {
pub fn setup_tasks(&self) {
// Set rolling transfers tick task
{
let this = self.clone();
@ -60,7 +60,7 @@ impl NetworkManager {
Ok(())
}
pub(crate) async fn cancel_tasks(&self) {
pub async fn cancel_tasks(&self) {
log_net!(debug "stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
warn!("rolling_transfers_task not stopped: {}", e);

View File

@ -3,7 +3,7 @@ use super::*;
impl NetworkManager {
// Compute transfer statistics for the low level network
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_transfers_task_routine(
pub async fn rolling_transfers_task_routine(
self,
_stop_token: StopToken,
last_ts: Timestamp,

View File

@ -2,7 +2,7 @@ use super::*;
// Ordering here matters, IPV6 is preferred to IPV4 in dial info sorts
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub enum Address {
pub(crate) enum Address {
IPV6(Ipv6Addr),
IPV4(Ipv4Addr),
}

View File

@ -4,8 +4,8 @@ use super::*;
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
#[enumset(repr = "u8")]
pub enum AddressType {
pub(crate) enum AddressType {
IPV6 = 0,
IPV4 = 1,
}
pub type AddressTypeSet = EnumSet<AddressType>;
pub(crate) type AddressTypeSet = EnumSet<AddressType>;

View File

@ -5,16 +5,16 @@ mod wss;
use super::*;
pub use tcp::*;
pub use udp::*;
pub use ws::*;
pub use wss::*;
pub(crate) use tcp::*;
pub(crate) use udp::*;
pub(crate) use ws::*;
pub(crate) use wss::*;
// Keep member order appropriate for sorting < preference
// Must match ProtocolType order
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
#[serde(tag = "kind")]
pub enum DialInfo {
pub(crate) enum DialInfo {
UDP(DialInfoUDP),
TCP(DialInfoTCP),
WS(DialInfoWS),

View File

@ -1,6 +1,6 @@
use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoTCP {
pub(crate) struct DialInfoTCP {
pub socket_address: SocketAddress,
}

View File

@ -1,6 +1,6 @@
use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoUDP {
pub(crate) struct DialInfoUDP {
pub socket_address: SocketAddress,
}

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoWS {
pub(crate) struct DialInfoWS {
pub socket_address: SocketAddress,
pub request: String,
}

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Clone, Default, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub struct DialInfoWSS {
pub(crate) struct DialInfoWSS {
pub socket_address: SocketAddress,
pub request: String,
}

View File

@ -2,7 +2,7 @@ use super::*;
// Keep member order appropriate for sorting < preference
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum DialInfoClass {
pub(crate) enum DialInfoClass {
Direct = 0, // D = Directly reachable with public IP and no firewall, with statically configured port
Mapped = 1, // M = Directly reachable with via portmap behind any NAT or firewalled with dynamically negotiated port
FullConeNAT = 2, // F = Directly reachable device without portmap behind full-cone NAT (or manually mapped firewall port with no configuration change)

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct DialInfoFilter {
pub(crate) struct DialInfoFilter {
pub protocol_type_set: ProtocolTypeSet,
pub address_type_set: AddressTypeSet,
}

View File

@ -0,0 +1,13 @@
use super::*;
pub(crate) struct PeerInfoChangeEvent {
pub peer_info: Arc<PeerInfo>,
}
pub(crate) struct SocketAddressChangeEvent {
pub routing_domain: RoutingDomain, // the routing domain this flow is over
pub socket_address: SocketAddress, // the socket address as seen by the remote peer
pub old_socket_address: Option<SocketAddress>, // the socket address previously for this peer
pub flow: Flow, // the flow used
pub reporting_peer: NodeRef, // the peer's noderef reporting the socket address
}

View File

@ -12,7 +12,7 @@ use super::*;
///
#[derive(Copy, Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Flow {
pub(crate) struct Flow {
remote: PeerAddress,
local: Option<SocketAddress>,
}
@ -75,7 +75,7 @@ impl MatchesDialInfoFilter for Flow {
/// The NetworkConnectionId associated with each flow may represent a low level network connection
/// and will be unique with high probability per low-level connection
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct UniqueFlow {
pub(crate) struct UniqueFlow {
pub flow: Flow,
pub connection_id: Option<NetworkConnectionId>,
}
@ -95,4 +95,4 @@ impl fmt::Display for UniqueFlow {
}
}
pub type NetworkConnectionId = AlignedU64;
pub(crate) type NetworkConnectionId = AlignedU64;

View File

@ -5,7 +5,7 @@ use super::*;
// Keep member order appropriate for sorting < preference
// Must match DialInfo order
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum LowLevelProtocolType {
pub(crate) enum LowLevelProtocolType {
UDP = 0,
TCP = 1,
}

View File

@ -3,6 +3,7 @@ mod address_type;
mod dial_info;
mod dial_info_class;
mod dial_info_filter;
mod events;
mod flow;
mod low_level_protocol_type;
mod network_class;
@ -15,17 +16,18 @@ mod socket_address;
use super::*;
pub use address::*;
pub use address_type::*;
pub use dial_info::*;
pub use dial_info_class::*;
pub use dial_info_filter::*;
pub use flow::*;
pub use low_level_protocol_type::*;
pub use network_class::*;
pub use peer_address::*;
pub use protocol_type::*;
pub use punishment::*;
pub use relay_kind::*;
pub use signal_info::*;
pub use socket_address::*;
pub(crate) use address::*;
pub(crate) use address_type::*;
pub(crate) use dial_info::*;
pub(crate) use dial_info_class::*;
pub(crate) use dial_info_filter::*;
pub(crate) use events::*;
pub(crate) use flow::*;
pub(crate) use low_level_protocol_type::*;
pub(crate) use network_class::*;
pub(crate) use peer_address::*;
pub(crate) use protocol_type::*;
pub(crate) use punishment::*;
pub(crate) use relay_kind::*;
pub(crate) use signal_info::*;
pub(crate) use socket_address::*;

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum NetworkClass {
pub(crate) enum NetworkClass {
InboundCapable = 0, // I = Inbound capable without relay, may require signal
OutboundOnly = 1, // O = Outbound only, inbound relay required except with reverse connect signal
WebApp = 2, // W = PWA, outbound relay is required in most cases

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize)]
pub struct PeerAddress {
pub(crate) struct PeerAddress {
protocol_type: ProtocolType,
#[serde(with = "as_human_string")]
socket_address: SocketAddress,

View File

@ -6,7 +6,7 @@ use super::*;
#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, PartialOrd, Ord, Hash, EnumSetType, Serialize, Deserialize)]
#[enumset(repr = "u8")]
pub enum ProtocolType {
pub(crate) enum ProtocolType {
UDP = 0,
TCP = 1,
WS = 2,

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PunishmentReason {
pub(crate) enum PunishmentReason {
// IP-level punishments
FailedToDecryptEnvelopeBody,
FailedToDecodeEnvelope,
@ -17,7 +17,7 @@ pub enum PunishmentReason {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Punishment {
pub(crate) struct Punishment {
pub reason: PunishmentReason,
pub timestamp: Timestamp,
}

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub enum RelayKind {
pub(crate) enum RelayKind {
Inbound = 0,
Outbound = 1,
}

View File

@ -2,7 +2,7 @@ use super::*;
/// Parameter for Signal operation
#[derive(Clone, Debug)]
pub enum SignalInfo {
pub(crate) enum SignalInfo {
/// UDP Hole Punch Request
HolePunch {
/// /// Receipt to be returned after the hole punch

View File

@ -3,7 +3,7 @@ use super::*;
#[derive(
Copy, Default, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize,
)]
pub struct SocketAddress {
pub(crate) struct SocketAddress {
address: Address,
port: u16,
}

View File

@ -75,7 +75,7 @@ struct NetworkUnlockedInner {
}
#[derive(Clone)]
pub(in crate::network_manager) struct Network {
pub(super) struct Network {
config: VeilidConfig,
inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>,
@ -510,7 +510,7 @@ impl Network {
//////////////////////////////////////////
#[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)]
pub(crate) async fn tick(&self) -> EyreResult<()> {
pub async fn tick(&self) -> EyreResult<()> {
let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else {
log_net!(debug "ignoring due to not started up");
return Ok(());

View File

@ -2,7 +2,7 @@ use super::*;
use routing_table::tasks::bootstrap::BOOTSTRAP_TXT_VERSION_0;
impl RoutingTable {
pub(crate) async fn debug_info_txtrecord(&self) -> String {
pub async fn debug_info_txtrecord(&self) -> String {
let mut out = String::new();
let gdis = self.dial_info_details(RoutingDomain::PublicInternet);
@ -55,7 +55,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_nodeid(&self) -> String {
pub fn debug_info_nodeid(&self) -> String {
let mut out = String::new();
for nid in self.unlocked_inner.node_ids().iter() {
out += &format!("{}\n", nid);
@ -63,7 +63,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_nodeinfo(&self) -> String {
pub fn debug_info_nodeinfo(&self) -> String {
let mut out = String::new();
let inner = self.inner.read();
out += &format!("Node Ids: {}\n", self.unlocked_inner.node_ids());
@ -76,7 +76,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_dialinfo(&self) -> String {
pub fn debug_info_dialinfo(&self) -> String {
let ldis = self.dial_info_details(RoutingDomain::LocalNetwork);
let gdis = self.dial_info_details(RoutingDomain::PublicInternet);
let mut out = String::new();
@ -92,11 +92,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_peerinfo(
&self,
routing_domain: RoutingDomain,
published: bool,
) -> String {
pub fn debug_info_peerinfo(&self, routing_domain: RoutingDomain, published: bool) -> String {
let mut out = String::new();
if published {
let pistr = if let Some(pi) = self.get_published_peer_info(routing_domain) {
@ -189,7 +185,7 @@ impl RoutingTable {
)
}
pub(crate) fn debug_info_entries(
pub fn debug_info_entries(
&self,
min_state: BucketEntryState,
capabilities: Vec<FourCC>,
@ -272,7 +268,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_entries_fastest(
pub fn debug_info_entries_fastest(
&self,
min_state: BucketEntryState,
capabilities: Vec<FourCC>,
@ -354,7 +350,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String {
pub fn debug_info_entry(&self, node_ref: NodeRef) -> String {
let cur_ts = Timestamp::now();
let mut out = String::new();
@ -369,7 +365,7 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
pub fn debug_info_buckets(&self, min_state: BucketEntryState) -> String {
let inner = self.inner.read();
let inner = &*inner;
let cur_ts = Timestamp::now();

View File

@ -165,7 +165,7 @@ impl RoutingTable {
/// Determine if set of peers is closer to key_near than key_far is to key_near
#[instrument(level = "trace", target = "rtab", skip_all, err)]
pub(crate) fn verify_peers_closer(
pub fn verify_peers_closer(
vcrypto: CryptoSystemVersion,
key_far: TypedKey,
key_near: TypedKey,

View File

@ -12,6 +12,14 @@ mod types;
pub mod tests;
pub(crate) use bucket_entry::*;
pub(crate) use node_ref::*;
pub(crate) use privacy::*;
pub(crate) use route_spec_store::*;
pub(crate) use routing_table_inner::*;
pub(crate) use stats_accounting::*;
pub use types::*;
use super::*;
use crate::crypto::*;
@ -21,15 +29,6 @@ use crate::rpc_processor::*;
use bucket::*;
use hashlink::LruCache;
pub(crate) use bucket_entry::*;
pub(crate) use node_ref::*;
pub(crate) use privacy::*;
pub(crate) use route_spec_store::*;
pub(crate) use routing_table_inner::*;
pub(crate) use stats_accounting::*;
pub use types::*;
//////////////////////////////////////////////////////////////////////////
/// How many nodes in our routing table we require for a functional PublicInternet RoutingDomain
@ -61,14 +60,14 @@ pub struct LowLevelPortInfo {
pub low_level_protocol_ports: LowLevelProtocolPorts,
pub protocol_to_port: ProtocolToPortMapping,
}
pub(crate) type RoutingTableEntryFilter<'t> =
pub type RoutingTableEntryFilter<'t> =
Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>;
type SerializedBuckets = Vec<Vec<u8>>;
type SerializedBucketMap = BTreeMap<CryptoKind, SerializedBuckets>;
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub(crate) struct RoutingTableHealth {
pub struct RoutingTableHealth {
/// Number of reliable (long-term responsive) entries in the routing table
pub reliable_entry_count: usize,
/// Number of unreliable (occasionally unresponsive) entries in the routing table
@ -86,12 +85,13 @@ pub(crate) struct RoutingTableHealth {
pub type BucketIndex = (CryptoKind, usize);
#[derive(Debug, Clone, Copy)]
pub(crate) struct RecentPeersEntry {
pub struct RecentPeersEntry {
pub last_connection: Flow,
}
pub(crate) struct RoutingTableUnlockedInner {
// Accessors
event_bus: EventBus,
config: VeilidConfig,
network_manager: NetworkManager,
@ -214,12 +214,14 @@ pub(crate) struct RoutingTable {
impl RoutingTable {
fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig,
network_manager: NetworkManager,
) -> RoutingTableUnlockedInner {
let c = config.get();
RoutingTableUnlockedInner {
event_bus,
config: config.clone(),
network_manager,
node_id: c.network.routing_table.node_id.clone(),
@ -268,8 +270,9 @@ impl RoutingTable {
}
}
pub fn new(network_manager: NetworkManager) -> Self {
let event_bus = network_manager.event_bus();
let config = network_manager.config();
let unlocked_inner = Arc::new(Self::new_unlocked_inner(config, network_manager));
let unlocked_inner = Arc::new(Self::new_unlocked_inner(event_bus, config, network_manager));
let inner = Arc::new(RwLock::new(RoutingTableInner::new(unlocked_inner.clone())));
let this = Self {
inner,

View File

@ -1,6 +1,6 @@
use super::*;
pub struct FilteredNodeRef {
pub(crate) struct FilteredNodeRef {
routing_table: RoutingTable,
entry: Arc<BucketEntry>,
filter: NodeRefFilter,

View File

@ -6,16 +6,16 @@ mod traits;
use super::*;
pub use filtered_node_ref::*;
pub use node_ref_filter::*;
pub use node_ref_lock::*;
pub use node_ref_lock_mut::*;
pub use traits::*;
pub(crate) use filtered_node_ref::*;
pub(crate) use node_ref_filter::*;
pub(crate) use node_ref_lock::*;
pub(crate) use node_ref_lock_mut::*;
pub(crate) use traits::*;
///////////////////////////////////////////////////////////////////////////
// Default NodeRef
pub struct NodeRef {
pub(crate) struct NodeRef {
routing_table: RoutingTable,
entry: Arc<BucketEntry>,
#[cfg(feature = "tracking")]

View File

@ -1,13 +1,13 @@
use super::*;
pub type LockedNodeRef<'a> = NodeRefLock<'a, NodeRef>;
pub type LockedFilteredNodeRef<'a> = NodeRefLock<'a, FilteredNodeRef>;
pub(crate) type LockedNodeRef<'a> = NodeRefLock<'a, NodeRef>;
pub(crate) type LockedFilteredNodeRef<'a> = NodeRefLock<'a, FilteredNodeRef>;
/// Locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone
pub struct NodeRefLock<
pub(crate) struct NodeRefLock<
'a,
N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone,
> {

View File

@ -1,13 +1,13 @@
use super::*;
pub type LockedMutNodeRef<'a> = NodeRefLockMut<'a, NodeRef>;
pub type LockedMutFilteredNodeRef<'a> = NodeRefLockMut<'a, FilteredNodeRef>;
pub(crate) type LockedMutNodeRef<'a> = NodeRefLockMut<'a, NodeRef>;
pub(crate) type LockedMutFilteredNodeRef<'a> = NodeRefLockMut<'a, FilteredNodeRef>;
/// Mutable locked reference to a routing table entry
/// For internal use inside the RoutingTable module where you have
/// already locked a RoutingTableInner
/// Keeps entry in the routing table until all references are gone
pub struct NodeRefLockMut<
pub(crate) struct NodeRefLockMut<
'a,
N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone,
> {

View File

@ -1,7 +1,7 @@
use super::*;
// Field accessors
pub trait NodeRefAccessorsTrait {
pub(crate) trait NodeRefAccessorsTrait {
fn routing_table(&self) -> RoutingTable;
fn entry(&self) -> Arc<BucketEntry>;
fn sequencing(&self) -> Sequencing;
@ -14,7 +14,7 @@ pub trait NodeRefAccessorsTrait {
}
// Operate on entry
pub trait NodeRefOperateTrait {
pub(crate) trait NodeRefOperateTrait {
fn operate<T, F>(&self, f: F) -> T
where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T;
@ -24,7 +24,7 @@ pub trait NodeRefOperateTrait {
}
// Common Operations
pub trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait {
fn same_entry<T: NodeRefAccessorsTrait>(&self, other: &T) -> bool {
Arc::ptr_eq(&self.entry(), &other.entry())
}

View File

@ -15,7 +15,7 @@ use route_spec_store_cache::*;
use route_spec_store_content::*;
pub(crate) use route_spec_store_cache::CompiledRoute;
pub(crate) use route_stats::*;
pub use route_stats::*;
/// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;

View File

@ -2,7 +2,7 @@ use super::*;
/// What remote private routes have seen
#[derive(Debug, Clone, Default)]
pub(crate) struct RemotePrivateRouteInfo {
pub struct RemotePrivateRouteInfo {
/// The private routes themselves
private_routes: Vec<PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct RouteSpecDetail {
pub struct RouteSpecDetail {
/// Crypto kind
pub crypto_kind: CryptoKind,
/// Secret key
@ -11,7 +11,7 @@ pub(crate) struct RouteSpecDetail {
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct RouteSetSpecDetail {
pub struct RouteSetSpecDetail {
/// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs

View File

@ -9,7 +9,7 @@ struct CompiledRouteCacheKey {
/// Compiled route (safety route + private route)
#[derive(Clone, Debug)]
pub(crate) struct CompiledRoute {
pub struct CompiledRoute {
/// The safety route attached to the private route
pub safety_route: SafetyRoute,
/// The secret used to encrypt the message payload
@ -21,7 +21,7 @@ pub(crate) struct CompiledRoute {
/// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug)]
pub(super) struct RouteSpecStoreCache {
pub struct RouteSpecStoreCache {
/// How many times nodes have been used
used_nodes: HashMap<PublicKey, usize>,
/// How many times nodes have been used at the terminal point of a route

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub(crate) struct RouteStats {
pub struct RouteStats {
/// Consecutive failed to send count
#[serde(skip)]
pub failed_to_send: u32,

View File

@ -14,7 +14,7 @@ pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
//////////////////////////////////////////////////////////////////////////
/// RoutingTable rwlock-internal data
pub(crate) struct RoutingTableInner {
pub struct RoutingTableInner {
/// Extra pointer to unlocked members to simplify access
pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>,
/// Routing table buckets that hold references to entries, per crypto kind
@ -1388,7 +1388,7 @@ impl RoutingTableInner {
}
#[instrument(level = "trace", skip_all)]
pub(crate) fn make_closest_noderef_sort(
pub fn make_closest_noderef_sort(
crypto: Crypto,
node_id: TypedKey,
) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering {
@ -1417,7 +1417,7 @@ pub(crate) fn make_closest_noderef_sort(
}
}
pub(crate) fn make_closest_node_id_sort(
pub fn make_closest_node_id_sort(
crypto: Crypto,
node_id: TypedKey,
) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering {

View File

@ -144,9 +144,13 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
pi
};
rti.unlocked_inner
.network_manager()
.report_peer_info_change(peer_info);
if let Err(e) = rti
.unlocked_inner
.event_bus
.post(PeerInfoChangeEvent { peer_info })
{
log_rtab!(debug "Failed to post event: {}", e);
}
true
}
@ -162,7 +166,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
let can_contain_address = self.can_contain_address(address);
if !can_contain_address {
log_rtab!(debug "[LocalNetwork] can not add dial info to this routing domain: {:?}", dial_info);
log_network_result!(debug "[LocalNetwork] can not add dial info to this routing domain: {:?}", dial_info);
return false;
}
if !dial_info.is_valid() {

View File

@ -9,7 +9,7 @@ pub use local_network::*;
pub use public_internet::*;
/// General trait for all routing domains
pub(crate) trait RoutingDomainDetail {
pub trait RoutingDomainDetail {
// Common accessors
#[expect(dead_code)]
fn routing_domain(&self) -> RoutingDomain;

View File

@ -122,9 +122,13 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
pi
};
rti.unlocked_inner
.network_manager()
.report_peer_info_change(peer_info);
if let Err(e) = rti
.unlocked_inner
.event_bus
.post(PeerInfoChangeEvent { peer_info })
{
log_rtab!(debug "Failed to post event: {}", e);
}
true
}
@ -140,7 +144,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
let can_contain_address = self.can_contain_address(address);
if !can_contain_address {
log_rtab!(debug "[PublicInternet] can not add dial info to this routing domain: {:?}", dial_info);
log_network_result!(debug "[PublicInternet] can not add dial info to this routing domain: {:?}", dial_info);
return false;
}
if !dial_info.is_valid() {

View File

@ -117,7 +117,7 @@ impl RoutingTable {
// Bootstrap lookup process
#[instrument(level = "trace", skip(self), ret, err)]
pub(crate) async fn resolve_bootstrap(
pub async fn resolve_bootstrap(
&self,
bootstrap: Vec<String>,
) -> EyreResult<Vec<BootstrapRecord>> {
@ -254,7 +254,7 @@ impl RoutingTable {
}
//#[instrument(level = "trace", skip(self), err)]
pub(crate) fn bootstrap_with_peer(
pub fn bootstrap_with_peer(
self,
crypto_kinds: Vec<CryptoKind>,
pi: Arc<PeerInfo>,
@ -324,7 +324,7 @@ impl RoutingTable {
}
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_with_peer_list(
pub async fn bootstrap_with_peer_list(
self,
peers: Vec<Arc<PeerInfo>>,
stop_token: StopToken,
@ -364,7 +364,7 @@ impl RoutingTable {
}
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
pub async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
let bootstrap = self
.unlocked_inner
.with_config(|c| c.network.routing_table.bootstrap.clone());

View File

@ -10,10 +10,7 @@ impl RoutingTable {
/// Ask our closest peers to give us more peers close to ourselves. This will
/// assist with the DHT and other algorithms that utilize the distance metric.
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn closest_peers_refresh_task_routine(
self,
stop_token: StopToken,
) -> EyreResult<()> {
pub async fn closest_peers_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
let mut unord = FuturesUnordered::new();
for crypto_kind in VALID_CRYPTO_KINDS {

View File

@ -10,7 +10,7 @@ impl RoutingTable {
// Kick the queued buckets in the routing table to free dead nodes if necessary
// Attempts to keep the size of the routing table down to the bucket depth
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn kick_buckets_task_routine(
pub async fn kick_buckets_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,

View File

@ -10,7 +10,7 @@ pub mod update_statistics;
use super::*;
impl RoutingTable {
pub(crate) fn setup_tasks(&self) {
pub fn setup_tasks(&self) {
// Set rolling transfers tick task
{
let this = self.clone();
@ -287,12 +287,12 @@ impl RoutingTable {
Ok(())
}
pub(crate) async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
pub async fn pause_tasks(&self) -> AsyncTagLockGuard<&'static str> {
let critical_sections = self.inner.read().critical_sections.clone();
critical_sections.lock_tag(LOCK_TAG_TICK).await
}
pub(crate) async fn cancel_tasks(&self) {
pub async fn cancel_tasks(&self) {
// Cancel all tasks being ticked
log_rtab!(debug "stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {

View File

@ -12,10 +12,7 @@ impl RoutingTable {
// nodes for their PublicInternet peers, which is a very fast way to get
// a new node online.
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn peer_minimum_refresh_task_routine(
self,
stop_token: StopToken,
) -> EyreResult<()> {
pub async fn peer_minimum_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
// Get counts by crypto kind
let entry_count = self.inner.read().cached_entry_counts();

View File

@ -2,10 +2,10 @@ use super::*;
/// Keepalive pings are done occasionally to ensure holepunched public dialinfo
/// remains valid, as well as to make sure we remain in any relay node's routing table
const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
const RELAY_KEEPALIVE_PING_INTERVAL_SECS: u32 = 1;
/// Keepalive pings are done for active watch nodes to make sure they are still there
const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 10;
const ACTIVE_WATCH_KEEPALIVE_PING_INTERVAL_SECS: u32 = 1;
/// Ping queue processing depth per validator
const MAX_PARALLEL_PINGS: usize = 8;
@ -17,7 +17,7 @@ type PingValidatorFuture = SendPinBoxFuture<Result<(), RPCError>>;
impl RoutingTable {
// Task routine for PublicInternet status pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_public_internet_task_routine(
pub async fn ping_validator_public_internet_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
@ -36,7 +36,7 @@ impl RoutingTable {
// Task routine for LocalNetwork status pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_local_network_task_routine(
pub async fn ping_validator_local_network_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
@ -55,7 +55,7 @@ impl RoutingTable {
// Task routine for PublicInternet relay keepalive pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_public_internet_relay_task_routine(
pub async fn ping_validator_public_internet_relay_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,
@ -74,7 +74,7 @@ impl RoutingTable {
// Task routine for active watch keepalive pings
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn ping_validator_active_watch_task_routine(
pub async fn ping_validator_active_watch_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,

View File

@ -168,7 +168,7 @@ impl RoutingTable {
/// Keep private routes assigned and accessible
#[instrument(level = "trace", skip(self, stop_token), err)]
pub(crate) async fn private_route_management_task_routine(
pub async fn private_route_management_task_routine(
self,
stop_token: StopToken,
_last_ts: Timestamp,

View File

@ -53,7 +53,7 @@ impl RoutingTable {
// Keep relays assigned and accessible
#[instrument(level = "trace", skip_all, err)]
pub(crate) async fn relay_management_task_routine(
pub async fn relay_management_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,

View File

@ -3,7 +3,7 @@ use super::*;
impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_transfers_task_routine(
pub async fn rolling_transfers_task_routine(
self,
_stop_token: StopToken,
last_ts: Timestamp,
@ -35,7 +35,7 @@ impl RoutingTable {
// Update state statistics in PeerStats
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn update_state_stats_task_routine(
pub async fn update_state_stats_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,
@ -56,7 +56,7 @@ impl RoutingTable {
// Update rolling answers in PeerStats
#[instrument(level = "trace", skip(self), err)]
pub(crate) async fn rolling_answers_task_routine(
pub async fn rolling_answers_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,

View File

@ -3,13 +3,23 @@ use super::*;
pub mod test_serialize_routing_table;
pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
let event_bus = EventBus::new();
let veilid_config = VeilidConfig::new();
#[cfg(feature = "unstable-blockstore")]
let block_store = BlockStore::new(veilid_config.clone());
let protected_store = ProtectedStore::new(veilid_config.clone());
let table_store = TableStore::new(veilid_config.clone(), protected_store.clone());
let crypto = Crypto::new(veilid_config.clone(), table_store.clone());
let block_store = BlockStore::new(event_bus.clone(), veilid_config.clone());
let protected_store = ProtectedStore::new(event_bus.clone(), veilid_config.clone());
let table_store = TableStore::new(
event_bus.clone(),
veilid_config.clone(),
protected_store.clone(),
);
let crypto = Crypto::new(
event_bus.clone(),
veilid_config.clone(),
table_store.clone(),
);
let storage_manager = storage_manager::StorageManager::new(
event_bus.clone(),
veilid_config.clone(),
crypto.clone(),
table_store.clone(),
@ -17,6 +27,7 @@ pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
block_store.clone(),
);
let network_manager = network_manager::NetworkManager::new(
event_bus.clone(),
veilid_config.clone(),
storage_manager,
table_store.clone(),

View File

@ -2,7 +2,7 @@ use super::*;
/// Mechanism required to contact another node
#[derive(Clone, Debug)]
pub(crate) enum ContactMethod {
pub enum ContactMethod {
/// Node is not reachable by any means
Unreachable,
/// Connection should have already existed

View File

@ -11,7 +11,7 @@ mod signed_relayed_node_info;
use super::*;
pub(crate) use contact_method::*;
pub use contact_method::*;
pub use dial_info_detail::*;
pub use direction::*;
pub use node_info::*;

View File

@ -122,10 +122,10 @@ impl SignedNodeInfo {
match self {
SignedNodeInfo::Direct(d) => match other {
SignedNodeInfo::Direct(pd) => d.equivalent(pd),
SignedNodeInfo::Relayed(_) => true,
SignedNodeInfo::Relayed(_) => false,
},
SignedNodeInfo::Relayed(r) => match other {
SignedNodeInfo::Direct(_) => true,
SignedNodeInfo::Direct(_) => false,
SignedNodeInfo::Relayed(pr) => r.equivalent(pr),
},
}

View File

@ -0,0 +1,24 @@
use super::*;
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
/// Hpw long it took to get this answer
pub _latency: TimestampDuration,
/// The private route requested to receive the reply
pub reply_private_route: Option<PublicKey>,
/// The answer itself
pub answer: T,
}
impl<T> Answer<T> {
pub fn new(
latency: TimestampDuration,
reply_private_route: Option<PublicKey>,
answer: T,
) -> Self {
Self {
_latency: latency,
reply_private_route,
answer,
}
}
}

View File

@ -1,7 +1,7 @@
use super::*;
use core::convert::TryInto;
pub(crate) fn encode_address(
pub fn encode_address(
address: &Address,
builder: &mut veilid_capnp::address::Builder,
) -> Result<(), RPCError> {
@ -37,7 +37,7 @@ pub(crate) fn encode_address(
Ok(())
}
pub(crate) fn decode_address(reader: &veilid_capnp::address::Reader) -> Result<Address, RPCError> {
pub fn decode_address(reader: &veilid_capnp::address::Reader) -> Result<Address, RPCError> {
match reader.reborrow().which() {
Ok(veilid_capnp::address::Which::Ipv4(Ok(v4))) => {
let v4b = v4.get_addr().to_be_bytes();

View File

@ -1,6 +1,6 @@
use super::*;
pub(crate) fn encode_address_type_set(
pub fn encode_address_type_set(
address_type_set: &AddressTypeSet,
builder: &mut veilid_capnp::address_type_set::Builder,
) -> Result<(), RPCError> {
@ -10,7 +10,7 @@ pub(crate) fn encode_address_type_set(
Ok(())
}
pub(crate) fn decode_address_type_set(
pub fn decode_address_type_set(
reader: &veilid_capnp::address_type_set::Reader,
) -> Result<AddressTypeSet, RPCError> {
let mut out = AddressTypeSet::new();

View File

@ -1,9 +1,7 @@
use super::*;
use core::convert::TryInto;
pub(crate) fn decode_dial_info(
reader: &veilid_capnp::dial_info::Reader,
) -> Result<DialInfo, RPCError> {
pub fn decode_dial_info(reader: &veilid_capnp::dial_info::Reader) -> Result<DialInfo, RPCError> {
match reader
.reborrow()
.which()
@ -62,7 +60,7 @@ pub(crate) fn decode_dial_info(
}
}
pub(crate) fn encode_dial_info(
pub fn encode_dial_info(
dial_info: &DialInfo,
builder: &mut veilid_capnp::dial_info::Builder,
) -> Result<(), RPCError> {

View File

@ -1,8 +1,6 @@
use super::*;
pub(crate) fn encode_dial_info_class(
dial_info_class: DialInfoClass,
) -> veilid_capnp::DialInfoClass {
pub fn encode_dial_info_class(dial_info_class: DialInfoClass) -> veilid_capnp::DialInfoClass {
match dial_info_class {
DialInfoClass::Direct => veilid_capnp::DialInfoClass::Direct,
DialInfoClass::Mapped => veilid_capnp::DialInfoClass::Mapped,
@ -13,9 +11,7 @@ pub(crate) fn encode_dial_info_class(
}
}
pub(crate) fn decode_dial_info_class(
dial_info_class: veilid_capnp::DialInfoClass,
) -> DialInfoClass {
pub fn decode_dial_info_class(dial_info_class: veilid_capnp::DialInfoClass) -> DialInfoClass {
match dial_info_class {
veilid_capnp::DialInfoClass::Direct => DialInfoClass::Direct,
veilid_capnp::DialInfoClass::Mapped => DialInfoClass::Mapped,

View File

@ -1,6 +1,6 @@
use super::*;
pub(crate) fn encode_dial_info_detail(
pub fn encode_dial_info_detail(
dial_info_detail: &DialInfoDetail,
builder: &mut veilid_capnp::dial_info_detail::Builder,
) -> Result<(), RPCError> {
@ -11,7 +11,7 @@ pub(crate) fn encode_dial_info_detail(
Ok(())
}
pub(crate) fn decode_dial_info_detail(
pub fn decode_dial_info_detail(
reader: &veilid_capnp::dial_info_detail::Reader,
) -> Result<DialInfoDetail, RPCError> {
let dial_info = decode_dial_info(

View File

@ -1,7 +1,7 @@
use super::*;
use core::convert::TryInto;
pub(crate) fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey {
pub fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey {
let u0 = public_key.get_u0().to_be_bytes();
let u1 = public_key.get_u1().to_be_bytes();
let u2 = public_key.get_u2().to_be_bytes();
@ -16,7 +16,7 @@ pub(crate) fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> Public
PublicKey::new(x)
}
pub(crate) fn encode_key256(key: &PublicKey, builder: &mut veilid_capnp::key256::Builder) {
pub fn encode_key256(key: &PublicKey, builder: &mut veilid_capnp::key256::Builder) {
builder.set_u0(u64::from_be_bytes(
key.bytes[0..8]
.try_into()

View File

@ -27,23 +27,21 @@ mod tunnel;
mod typed_key;
mod typed_signature;
pub(crate) use operations::MAX_INSPECT_VALUE_A_SEQS_LEN;
pub(in crate::rpc_processor) use operations::*;
pub(crate) use address::*;
pub(crate) use address_type_set::*;
pub(crate) use dial_info::*;
pub(crate) use dial_info_class::*;
pub(crate) use dial_info_detail::*;
pub(crate) use key256::*;
pub(crate) use network_class::*;
pub(crate) use node_info::*;
pub(crate) use node_status::*;
pub(crate) use nonce::*;
pub(crate) use peer_info::*;
pub(crate) use private_safety_route::*;
pub(crate) use protocol_type_set::*;
pub(crate) use sender_info::*;
pub use address::*;
pub use address_type_set::*;
pub use dial_info::*;
pub use dial_info_class::*;
pub use dial_info_detail::*;
pub use key256::*;
pub use network_class::*;
pub use node_info::*;
pub use node_status::*;
pub use nonce::*;
pub use operations::*;
pub use peer_info::*;
pub use private_safety_route::*;
pub use protocol_type_set::*;
pub use sender_info::*;
pub use sequencing::*;
pub use signal_info::*;
pub use signature512::*;
@ -62,20 +60,30 @@ use super::*;
#[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names)]
pub(in crate::rpc_processor) enum QuestionContext {
pub enum QuestionContext {
GetValue(ValidateGetValueContext),
SetValue(ValidateSetValueContext),
InspectValue(ValidateInspectValueContext),
}
#[derive(Clone)]
pub(in crate::rpc_processor) struct RPCValidateContext {
pub struct RPCValidateContext {
pub crypto: Crypto,
// pub rpc_processor: RPCProcessor,
pub question_context: Option<QuestionContext>,
}
#[derive(Clone)]
pub(crate) struct RPCDecodeContext {
pub struct RPCDecodeContext {
pub routing_domain: RoutingDomain,
}
#[instrument(level = "trace", target = "rpc", skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &builder).map_err(RPCError::protocol)?;
Ok(buffer)
}

View File

@ -30,6 +30,8 @@ mod operation_complete_tunnel;
#[cfg(feature = "unstable-tunnels")]
mod operation_start_tunnel;
pub use operation_inspect_value::MAX_INSPECT_VALUE_A_SEQS_LEN;
pub(in crate::rpc_processor) use answer::*;
pub(in crate::rpc_processor) use operation::*;
pub(in crate::rpc_processor) use operation_app_call::*;
@ -62,5 +64,3 @@ pub(in crate::rpc_processor) use operation_complete_tunnel::*;
pub(in crate::rpc_processor) use operation_start_tunnel::*;
use super::*;
pub(crate) use operation_inspect_value::MAX_INSPECT_VALUE_A_SEQS_LEN;

View File

@ -43,7 +43,7 @@ impl RPCOperationCancelTunnelQ {
#[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationCancelTunnelA {
pub enum RPCOperationCancelTunnelA {
Tunnel(TunnelId),
Error(TunnelError),
}

View File

@ -80,7 +80,7 @@ impl RPCOperationCompleteTunnelQ {
#[cfg(feature = "unstable-tunnels")]
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCOperationCompleteTunnelA {
pub enum RPCOperationCompleteTunnelA {
Tunnel(FullTunnel),
Error(TunnelError),
}

View File

@ -2,7 +2,7 @@ use super::*;
use crate::storage_manager::SignedValueDescriptor;
const MAX_INSPECT_VALUE_Q_SUBKEY_RANGES_LEN: usize = 512;
pub(crate) const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
pub const MAX_INSPECT_VALUE_A_SEQS_LEN: usize = 512;
const MAX_INSPECT_VALUE_A_PEERS_LEN: usize = 20;
#[derive(Clone)]

View File

@ -2,7 +2,7 @@ use super::*;
////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_route_hop_data(
pub fn encode_route_hop_data(
route_hop_data: &RouteHopData,
builder: &mut veilid_capnp::route_hop_data::Builder,
) -> Result<(), RPCError> {
@ -24,7 +24,7 @@ pub(crate) fn encode_route_hop_data(
Ok(())
}
pub(crate) fn decode_route_hop_data(
pub fn decode_route_hop_data(
reader: &veilid_capnp::route_hop_data::Reader,
) -> Result<RouteHopData, RPCError> {
let nonce = decode_nonce(
@ -45,7 +45,7 @@ pub(crate) fn decode_route_hop_data(
////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_route_hop(
pub fn encode_route_hop(
route_hop: &RouteHop,
builder: &mut veilid_capnp::route_hop::Builder,
) -> Result<(), RPCError> {
@ -67,7 +67,7 @@ pub(crate) fn encode_route_hop(
Ok(())
}
pub(crate) fn decode_route_hop(
pub fn decode_route_hop(
decode_context: &RPCDecodeContext,
reader: &veilid_capnp::route_hop::Reader,
) -> Result<RouteHop, RPCError> {
@ -100,7 +100,7 @@ pub(crate) fn decode_route_hop(
////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_private_route(
pub fn encode_private_route(
private_route: &PrivateRoute,
builder: &mut veilid_capnp::private_route::Builder,
) -> Result<(), RPCError> {
@ -126,7 +126,7 @@ pub(crate) fn encode_private_route(
Ok(())
}
pub(crate) fn decode_private_route(
pub fn decode_private_route(
decode_context: &RPCDecodeContext,
reader: &veilid_capnp::private_route::Reader,
) -> Result<PrivateRoute, RPCError> {
@ -156,7 +156,7 @@ pub(crate) fn decode_private_route(
////////////////////////////////////////////////////////////////////////////////////////////////////
pub(crate) fn encode_safety_route(
pub fn encode_safety_route(
safety_route: &SafetyRoute,
builder: &mut veilid_capnp::safety_route::Builder,
) -> Result<(), RPCError> {
@ -180,7 +180,7 @@ pub(crate) fn encode_safety_route(
Ok(())
}
pub(crate) fn decode_safety_route(
pub fn decode_safety_route(
decode_context: &RPCDecodeContext,
reader: &veilid_capnp::safety_route::Reader,
) -> Result<SafetyRoute, RPCError> {

View File

@ -30,7 +30,7 @@ pub(crate) enum Destination {
/// Routing configuration for destination
#[derive(Debug, Clone)]
pub struct UnsafeRoutingInfo {
pub(crate) struct UnsafeRoutingInfo {
pub opt_node: Option<NodeRef>,
pub opt_relay: Option<NodeRef>,
pub opt_routing_domain: Option<RoutingDomain>,
@ -450,7 +450,7 @@ impl RPCProcessor {
/// Convert the 'RespondTo' into a 'Destination' for a response
pub(super) fn get_respond_to_destination(
&self,
request: &RPCMessage,
request: &Message,
) -> NetworkResult<Destination> {
// Get the question 'respond to'
let respond_to = match request.operation.kind() {
@ -487,7 +487,7 @@ impl RPCProcessor {
NetworkResult::value(Destination::direct(peer_noderef))
} else {
// Look up the sender node, we should have added it via senderNodeInfo before getting here.
let res = match self.routing_table.lookup_node_ref(sender_node_id) {
let res = match self.routing_table().lookup_node_ref(sender_node_id) {
Ok(v) => v,
Err(e) => {
return NetworkResult::invalid_message(format!(

View File

@ -2,7 +2,7 @@ use super::*;
#[derive(ThisError, Debug, Clone, PartialOrd, PartialEq, Eq, Ord)]
#[must_use]
pub enum RPCError {
pub(crate) enum RPCError {
#[error("[RPCError: Unimplemented({0})]")]
Unimplemented(String),
#[error("[RPCError: InvalidFormat({0})]")]
@ -20,6 +20,7 @@ pub enum RPCError {
}
impl RPCError {
#[expect(dead_code)]
pub fn unimplemented<X: ToString>(x: X) -> Self {
Self::Unimplemented(x.to_string())
}
@ -47,9 +48,11 @@ impl RPCError {
pub fn network<X: ToString>(x: X) -> Self {
Self::Network(x.to_string())
}
#[expect(dead_code)]
pub fn map_network<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self {
move |x| Self::Network(format!("{}: {}", message.to_string(), x.to_string()))
}
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
pub fn try_again<X: ToString>(x: X) -> Self {
Self::TryAgain(x.to_string())
}
@ -59,6 +62,7 @@ impl RPCError {
pub fn ignore<X: ToString>(x: X) -> Self {
Self::Ignore(x.to_string())
}
#[expect(dead_code)]
pub fn map_ignore<M: ToString, X: ToString>(message: M) -> impl FnOnce(X) -> Self {
move |x| Self::Ignore(format!("{}: {}", message.to_string(), x.to_string()))
}
@ -81,7 +85,7 @@ impl From<RPCError> for VeilidAPIError {
}
}
pub(crate) type RPCNetworkResult<T> = Result<NetworkResult<T>, RPCError>;
pub type RPCNetworkResult<T> = Result<NetworkResult<T>, RPCError>;
pub(crate) trait ToRPCNetworkResult<T> {
fn to_rpc_network_result(self) -> RPCNetworkResult<T>;

View File

@ -9,7 +9,7 @@ where
}
#[derive(Debug, Copy, Clone)]
pub(crate) enum FanoutResultKind {
pub enum FanoutResultKind {
Partial,
Timeout,
Finished,
@ -22,12 +22,12 @@ impl FanoutResultKind {
}
#[derive(Debug, Clone)]
pub(crate) struct FanoutResult {
pub struct FanoutResult {
pub kind: FanoutResultKind,
pub value_nodes: Vec<NodeRef>,
}
pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String {
pub fn debug_fanout_result(result: &FanoutResult) -> String {
let kc = match result.kind {
FanoutResultKind::Partial => "P",
FanoutResultKind::Timeout => "T",
@ -37,7 +37,7 @@ pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String {
format!("{}:{}", kc, result.value_nodes.len())
}
pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String {
pub fn debug_fanout_results(results: &[FanoutResult]) -> String {
let mut col = 0;
let mut out = String::new();
let mut left = results.len();
@ -59,18 +59,18 @@ pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String {
}
#[derive(Debug)]
pub(crate) struct FanoutCallOutput {
pub struct FanoutCallOutput {
pub peer_info_list: Vec<Arc<PeerInfo>>,
}
pub(crate) type FanoutCallResult = RPCNetworkResult<FanoutCallOutput>;
pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub type FanoutCallResult = RPCNetworkResult<FanoutCallOutput>;
pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
Arc::new(|_, _| true)
}
pub(crate) fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
Arc::new(move |_, ni| ni.has_all_capabilities(&caps))
}

View File

@ -5,7 +5,7 @@
use super::*;
#[derive(Debug)]
pub(in crate::rpc_processor) struct FanoutQueue {
pub struct FanoutQueue {
crypto_kind: CryptoKind,
current_nodes: VecDeque<NodeRef>,
returned_nodes: HashSet<TypedKey>,

View File

@ -0,0 +1,8 @@
mod fanout_call;
mod fanout_queue;
pub(crate) use fanout_call::*;
use super::*;
use fanout_queue::*;

View File

@ -0,0 +1,35 @@
use super::*;
#[derive(Debug)]
pub(in crate::rpc_processor) struct MessageData {
pub contents: Vec<u8>, // rpc messages must be a canonicalized single segment
}
impl MessageData {
pub fn new(contents: Vec<u8>) -> Self {
Self { contents }
}
pub fn get_reader(
&self,
) -> Result<capnp::message::Reader<capnp::serialize::OwnedSegments>, RPCError> {
capnp::serialize_packed::read_message(
self.contents.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::protocol)
}
}
#[derive(Debug)]
pub(in crate::rpc_processor) struct MessageEncoded {
pub header: MessageHeader,
pub data: MessageData,
}
#[derive(Debug)]
pub(in crate::rpc_processor) struct Message {
pub header: MessageHeader,
pub operation: RPCOperation,
pub opt_sender_nr: Option<NodeRef>,
}

View File

@ -0,0 +1,89 @@
use super::*;
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCMessageHeaderDetailDirect {
/// The decoded header of the envelope
pub envelope: Envelope,
/// The noderef of the peer that sent the message (not the original sender).
/// Ensures node doesn't get evicted from routing table until we're done with it
/// Should be filtered to the routing domain of the peer that we received from
pub peer_noderef: FilteredNodeRef,
/// The flow from the peer sent the message (not the original sender)
pub flow: Flow,
/// The routing domain of the peer that we received from
pub routing_domain: RoutingDomain,
}
/// Header details for rpc messages received over only a safety route but not a private route
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCMessageHeaderDetailSafetyRouted {
/// Direct header
pub direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used
pub remote_safety_route: PublicKey,
/// The sequencing used for this route
pub sequencing: Sequencing,
}
/// Header details for rpc messages received over a private route
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct RPCMessageHeaderDetailPrivateRouted {
/// Direct header
pub direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used (or possibly node id the case of no safety route)
pub remote_safety_route: PublicKey,
/// The private route we received the rpc over
pub private_route: PublicKey,
// The safety spec for replying to this private routed rpc
pub safety_spec: SafetySpec,
}
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) enum RPCMessageHeaderDetail {
Direct(RPCMessageHeaderDetailDirect),
SafetyRouted(RPCMessageHeaderDetailSafetyRouted),
PrivateRouted(RPCMessageHeaderDetailPrivateRouted),
}
/// The decoded header of an RPC message
#[derive(Debug, Clone)]
pub(in crate::rpc_processor) struct MessageHeader {
/// Time the message was received, not sent
pub timestamp: Timestamp,
/// The length in bytes of the rpc message body
pub body_len: ByteCount,
/// The header detail depending on which way the message was received
pub detail: RPCMessageHeaderDetail,
}
impl MessageHeader {
/// The crypto kind used on the RPC
pub fn crypto_kind(&self) -> CryptoKind {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_crypto_kind(),
}
}
// pub fn direct_peer_noderef(&self) -> NodeRef {
// match &self.detail {
// RPCMessageHeaderDetail::Direct(d) => d.peer_noderef.clone(),
// RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.peer_noderef.clone(),
// RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.peer_noderef.clone(),
// }
// }
pub fn routing_domain(&self) -> RoutingDomain {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.routing_domain,
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.routing_domain,
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.routing_domain,
}
}
pub fn direct_sender_node_id(&self) -> TypedKey {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_sender_typed_id(),
}
}
}

View File

@ -1,11 +1,14 @@
mod answer;
mod coders;
mod destination;
mod fanout_call;
mod fanout_queue;
mod error;
mod fanout;
mod message;
mod message_header;
mod operation_waiter;
mod rendered_operation;
mod rpc_app_call;
mod rpc_app_message;
mod rpc_error;
mod rpc_find_node;
mod rpc_get_value;
mod rpc_inspect_value;
@ -17,6 +20,8 @@ mod rpc_status;
mod rpc_validate_dial_info;
mod rpc_value_changed;
mod rpc_watch_value;
mod sender_info;
mod sender_peer_info;
#[cfg(feature = "unstable-blockstore")]
mod rpc_find_block;
@ -30,160 +35,38 @@ mod rpc_complete_tunnel;
#[cfg(feature = "unstable-tunnels")]
mod rpc_start_tunnel;
pub(crate) use coders::*;
pub(crate) use answer::*;
pub(crate) use coders::{
builder_to_vec, decode_private_route, encode_node_info, encode_private_route, encode_route_hop,
encode_signed_direct_node_info, encode_typed_key, RPCDecodeContext,
MAX_INSPECT_VALUE_A_SEQS_LEN,
};
pub(crate) use destination::*;
pub(crate) use fanout_call::*;
pub(crate) use operation_waiter::*;
pub(crate) use rpc_error::*;
pub(crate) use rpc_status::*;
pub(crate) use error::*;
pub(crate) use fanout::*;
pub(crate) use sender_info::*;
use super::*;
use futures_util::StreamExt;
use stop_token::future::FutureExt as _;
use coders::*;
use message::*;
use message_header::*;
use operation_waiter::*;
use rendered_operation::*;
use sender_peer_info::*;
use crypto::*;
use fanout_queue::*;
use futures_util::StreamExt;
use network_manager::*;
use routing_table::*;
use stop_token::future::FutureExt;
use storage_manager::*;
/////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailDirect {
/// The decoded header of the envelope
envelope: Envelope,
/// The noderef of the peer that sent the message (not the original sender).
/// Ensures node doesn't get evicted from routing table until we're done with it
/// Should be filtered to the routing domain of the peer that we received from
peer_noderef: FilteredNodeRef,
/// The flow from the peer sent the message (not the original sender)
flow: Flow,
/// The routing domain of the peer that we received from
routing_domain: RoutingDomain,
}
/// Header details for rpc messages received over only a safety route but not a private route
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailSafetyRouted {
/// Direct header
direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used
remote_safety_route: PublicKey,
/// The sequencing used for this route
sequencing: Sequencing,
}
/// Header details for rpc messages received over a private route
#[derive(Debug, Clone)]
struct RPCMessageHeaderDetailPrivateRouted {
/// Direct header
direct: RPCMessageHeaderDetailDirect,
/// Remote safety route used (or possibly node id the case of no safety route)
remote_safety_route: PublicKey,
/// The private route we received the rpc over
private_route: PublicKey,
// The safety spec for replying to this private routed rpc
safety_spec: SafetySpec,
}
#[derive(Debug, Clone)]
enum RPCMessageHeaderDetail {
Direct(RPCMessageHeaderDetailDirect),
SafetyRouted(RPCMessageHeaderDetailSafetyRouted),
PrivateRouted(RPCMessageHeaderDetailPrivateRouted),
}
/// The decoded header of an RPC message
#[derive(Debug, Clone)]
struct RPCMessageHeader {
/// Time the message was received, not sent
timestamp: Timestamp,
/// The length in bytes of the rpc message body
body_len: ByteCount,
/// The header detail depending on which way the message was received
detail: RPCMessageHeaderDetail,
}
impl RPCMessageHeader {
/// The crypto kind used on the RPC
pub fn crypto_kind(&self) -> CryptoKind {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_crypto_kind(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_crypto_kind(),
}
}
// pub fn direct_peer_noderef(&self) -> NodeRef {
// match &self.detail {
// RPCMessageHeaderDetail::Direct(d) => d.peer_noderef.clone(),
// RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.peer_noderef.clone(),
// RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.peer_noderef.clone(),
// }
// }
pub fn routing_domain(&self) -> RoutingDomain {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.routing_domain,
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.routing_domain,
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.routing_domain,
}
}
pub fn direct_sender_node_id(&self) -> TypedKey {
match &self.detail {
RPCMessageHeaderDetail::Direct(d) => d.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::SafetyRouted(s) => s.direct.envelope.get_sender_typed_id(),
RPCMessageHeaderDetail::PrivateRouted(p) => p.direct.envelope.get_sender_typed_id(),
}
}
}
#[derive(Debug)]
pub struct RPCMessageData {
contents: Vec<u8>, // rpc messages must be a canonicalized single segment
}
impl RPCMessageData {
pub fn new(contents: Vec<u8>) -> Self {
Self { contents }
}
pub fn get_reader(
&self,
) -> Result<capnp::message::Reader<capnp::serialize::OwnedSegments>, RPCError> {
capnp::serialize_packed::read_message(
self.contents.as_slice(),
capnp::message::ReaderOptions::new(),
)
.map_err(RPCError::protocol)
}
}
#[derive(Debug)]
struct RPCMessageEncoded {
header: RPCMessageHeader,
data: RPCMessageData,
}
#[derive(Debug)]
pub(crate) struct RPCMessage {
header: RPCMessageHeader,
operation: RPCOperation,
opt_sender_nr: Option<NodeRef>,
}
#[instrument(level = "trace", target = "rpc", skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &builder).map_err(RPCError::protocol)?;
Ok(buffer)
}
#[derive(Debug)]
struct WaitableReply {
handle: OperationWaitHandle<RPCMessage, Option<QuestionContext>>,
handle: OperationWaitHandle<Message, Option<QuestionContext>>,
timeout_us: TimestampDuration,
node_ref: NodeRef,
send_ts: Timestamp,
@ -196,84 +79,6 @@ struct WaitableReply {
/////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Default)]
pub struct Answer<T> {
/// Hpw long it took to get this answer
pub _latency: TimestampDuration,
/// The private route requested to receive the reply
pub reply_private_route: Option<PublicKey>,
/// The answer itself
pub answer: T,
}
impl<T> Answer<T> {
pub fn new(
latency: TimestampDuration,
reply_private_route: Option<PublicKey>,
answer: T,
) -> Self {
Self {
_latency: latency,
reply_private_route,
answer,
}
}
}
/// An operation that has been fully prepared for envelope
struct RenderedOperation {
/// The rendered operation bytes
message: Vec<u8>,
/// Destination node we're sending to
destination_node_ref: NodeRef,
/// Node to send envelope to (may not be destination node in case of relay)
node_ref: FilteredNodeRef,
/// Total safety + private route hop count + 1 hop for the initial send
hop_count: usize,
/// The safety route used to send the message
safety_route: Option<PublicKey>,
/// The private route used to send the message
remote_private_route: Option<PublicKey>,
/// The private route requested to receive the reply
reply_private_route: Option<PublicKey>,
}
impl fmt::Debug for RenderedOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RenderedOperation")
.field("message(len)", &self.message.len())
.field("destination_node_ref", &self.destination_node_ref)
.field("node_ref", &self.node_ref)
.field("hop_count", &self.hop_count)
.field("safety_route", &self.safety_route)
.field("remote_private_route", &self.remote_private_route)
.field("reply_private_route", &self.reply_private_route)
.finish()
}
}
/// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)]
pub struct SenderPeerInfo {
/// The current peer info of the sender if required
opt_peer_info: Option<Arc<PeerInfo>>,
/// The last timestamp of the target's node info to assist remote node with sending its latest node info
target_node_info_ts: Timestamp,
}
impl SenderPeerInfo {
pub fn new_no_peer_info(target_node_info_ts: Timestamp) -> Self {
Self {
opt_peer_info: None,
target_node_info_ts,
}
}
pub fn new(peer_info: Arc<PeerInfo>, target_node_info_ts: Timestamp) -> Self {
Self {
opt_peer_info: Some(peer_info),
target_node_info_ts,
}
}
}
#[derive(Copy, Clone, Debug)]
enum RPCKind {
Question,
@ -284,31 +89,25 @@ enum RPCKind {
/////////////////////////////////////////////////////////////////////
struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Span, RPCMessageEncoded)>>,
send_channel: Option<flume::Sender<(Span, MessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
struct RPCProcessorUnlockedInner {
network_manager: NetworkManager,
timeout_us: TimestampDuration,
queue_size: u32,
concurrency: u32,
max_route_hop_count: usize,
#[cfg_attr(target_arch = "wasm32", expect(dead_code))]
validate_dial_info_receipt_time_ms: u32,
update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<RPCMessage, Option<QuestionContext>>,
waiting_rpc_table: OperationWaiter<Message, Option<QuestionContext>>,
waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
startup_lock: StartupLock,
}
#[derive(Clone)]
pub(crate) struct RPCProcessor {
crypto: Crypto,
config: VeilidConfig,
network_manager: NetworkManager,
storage_manager: StorageManager,
routing_table: RoutingTable,
inner: Arc<Mutex<RPCProcessorInner>>,
unlocked_inner: Arc<RPCProcessorUnlockedInner>,
}
@ -322,34 +121,37 @@ impl RPCProcessor {
}
}
fn new_unlocked_inner(
config: VeilidConfig,
network_manager: NetworkManager,
update_callback: UpdateCallback,
) -> RPCProcessorUnlockedInner {
// make local copy of node id for easy access
let c = config.get();
let (concurrency, queue_size, max_route_hop_count, timeout_us) = {
let config = network_manager.config();
let c = config.get();
// set up channel
let mut concurrency = c.network.rpc.concurrency;
let queue_size = c.network.rpc.queue_size;
let timeout_us = TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms));
let max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
if concurrency == 0 {
concurrency = get_concurrency();
// set up channel
let mut concurrency = c.network.rpc.concurrency;
let queue_size = c.network.rpc.queue_size;
let timeout_us = TimestampDuration::new(ms_to_us(c.network.rpc.timeout_ms));
let max_route_hop_count = c.network.rpc.max_route_hop_count as usize;
if concurrency == 0 {
concurrency = 1;
}
concurrency = get_concurrency();
if concurrency == 0 {
concurrency = 1;
}
// Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay
concurrency *= 16;
}
let validate_dial_info_receipt_time_ms = c.network.dht.validate_dial_info_receipt_time_ms;
// Default RPC concurrency is the number of CPUs * 16 rpc workers per core, as a single worker takes about 1% CPU when relaying and 16% is reasonable for baseline plus relay
concurrency *= 16;
}
(concurrency, queue_size, max_route_hop_count, timeout_us)
};
RPCProcessorUnlockedInner {
network_manager,
timeout_us,
queue_size,
concurrency,
max_route_hop_count,
validate_dial_info_receipt_time_ms,
update_callback,
waiting_rpc_table: OperationWaiter::new(),
waiting_app_call_table: OperationWaiter::new(),
@ -357,28 +159,36 @@ impl RPCProcessor {
}
}
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
let config = network_manager.config();
Self {
crypto: network_manager.crypto(),
config: config.clone(),
network_manager: network_manager.clone(),
routing_table: network_manager.routing_table(),
storage_manager: network_manager.storage_manager(),
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(config, update_callback)),
unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager, update_callback)),
}
}
pub fn network_manager(&self) -> NetworkManager {
self.network_manager.clone()
self.unlocked_inner.network_manager.clone()
}
pub fn crypto(&self) -> Crypto {
self.unlocked_inner.network_manager.crypto()
}
pub fn event_bus(&self) -> EventBus {
self.unlocked_inner.network_manager.event_bus()
}
pub fn routing_table(&self) -> RoutingTable {
self.routing_table.clone()
self.unlocked_inner.network_manager.routing_table()
}
pub fn storage_manager(&self) -> StorageManager {
self.storage_manager.clone()
self.unlocked_inner.network_manager.storage_manager()
}
pub fn with_config<R, F: FnOnce(&VeilidConfigInner) -> R>(&self, func: F) -> R {
let config = self.unlocked_inner.network_manager.config();
let c = config.get();
func(&c)
}
//////////////////////////////////////////////////////////////////////
@ -411,7 +221,7 @@ impl RPCProcessor {
}
// Inform storage manager we are up
self.storage_manager
self.storage_manager()
.set_rpc_processor(Some(self.clone()))
.await;
@ -428,7 +238,7 @@ impl RPCProcessor {
};
// Stop storage manager from using us
self.storage_manager.set_rpc_processor(None).await;
self.storage_manager().set_rpc_processor(None).await;
// Stop the rpc workers
let mut unord = FuturesUnordered::new();
@ -458,7 +268,7 @@ impl RPCProcessor {
//////////////////////////////////////////////////////////////////////
/// Get waiting app call id for debugging purposes
pub(crate) fn get_app_call_ids(&self) -> Vec<OperationId> {
pub fn get_app_call_ids(&self) -> Vec<OperationId> {
self.unlocked_inner
.waiting_app_call_table
.get_operation_ids()
@ -490,7 +300,7 @@ impl RPCProcessor {
let Some(peer_info) = sender_peer_info.opt_peer_info.clone() else {
return Ok(NetworkResult::value(None));
};
let address_filter = self.network_manager.address_filter();
let address_filter = self.network_manager().address_filter();
// Ensure the sender peer info is for the actual sender specified in the envelope
if !peer_info.node_ids().contains(&sender_node_id) {
@ -641,15 +451,14 @@ impl RPCProcessor {
}
// If nobody knows where this node is, ask the DHT for it
let (node_count, _consensus_count, fanout, timeout) = {
let c = this.config.get();
let (node_count, _consensus_count, fanout, timeout) = this.with_config(|c| {
(
c.network.dht.max_find_node_count as usize,
c.network.dht.resolve_node_count as usize,
c.network.dht.resolve_node_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)),
)
};
});
// Search routing domains for peer
// xxx: Eventually add other routing domains here
@ -682,7 +491,7 @@ impl RPCProcessor {
&self,
waitable_reply: WaitableReply,
debug_string: String,
) -> Result<TimeoutOr<(RPCMessage, TimestampDuration)>, RPCError> {
) -> Result<TimeoutOr<(Message, TimestampDuration)>, RPCError> {
let id = waitable_reply.handle.id();
let out = self
.unlocked_inner
@ -735,7 +544,7 @@ impl RPCProcessor {
}
RPCMessageHeaderDetail::SafetyRouted(sr) => {
let node_id = self
.routing_table
.routing_table()
.node_id(sr.direct.envelope.get_crypto_kind());
if node_id.value != reply_private_route {
return Err(RPCError::protocol(
@ -775,7 +584,7 @@ impl RPCProcessor {
let pr_hop_count = remote_private_route.hop_count;
let pr_pubkey = remote_private_route.public_key.value;
let crypto_kind = remote_private_route.crypto_kind();
let Some(vcrypto) = self.crypto.get(crypto_kind) else {
let Some(vcrypto) = self.crypto().get(crypto_kind) else {
return Err(RPCError::internal(
"crypto not available for selected private route",
));
@ -984,7 +793,7 @@ impl RPCProcessor {
opt_node,
opt_relay: _,
opt_routing_domain,
}) = dest.get_unsafe_routing_info(self.routing_table.clone())
}) = dest.get_unsafe_routing_info(self.routing_table())
else {
return SenderPeerInfo::default();
};
@ -1040,12 +849,12 @@ impl RPCProcessor {
// If safety route was in use, record failure to send there
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| s.record_send_failed());
} else {
// If no safety route was in use, then it's the private route's fault if we have one
if let Some(pr_pubkey) = &remote_private_route {
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| s.record_send_failed());
}
}
@ -1071,11 +880,10 @@ impl RPCProcessor {
return;
}
// Get route spec store
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
// If safety route was used, record question lost there
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
s.record_lost_answer();
});
@ -1119,7 +927,7 @@ impl RPCProcessor {
}
// Get route spec store
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
// If safety route was used, record send there
if let Some(sr_pubkey) = &safety_route {
@ -1130,7 +938,6 @@ impl RPCProcessor {
// If remote private route was used, record send there
if let Some(pr_pubkey) = &remote_private_route {
let rss = self.routing_table.route_spec_store();
rss.with_route_stats_mut(send_ts, pr_pubkey, |s| {
s.record_sent(send_ts, bytes);
});
@ -1157,7 +964,7 @@ impl RPCProcessor {
return;
}
// Get route spec store
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
// Get latency for all local routes
let mut total_local_latency = TimestampDuration::new(0u64);
@ -1211,7 +1018,7 @@ impl RPCProcessor {
// This is fine because if we sent with a local safety route,
// then we must have received with a local private route too, per the design rules
if let Some(sr_pubkey) = &safety_route {
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
rss.with_route_stats_mut(send_ts, sr_pubkey, |s| {
s.record_latency(total_latency / 2u64);
});
@ -1226,7 +1033,7 @@ impl RPCProcessor {
/// Record question or statement received from node or route
#[instrument(level = "trace", target = "rpc", skip_all)]
fn record_question_received(&self, msg: &RPCMessage) {
fn record_question_received(&self, msg: &Message) {
let recv_ts = msg.header.timestamp;
let bytes = msg.header.body_len;
@ -1240,7 +1047,7 @@ impl RPCProcessor {
}
// Process messages that arrived with no private route (private route stub)
RPCMessageHeaderDetail::SafetyRouted(d) => {
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
// This may record nothing if the remote safety route is not also
// a remote private route that been imported, but that's okay
@ -1250,7 +1057,7 @@ impl RPCProcessor {
}
// Process messages that arrived to our private route
RPCMessageHeaderDetail::PrivateRouted(d) => {
let rss = self.routing_table.route_spec_store();
let rss = self.routing_table().route_spec_store();
// This may record nothing if the remote safety route is not also
// a remote private route that been imported, but that's okay
@ -1439,7 +1246,7 @@ impl RPCProcessor {
/// Issue a reply over the network, possibly using an anonymized route
/// The request must want a response, or this routine fails
#[instrument(level = "trace", target = "rpc", skip_all)]
async fn answer(&self, request: RPCMessage, answer: RPCAnswer) -> RPCNetworkResult<()> {
async fn answer(&self, request: Message, answer: RPCAnswer) -> RPCNetworkResult<()> {
// Extract destination from respond_to
let dest = network_result_try!(self.get_respond_to_destination(&request));
@ -1511,10 +1318,7 @@ impl RPCProcessor {
/// This performs a capnp decode on the data, and if it passes the capnp schema
/// it performs the cryptographic validation required to pass the operation up for processing
#[instrument(level = "trace", target = "rpc", skip_all)]
fn decode_rpc_operation(
&self,
encoded_msg: &RPCMessageEncoded,
) -> Result<RPCOperation, RPCError> {
fn decode_rpc_operation(&self, encoded_msg: &MessageEncoded) -> Result<RPCOperation, RPCError> {
let reader = encoded_msg.data.get_reader()?;
let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>()
@ -1556,7 +1360,7 @@ impl RPCProcessor {
// Validate the RPC operation
let validate_context = RPCValidateContext {
crypto: self.crypto.clone(),
crypto: self.crypto(),
// rpc_processor: self.clone(),
question_context,
};
@ -1567,8 +1371,8 @@ impl RPCProcessor {
//////////////////////////////////////////////////////////////////////
#[instrument(level = "trace", target = "rpc", skip_all)]
async fn process_rpc_message(&self, encoded_msg: RPCMessageEncoded) -> RPCNetworkResult<()> {
let address_filter = self.network_manager.address_filter();
async fn process_rpc_message(&self, encoded_msg: MessageEncoded) -> RPCNetworkResult<()> {
let address_filter = self.network_manager().address_filter();
// Decode operation appropriately based on header detail
let msg = match &encoded_msg.header.detail {
@ -1639,7 +1443,7 @@ impl RPCProcessor {
}
// Make the RPC message
RPCMessage {
Message {
header: encoded_msg.header,
operation,
opt_sender_nr,
@ -1660,7 +1464,7 @@ impl RPCProcessor {
};
// Make the RPC message
RPCMessage {
Message {
header: encoded_msg.header,
operation,
opt_sender_nr: None,
@ -1758,7 +1562,7 @@ impl RPCProcessor {
async fn rpc_worker(
self,
stop_token: StopToken,
receiver: flume::Receiver<(Span, RPCMessageEncoded)>,
receiver: flume::Receiver<(Span, MessageEncoded)>,
) {
while let Ok(Ok((prev_span, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await
@ -1801,7 +1605,7 @@ impl RPCProcessor {
bail!("routing domain should match peer noderef filter");
}
let header = RPCMessageHeader {
let header = MessageHeader {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope,
peer_noderef,
@ -1812,9 +1616,9 @@ impl RPCProcessor {
body_len: ByteCount::new(body.len() as u64),
};
let msg = RPCMessageEncoded {
let msg = MessageEncoded {
header,
data: RPCMessageData { contents: body },
data: MessageData { contents: body },
};
let send_channel = {
@ -1838,7 +1642,7 @@ impl RPCProcessor {
sequencing: Sequencing,
body: Vec<u8>,
) -> EyreResult<()> {
let header = RPCMessageHeader {
let header = MessageHeader {
detail: RPCMessageHeaderDetail::SafetyRouted(RPCMessageHeaderDetailSafetyRouted {
direct,
remote_safety_route,
@ -1848,9 +1652,9 @@ impl RPCProcessor {
body_len: (body.len() as u64).into(),
};
let msg = RPCMessageEncoded {
let msg = MessageEncoded {
header,
data: RPCMessageData { contents: body },
data: MessageData { contents: body },
};
let send_channel = {
let inner = self.inner.lock();
@ -1874,7 +1678,7 @@ impl RPCProcessor {
safety_spec: SafetySpec,
body: Vec<u8>,
) -> EyreResult<()> {
let header = RPCMessageHeader {
let header = MessageHeader {
detail: RPCMessageHeaderDetail::PrivateRouted(RPCMessageHeaderDetailPrivateRouted {
direct,
remote_safety_route,
@ -1885,9 +1689,9 @@ impl RPCProcessor {
body_len: (body.len() as u64).into(),
};
let msg = RPCMessageEncoded {
let msg = MessageEncoded {
header,
data: RPCMessageData { contents: body },
data: MessageData { contents: body },
};
let send_channel = {

View File

@ -1,7 +1,7 @@
use super::*;
#[derive(Debug)]
pub struct OperationWaitHandle<T, C>
pub(super) struct OperationWaitHandle<T, C>
where
T: Unpin,
C: Unpin + Clone,
@ -34,7 +34,7 @@ where
}
#[derive(Debug)]
pub struct OperationWaitingOp<T, C>
struct OperationWaitingOp<T, C>
where
T: Unpin,
C: Unpin + Clone,
@ -45,7 +45,7 @@ where
}
#[derive(Debug)]
pub struct OperationWaiterInner<T, C>
struct OperationWaiterInner<T, C>
where
T: Unpin,
C: Unpin + Clone,
@ -54,7 +54,7 @@ where
}
#[derive(Debug)]
pub struct OperationWaiter<T, C>
pub(super) struct OperationWaiter<T, C>
where
T: Unpin,
C: Unpin + Clone,

Some files were not shown because too many files have changed in this diff Show More