cleanup visibility

This commit is contained in:
Christien Rioux 2023-10-27 16:12:58 -04:00
parent 2051292a26
commit 74e2f9a2c0
49 changed files with 207 additions and 361 deletions

View File

@ -28,7 +28,6 @@ impl AttachmentManager {
fn new_unlocked_inner( fn new_unlocked_inner(
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore, #[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto, crypto: Crypto,
@ -58,7 +57,6 @@ impl AttachmentManager {
pub fn new( pub fn new(
config: VeilidConfig, config: VeilidConfig,
storage_manager: StorageManager, storage_manager: StorageManager,
protected_store: ProtectedStore,
table_store: TableStore, table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore, #[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto, crypto: Crypto,
@ -68,7 +66,6 @@ impl AttachmentManager {
unlocked_inner: Arc::new(Self::new_unlocked_inner( unlocked_inner: Arc::new(Self::new_unlocked_inner(
config, config,
storage_manager, storage_manager,
protected_store,
table_store, table_store,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store, block_store,

View File

@ -140,7 +140,6 @@ impl ServicesContext {
let attachment_manager = AttachmentManager::new( let attachment_manager = AttachmentManager::new(
self.config.clone(), self.config.clone(),
storage_manager, storage_manager,
protected_store,
table_store, table_store,
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
block_store, block_store,

View File

@ -2,7 +2,7 @@ use super::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ConnectionHandle { pub struct ConnectionHandle {
id: NetworkConnectionId, _id: NetworkConnectionId,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
channel: flume::Sender<(Option<Id>, Vec<u8>)>, channel: flume::Sender<(Option<Id>, Vec<u8>)>,
} }
@ -20,27 +20,28 @@ impl ConnectionHandle {
channel: flume::Sender<(Option<Id>, Vec<u8>)>, channel: flume::Sender<(Option<Id>, Vec<u8>)>,
) -> Self { ) -> Self {
Self { Self {
id, _id: id,
descriptor, descriptor,
channel, channel,
} }
} }
pub fn connection_id(&self) -> NetworkConnectionId { // pub fn connection_id(&self) -> NetworkConnectionId {
self.id // self.id
} // }
pub fn connection_descriptor(&self) -> ConnectionDescriptor { pub fn connection_descriptor(&self) -> ConnectionDescriptor {
self.descriptor self.descriptor
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))] // #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))]
pub fn send(&self, message: Vec<u8>) -> ConnectionHandleSendResult { // pub fn send(&self, message: Vec<u8>) -> ConnectionHandleSendResult {
match self.channel.send((Span::current().id(), message)) { // match self.channel.send((Span::current().id(), message)) {
Ok(()) => ConnectionHandleSendResult::Sent, // Ok(()) => ConnectionHandleSendResult::Sent,
Err(e) => ConnectionHandleSendResult::NotSent(e.0 .1), // Err(e) => ConnectionHandleSendResult::NotSent(e.0 .1),
} // }
} // }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))]
pub async fn send_async(&self, message: Vec<u8>) -> ConnectionHandleSendResult { pub async fn send_async(&self, message: Vec<u8>) -> ConnectionHandleSendResult {
match self match self

View File

@ -84,10 +84,6 @@ impl ConnectionManager {
self.arc.network_manager.clone() self.arc.network_manager.clone()
} }
pub fn connection_initial_timeout_ms(&self) -> u32 {
self.arc.connection_initial_timeout_ms
}
pub fn connection_inactivity_timeout_ms(&self) -> u32 { pub fn connection_inactivity_timeout_ms(&self) -> u32 {
self.arc.connection_inactivity_timeout_ms self.arc.connection_inactivity_timeout_ms
} }

View File

@ -690,10 +690,6 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
pub fn get_protocol_config(&self) -> ProtocolConfig {
self.inner.lock().protocol_config.clone()
}
#[instrument(level = "debug", err, skip_all)] #[instrument(level = "debug", err, skip_all)]
pub async fn startup(&self) -> EyreResult<()> { pub async fn startup(&self) -> EyreResult<()> {
// initialize interfaces // initialize interfaces

View File

@ -9,7 +9,7 @@ use std::io;
#[derive(Debug)] #[derive(Debug)]
pub(in crate::network_manager) enum ProtocolNetworkConnection { pub(in crate::network_manager) enum ProtocolNetworkConnection {
Dummy(DummyNetworkConnection), // Dummy(DummyNetworkConnection),
RawTcp(tcp::RawTcpNetworkConnection), RawTcp(tcp::RawTcpNetworkConnection),
WsAccepted(ws::WebSocketNetworkConnectionAccepted), WsAccepted(ws::WebSocketNetworkConnectionAccepted),
Ws(ws::WebsocketNetworkConnectionWS), Ws(ws::WebsocketNetworkConnectionWS),
@ -47,7 +47,7 @@ impl ProtocolNetworkConnection {
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn descriptor(&self) -> ConnectionDescriptor {
match self { match self {
Self::Dummy(d) => d.descriptor(), // Self::Dummy(d) => d.descriptor(),
Self::RawTcp(t) => t.descriptor(), Self::RawTcp(t) => t.descriptor(),
Self::WsAccepted(w) => w.descriptor(), Self::WsAccepted(w) => w.descriptor(),
Self::Ws(w) => w.descriptor(), Self::Ws(w) => w.descriptor(),
@ -57,7 +57,7 @@ impl ProtocolNetworkConnection {
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
match self { match self {
Self::Dummy(d) => d.close(), // Self::Dummy(d) => d.close(),
Self::RawTcp(t) => t.close().await, Self::RawTcp(t) => t.close().await,
Self::WsAccepted(w) => w.close().await, Self::WsAccepted(w) => w.close().await,
Self::Ws(w) => w.close().await, Self::Ws(w) => w.close().await,
@ -67,7 +67,7 @@ impl ProtocolNetworkConnection {
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
match self { match self {
Self::Dummy(d) => d.send(message), // Self::Dummy(d) => d.send(message),
Self::RawTcp(t) => t.send(message).await, Self::RawTcp(t) => t.send(message).await,
Self::WsAccepted(w) => w.send(message).await, Self::WsAccepted(w) => w.send(message).await,
Self::Ws(w) => w.send(message).await, Self::Ws(w) => w.send(message).await,
@ -76,7 +76,7 @@ impl ProtocolNetworkConnection {
} }
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
match self { match self {
Self::Dummy(d) => d.recv(), // Self::Dummy(d) => d.recv(),
Self::RawTcp(t) => t.recv().await, Self::RawTcp(t) => t.recv().await,
Self::WsAccepted(w) => w.recv().await, Self::WsAccepted(w) => w.recv().await,
Self::Ws(w) => w.recv().await, Self::Ws(w) => w.recv().await,

View File

@ -45,25 +45,25 @@ cfg_if::cfg_if! {
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
// Dummy protocol network connection for testing // Dummy protocol network connection for testing
#[derive(Debug)] // #[derive(Debug)]
pub struct DummyNetworkConnection { // pub struct DummyNetworkConnection {
descriptor: ConnectionDescriptor, // descriptor: ConnectionDescriptor,
} // }
impl DummyNetworkConnection { // impl DummyNetworkConnection {
pub fn descriptor(&self) -> ConnectionDescriptor { // pub fn descriptor(&self) -> ConnectionDescriptor {
self.descriptor // self.descriptor
} // }
pub fn close(&self) -> io::Result<NetworkResult<()>> { // pub fn close(&self) -> io::Result<NetworkResult<()>> {
Ok(NetworkResult::Value(())) // Ok(NetworkResult::Value(()))
} // }
pub fn send(&self, _message: Vec<u8>) -> io::Result<NetworkResult<()>> { // pub fn send(&self, _message: Vec<u8>) -> io::Result<NetworkResult<()>> {
Ok(NetworkResult::Value(())) // Ok(NetworkResult::Value(()))
} // }
pub fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { // pub fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
Ok(NetworkResult::Value(Vec::new())) // Ok(NetworkResult::Value(Vec::new()))
} // }
} // }
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
// Top-level protocol independent network connection object // Top-level protocol independent network connection object

View File

@ -7,6 +7,7 @@ use routing_table::*;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
#[allow(dead_code)]
pub(crate) enum ReceiptEvent { pub(crate) enum ReceiptEvent {
ReturnedOutOfBand, ReturnedOutOfBand,
ReturnedInBand { inbound_noderef: NodeRef }, ReturnedInBand { inbound_noderef: NodeRef },
@ -52,6 +53,7 @@ where
type ReceiptCallbackType = Box<dyn ReceiptCallback>; type ReceiptCallbackType = Box<dyn ReceiptCallback>;
type ReceiptSingleShotType = SingleShotEventual<ReceiptEvent>; type ReceiptSingleShotType = SingleShotEventual<ReceiptEvent>;
#[allow(dead_code)]
enum ReceiptRecordCallbackType { enum ReceiptRecordCallbackType {
Normal(ReceiptCallbackType), Normal(ReceiptCallbackType),
SingleShot(Option<ReceiptSingleShotType>), SingleShot(Option<ReceiptSingleShotType>),
@ -90,6 +92,7 @@ impl fmt::Debug for ReceiptRecord {
} }
impl ReceiptRecord { impl ReceiptRecord {
#[allow(dead_code)]
pub fn new( pub fn new(
receipt: Receipt, receipt: Receipt,
expiration_ts: Timestamp, expiration_ts: Timestamp,
@ -314,6 +317,7 @@ impl ReceiptManager {
debug!("finished receipt manager shutdown"); debug!("finished receipt manager shutdown");
} }
#[allow(dead_code)]
pub fn record_receipt( pub fn record_receipt(
&self, &self,
receipt: Receipt, receipt: Receipt,
@ -369,6 +373,7 @@ impl ReceiptManager {
inner.next_oldest_ts = new_next_oldest_ts; inner.next_oldest_ts = new_next_oldest_ts;
} }
#[allow(dead_code)]
pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> { pub async fn cancel_receipt(&self, nonce: &Nonce) -> EyreResult<()> {
log_rpc!(debug "== Cancel Receipt {}", nonce.encode()); log_rpc!(debug "== Cancel Receipt {}", nonce.encode());

View File

@ -67,7 +67,7 @@ impl NetworkManager {
.add_down(bytes); .add_down(bytes);
} }
// Get stats #[allow(dead_code)]
pub fn get_stats(&self) -> NetworkManagerStats { pub fn get_stats(&self) -> NetworkManagerStats {
let inner = self.inner.lock(); let inner = self.inner.lock();
inner.stats.clone() inner.stats.clone()

View File

@ -1,7 +1,6 @@
use super::*; use super::*;
// Ordering here matters, IPV6 is preferred to IPV4 in dial info sorts // Ordering here matters, IPV6 is preferred to IPV4 in dial info sorts
// See issue #236 for eventual resolution of this unfortunate implementation
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub enum Address { pub enum Address {
IPV6(Ipv6Addr), IPV6(Ipv6Addr),
@ -21,6 +20,7 @@ impl Address {
SocketAddr::V6(v6) => Address::IPV6(*v6.ip()), SocketAddr::V6(v6) => Address::IPV6(*v6.ip()),
} }
} }
#[cfg_attr(target_arch = "wasm32", allow(dead_code))]
pub fn from_ip_addr(addr: IpAddr) -> Address { pub fn from_ip_addr(addr: IpAddr) -> Address {
match addr { match addr {
IpAddr::V4(v4) => Address::IPV4(v4), IpAddr::V4(v4) => Address::IPV4(v4),
@ -33,18 +33,6 @@ impl Address {
Address::IPV6(_) => AddressType::IPV6, Address::IPV6(_) => AddressType::IPV6,
} }
} }
pub fn address_string(&self) -> String {
match self {
Address::IPV4(v4) => v4.to_string(),
Address::IPV6(v6) => v6.to_string(),
}
}
pub fn address_string_with_port(&self, port: u16) -> String {
match self {
Address::IPV4(v4) => format!("{}:{}", v4, port),
Address::IPV6(v6) => format!("[{}]:{}", v6, port),
}
}
pub fn is_unspecified(&self) -> bool { pub fn is_unspecified(&self) -> bool {
match self { match self {
Address::IPV4(v4) => ipv4addr_is_unspecified(v4), Address::IPV4(v4) => ipv4addr_is_unspecified(v4),

View File

@ -234,6 +234,7 @@ impl DialInfo {
Self::WSS(di) => di.socket_address.address(), Self::WSS(di) => di.socket_address.address(),
} }
} }
#[allow(dead_code)]
pub fn set_address(&mut self, address: Address) { pub fn set_address(&mut self, address: Address) {
match self { match self {
Self::UDP(di) => di.socket_address.set_address(address), Self::UDP(di) => di.socket_address.set_address(address),
@ -453,6 +454,7 @@ impl DialInfo {
} }
} }
} }
#[allow(dead_code)]
pub async fn to_url(&self) -> String { pub async fn to_url(&self) -> String {
match self { match self {
DialInfo::UDP(di) => intf::ptr_lookup(di.socket_address.ip_addr()) DialInfo::UDP(di) => intf::ptr_lookup(di.socket_address.ip_addr())

View File

@ -4,18 +4,8 @@ use super::*;
// Keep member order appropriate for sorting < preference // Keep member order appropriate for sorting < preference
// Must match DialInfo order // Must match DialInfo order
#[allow(clippy::derived_hash_with_manual_eq)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[derive(Debug, PartialOrd, Ord, Hash, EnumSetType, Serialize, Deserialize)]
#[enumset(repr = "u8")]
pub enum LowLevelProtocolType { pub enum LowLevelProtocolType {
UDP = 0, UDP = 0,
TCP = 1, TCP = 1,
} }
impl LowLevelProtocolType {
pub fn is_connection_oriented(&self) -> bool {
matches!(self, LowLevelProtocolType::TCP)
}
}
// pub type LowLevelProtocolTypeSet = EnumSet<LowLevelProtocolType>;

View File

@ -63,7 +63,7 @@ struct NetworkUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Network { pub(in crate::network_manager) struct Network {
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<NetworkInner>>, inner: Arc<Mutex<NetworkInner>>,
unlocked_inner: Arc<NetworkUnlockedInner>, unlocked_inner: Arc<NetworkUnlockedInner>,
@ -459,10 +459,6 @@ impl Network {
false false
} }
pub fn get_protocol_config(&self) -> ProtocolConfig {
self.inner.lock().protocol_config.clone()
}
////////////////////////////////////////// //////////////////////////////////////////
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
Ok(()) Ok(())

View File

@ -5,9 +5,9 @@ use super::*;
use std::io; use std::io;
#[derive(Debug)] #[derive(Debug)]
pub enum ProtocolNetworkConnection { pub(in crate::network_manager) enum ProtocolNetworkConnection {
#[allow(dead_code)] #[allow(dead_code)]
Dummy(DummyNetworkConnection), //Dummy(DummyNetworkConnection),
Ws(ws::WebsocketNetworkConnection), Ws(ws::WebsocketNetworkConnection),
//WebRTC(wrtc::WebRTCNetworkConnection), //WebRTC(wrtc::WebRTCNetworkConnection),
} }
@ -37,26 +37,26 @@ impl ProtocolNetworkConnection {
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn descriptor(&self) -> ConnectionDescriptor {
match self { match self {
Self::Dummy(d) => d.descriptor(), // Self::Dummy(d) => d.descriptor(),
Self::Ws(w) => w.descriptor(), Self::Ws(w) => w.descriptor(),
} }
} }
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
match self { match self {
Self::Dummy(d) => d.close(), // Self::Dummy(d) => d.close(),
Self::Ws(w) => w.close().await, Self::Ws(w) => w.close().await,
} }
} }
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
match self { match self {
Self::Dummy(d) => d.send(message), // Self::Dummy(d) => d.send(message),
Self::Ws(w) => w.send(message).await, Self::Ws(w) => w.send(message).await,
} }
} }
pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> { pub async fn recv(&self) -> io::Result<NetworkResult<Vec<u8>>> {
match self { match self {
Self::Dummy(d) => d.recv(), // Self::Dummy(d) => d.recv(),
Self::Ws(w) => w.recv().await, Self::Ws(w) => w.recv().await,
} }
} }

View File

@ -117,7 +117,7 @@ impl WebsocketNetworkConnection {
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
/// ///
pub struct WebsocketProtocolHandler {} pub(in crate::network_manager) struct WebsocketProtocolHandler {}
impl WebsocketProtocolHandler { impl WebsocketProtocolHandler {
#[instrument(level = "trace", ret, err)] #[instrument(level = "trace", ret, err)]

View File

@ -175,6 +175,7 @@ impl BucketEntryInner {
} }
// Less is faster // Less is faster
#[allow(dead_code)]
pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering { pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering {
// Lower latency to the front // Lower latency to the front
if let Some(e1_latency) = &e1.peer_stats.latency { if let Some(e1_latency) = &e1.peer_stats.latency {
@ -234,6 +235,7 @@ impl BucketEntryInner {
} }
} }
#[allow(dead_code)]
pub fn sort_fastest_reliable_fn(cur_ts: Timestamp) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering { pub fn sort_fastest_reliable_fn(cur_ts: Timestamp) -> impl FnMut(&Self, &Self) -> std::cmp::Ordering {
move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2) move |e1, e2| Self::cmp_fastest_reliable(cur_ts, e1, e2)
} }
@ -537,6 +539,7 @@ impl BucketEntryInner {
self.envelope_support = envelope_support; self.envelope_support = envelope_support;
} }
#[allow(dead_code)]
pub fn envelope_support(&self) -> Vec<u8> { pub fn envelope_support(&self) -> Vec<u8> {
self.envelope_support.clone() self.envelope_support.clone()
} }
@ -617,12 +620,8 @@ impl BucketEntryInner {
} }
} }
pub fn set_updated_since_last_network_change(&mut self, updated: bool) { pub fn reset_updated_since_last_network_change(&mut self) {
self.updated_since_last_network_change = updated; self.updated_since_last_network_change = false;
}
pub fn has_updated_since_last_network_change(&self) -> bool {
self.updated_since_last_network_change
} }
///// stats methods ///// stats methods

View File

@ -20,20 +20,20 @@ use super::*;
use crate::crypto::*; use crate::crypto::*;
use crate::network_manager::*; use crate::network_manager::*;
use crate::rpc_processor::*; use crate::rpc_processor::*;
use bucket::*; use bucket::*;
use hashlink::LruCache; use hashlink::LruCache;
pub use bucket_entry::*; pub(crate) use bucket_entry::*;
pub use debug::*; pub(crate) use node_ref::*;
pub use find_peers::*; pub(crate) use node_ref_filter::*;
pub use node_ref::*; pub(crate) use privacy::*;
pub use node_ref_filter::*; pub(crate) use route_spec_store::*;
pub use privacy::*; pub(crate) use routing_domain_editor::*;
pub use route_spec_store::*; pub(crate) use routing_domains::*;
pub use routing_domain_editor::*; pub(crate) use routing_table_inner::*;
pub use routing_domains::*; pub(crate) use stats_accounting::*;
pub use routing_table_inner::*;
pub use stats_accounting::*;
pub use types::*; pub use types::*;
////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////
@ -64,7 +64,7 @@ pub struct LowLevelPortInfo {
pub low_level_protocol_ports: LowLevelProtocolPorts, pub low_level_protocol_ports: LowLevelProtocolPorts,
pub protocol_to_port: ProtocolToPortMapping, pub protocol_to_port: ProtocolToPortMapping,
} }
pub type RoutingTableEntryFilter<'t> = pub(crate) type RoutingTableEntryFilter<'t> =
Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>; Box<dyn FnMut(&RoutingTableInner, Option<Arc<BucketEntry>>) -> bool + Send + 't>;
type SerializedBuckets = Vec<Vec<u8>>; type SerializedBuckets = Vec<Vec<u8>>;
@ -487,24 +487,10 @@ impl RoutingTable {
self.inner.read().relay_node_last_keepalive(domain) self.inner.read().relay_node_last_keepalive(domain)
} }
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
self.inner.read().has_dial_info(domain)
}
pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec<DialInfoDetail> { pub fn dial_info_details(&self, domain: RoutingDomain) -> Vec<DialInfoDetail> {
self.inner.read().dial_info_details(domain) self.inner.read().dial_info_details(domain)
} }
pub fn first_filtered_dial_info_detail(
&self,
routing_domain_set: RoutingDomainSet,
filter: &DialInfoFilter,
) -> Option<DialInfoDetail> {
self.inner
.read()
.first_filtered_dial_info_detail(routing_domain_set, filter)
}
pub fn all_filtered_dial_info_details( pub fn all_filtered_dial_info_details(
&self, &self,
routing_domain_set: RoutingDomainSet, routing_domain_set: RoutingDomainSet,
@ -521,16 +507,6 @@ impl RoutingTable {
.ensure_dial_info_is_valid(domain, dial_info) .ensure_dial_info_is_valid(domain, dial_info)
} }
pub fn node_info_is_valid_in_routing_domain(
&self,
routing_domain: RoutingDomain,
node_info: &NodeInfo,
) -> bool {
self.inner
.read()
.node_info_is_valid_in_routing_domain(routing_domain, node_info)
}
pub fn signed_node_info_is_valid_in_routing_domain( pub fn signed_node_info_is_valid_in_routing_domain(
&self, &self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
@ -587,20 +563,6 @@ impl RoutingTable {
self.inner.read().get_network_class(routing_domain) self.inner.read().get_network_class(routing_domain)
} }
/// Return the domain's filter for what we can receivein the form of a dial info filter
pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
self.inner
.read()
.get_inbound_dial_info_filter(routing_domain)
}
/// Return the domain's filter for what we can receive in the form of a node ref filter
pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter {
self.inner
.read()
.get_inbound_node_ref_filter(routing_domain)
}
/// Return the domain's filter for what we can send out in the form of a dial info filter /// Return the domain's filter for what we can send out in the form of a dial info filter
pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { pub fn get_outbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
self.inner self.inner
@ -625,27 +587,7 @@ impl RoutingTable {
self.inner.write().purge_last_connections(); self.inner.write().purge_last_connections();
} }
pub fn get_entry_count( /// See which nodes need to be pinged
&self,
routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState,
crypto_kinds: &[CryptoKind],
) -> usize {
self.inner
.read()
.get_entry_count(routing_domain_set, min_state, crypto_kinds)
}
pub fn get_entry_count_per_crypto_kind(
&self,
routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState,
) -> BTreeMap<CryptoKind, usize> {
self.inner
.read()
.get_entry_count_per_crypto_kind(routing_domain_set, min_state)
}
pub fn get_nodes_needing_ping( pub fn get_nodes_needing_ping(
&self, &self,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
@ -656,11 +598,6 @@ impl RoutingTable {
.get_nodes_needing_ping(self.clone(), routing_domain, cur_ts) .get_nodes_needing_ping(self.clone(), routing_domain, cur_ts)
} }
pub fn get_all_nodes(&self, cur_ts: Timestamp) -> Vec<NodeRef> {
let inner = self.inner.read();
inner.get_all_nodes(self.clone(), cur_ts)
}
fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) { fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) {
for node_id in node_ids.iter() { for node_id in node_ids.iter() {
// Skip node ids we didn't add to buckets // Skip node ids we didn't add to buckets
@ -780,12 +717,6 @@ impl RoutingTable {
out out
} }
pub fn touch_recent_peer(&self, node_id: TypedKey, last_connection: ConnectionDescriptor) {
self.inner
.write()
.touch_recent_peer(node_id, last_connection)
}
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
// Find Nodes // Find Nodes
@ -984,27 +915,6 @@ impl RoutingTable {
out out
} }
pub fn find_peers_with_sort_and_filter<C, T, O>(
&self,
node_count: usize,
cur_ts: Timestamp,
filters: VecDeque<RoutingTableEntryFilter>,
compare: C,
transform: T,
) -> Vec<O>
where
C: for<'a, 'b> FnMut(
&'a RoutingTableInner,
&'b Option<Arc<BucketEntry>>,
&'b Option<Arc<BucketEntry>>,
) -> core::cmp::Ordering,
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O + Send,
{
self.inner
.read()
.find_peers_with_sort_and_filter(node_count, cur_ts, filters, compare, transform)
}
pub fn find_preferred_fastest_nodes<'a, T, O>( pub fn find_preferred_fastest_nodes<'a, T, O>(
&self, &self,
node_count: usize, node_count: usize,

View File

@ -112,12 +112,6 @@ pub(crate) trait NodeRefBase: Sized {
fn best_node_id(&self) -> TypedKey { fn best_node_id(&self) -> TypedKey {
self.operate(|_rti, e| e.best_node_id()) self.operate(|_rti, e| e.best_node_id())
} }
fn has_updated_since_last_network_change(&self) -> bool {
self.operate(|_rti, e| e.has_updated_since_last_network_change())
}
fn set_updated_since_last_network_change(&self) {
self.operate_mut(|_rti, e| e.set_updated_since_last_network_change(true));
}
fn update_node_status(&self, routing_domain: RoutingDomain, node_status: NodeStatus) { fn update_node_status(&self, routing_domain: RoutingDomain, node_status: NodeStatus) {
self.operate_mut(|_rti, e| { self.operate_mut(|_rti, e| {
e.update_node_status(routing_domain, node_status); e.update_node_status(routing_domain, node_status);
@ -583,9 +577,9 @@ impl<'a> NodeRefLockedMut<'a> {
} }
} }
pub fn unlocked(&self) -> NodeRef { // pub fn unlocked(&self) -> NodeRef {
self.nr.clone() // self.nr.clone()
} // }
} }
impl<'a> NodeRefBase for NodeRefLockedMut<'a> { impl<'a> NodeRefBase for NodeRefLockedMut<'a> {

View File

@ -35,6 +35,7 @@ impl NodeRefFilter {
self.dial_info_filter = self.dial_info_filter.with_protocol_type(protocol_type); self.dial_info_filter = self.dial_info_filter.with_protocol_type(protocol_type);
self self
} }
#[allow(dead_code)]
pub fn with_protocol_type_set(mut self, protocol_set: ProtocolTypeSet) -> Self { pub fn with_protocol_type_set(mut self, protocol_set: ProtocolTypeSet) -> Self {
self.dial_info_filter = self.dial_info_filter.with_protocol_type_set(protocol_set); self.dial_info_filter = self.dial_info_filter.with_protocol_type_set(protocol_set);
self self
@ -43,6 +44,7 @@ impl NodeRefFilter {
self.dial_info_filter = self.dial_info_filter.with_address_type(address_type); self.dial_info_filter = self.dial_info_filter.with_address_type(address_type);
self self
} }
#[allow(dead_code)]
pub fn with_address_type_set(mut self, address_set: AddressTypeSet) -> Self { pub fn with_address_type_set(mut self, address_set: AddressTypeSet) -> Self {
self.dial_info_filter = self.dial_info_filter.with_address_type_set(address_set); self.dial_info_filter = self.dial_info_filter.with_address_type_set(address_set);
self self
@ -54,6 +56,7 @@ impl NodeRefFilter {
.filtered(&other_filter.dial_info_filter); .filtered(&other_filter.dial_info_filter);
self self
} }
#[allow(dead_code)]
pub fn is_dead(&self) -> bool { pub fn is_dead(&self) -> bool {
self.dial_info_filter.is_dead() || self.routing_domain_set.is_empty() self.dial_info_filter.is_dead() || self.routing_domain_set.is_empty()
} }

View File

@ -121,14 +121,6 @@ pub(crate) struct PrivateRoute {
} }
impl PrivateRoute { impl PrivateRoute {
/// Empty private route is the form used when receiving the last hop
pub fn new_empty(public_key: TypedKey) -> Self {
Self {
public_key,
hop_count: 0,
hops: PrivateRouteHops::Empty,
}
}
/// Stub route is the form used when no privacy is required, but you need to specify the destination for a safety route /// Stub route is the form used when no privacy is required, but you need to specify the destination for a safety route
pub fn new_stub(public_key: TypedKey, node: RouteNode) -> Self { pub fn new_stub(public_key: TypedKey, node: RouteNode) -> Self {
Self { Self {

View File

@ -1,4 +1,5 @@
use super::*; use super::*;
use crate::veilid_api::*;
mod permutation; mod permutation;
mod remote_private_route_info; mod remote_private_route_info;
@ -7,15 +8,14 @@ mod route_spec_store_cache;
mod route_spec_store_content; mod route_spec_store_content;
mod route_stats; mod route_stats;
pub use remote_private_route_info::*;
pub use route_set_spec_detail::*;
pub use route_spec_store_cache::*;
pub use route_spec_store_content::*;
pub use route_stats::*;
use crate::veilid_api::*;
use permutation::*; use permutation::*;
use remote_private_route_info::*;
use route_set_spec_detail::*;
use route_spec_store_cache::*;
use route_spec_store_content::*;
pub(crate) use route_spec_store_cache::CompiledRoute;
pub(crate) use route_stats::*;
/// The size of the remote private route cache /// The size of the remote private route cache
const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024; const REMOTE_PRIVATE_ROUTE_CACHE_SIZE: usize = 1024;
@ -27,14 +27,14 @@ const ROUTE_MIN_IDLE_TIME_MS: u32 = 30_000;
const COMPILED_ROUTE_CACHE_SIZE: usize = 256; const COMPILED_ROUTE_CACHE_SIZE: usize = 256;
#[derive(Debug)] #[derive(Debug)]
pub struct RouteSpecStoreInner { struct RouteSpecStoreInner {
/// Serialize RouteSpecStore content /// Serialize RouteSpecStore content
content: RouteSpecStoreContent, content: RouteSpecStoreContent,
/// RouteSpecStore cache /// RouteSpecStore cache
cache: RouteSpecStoreCache, cache: RouteSpecStoreCache,
} }
pub struct RouteSpecStoreUnlockedInner { struct RouteSpecStoreUnlockedInner {
/// Handle to routing table /// Handle to routing table
routing_table: RoutingTable, routing_table: RoutingTable,
/// Maximum number of hops in a route /// Maximum number of hops in a route
@ -54,7 +54,7 @@ impl fmt::Debug for RouteSpecStoreUnlockedInner {
/// The routing table's storage for private/safety routes /// The routing table's storage for private/safety routes
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RouteSpecStore { pub(crate) struct RouteSpecStore {
inner: Arc<Mutex<RouteSpecStoreInner>>, inner: Arc<Mutex<RouteSpecStoreInner>>,
unlocked_inner: Arc<RouteSpecStoreUnlockedInner>, unlocked_inner: Arc<RouteSpecStoreUnlockedInner>,
} }

View File

@ -2,7 +2,7 @@ use super::*;
/// What remote private routes have seen /// What remote private routes have seen
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct RemotePrivateRouteInfo { pub(crate) struct RemotePrivateRouteInfo {
/// The private routes themselves /// The private routes themselves
private_routes: Vec<PrivateRoute>, private_routes: Vec<PrivateRoute>,
/// Did this remote private route see our node info due to no safety route in use /// Did this remote private route see our node info due to no safety route in use

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RouteSpecDetail { pub(crate) struct RouteSpecDetail {
/// Crypto kind /// Crypto kind
pub crypto_kind: CryptoKind, pub crypto_kind: CryptoKind,
/// Secret key /// Secret key
@ -11,7 +11,7 @@ pub struct RouteSpecDetail {
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RouteSetSpecDetail { pub(crate) struct RouteSetSpecDetail {
/// Route set per crypto kind /// Route set per crypto kind
route_set: BTreeMap<PublicKey, RouteSpecDetail>, route_set: BTreeMap<PublicKey, RouteSpecDetail>,
/// Route noderefs /// Route noderefs
@ -53,9 +53,6 @@ impl RouteSetSpecDetail {
pub fn get_route_by_key(&self, key: &PublicKey) -> Option<&RouteSpecDetail> { pub fn get_route_by_key(&self, key: &PublicKey) -> Option<&RouteSpecDetail> {
self.route_set.get(key) self.route_set.get(key)
} }
pub fn get_route_by_key_mut(&mut self, key: &PublicKey) -> Option<&mut RouteSpecDetail> {
self.route_set.get_mut(key)
}
pub fn get_route_set_keys(&self) -> TypedKeyGroup { pub fn get_route_set_keys(&self) -> TypedKeyGroup {
let mut tks = TypedKeyGroup::new(); let mut tks = TypedKeyGroup::new();
for (k, v) in &self.route_set { for (k, v) in &self.route_set {
@ -74,11 +71,6 @@ impl RouteSetSpecDetail {
) -> alloc::collections::btree_map::Iter<PublicKey, RouteSpecDetail> { ) -> alloc::collections::btree_map::Iter<PublicKey, RouteSpecDetail> {
self.route_set.iter() self.route_set.iter()
} }
pub fn iter_route_set_mut(
&mut self,
) -> alloc::collections::btree_map::IterMut<PublicKey, RouteSpecDetail> {
self.route_set.iter_mut()
}
pub fn get_stats(&self) -> &RouteStats { pub fn get_stats(&self) -> &RouteStats {
&self.stats &self.stats
} }

View File

@ -9,7 +9,7 @@ struct CompiledRouteCacheKey {
/// Compiled route (safety route + private route) /// Compiled route (safety route + private route)
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct CompiledRoute { pub(crate) struct CompiledRoute {
/// The safety route attached to the private route /// The safety route attached to the private route
pub safety_route: SafetyRoute, pub safety_route: SafetyRoute,
/// The secret used to encrypt the message payload /// The secret used to encrypt the message payload
@ -20,7 +20,7 @@ pub struct CompiledRoute {
/// Ephemeral data used to help the RouteSpecStore operate efficiently /// Ephemeral data used to help the RouteSpecStore operate efficiently
#[derive(Debug)] #[derive(Debug)]
pub struct RouteSpecStoreCache { pub(super) struct RouteSpecStoreCache {
/// How many times nodes have been used /// How many times nodes have been used
used_nodes: HashMap<PublicKey, usize>, used_nodes: HashMap<PublicKey, usize>,
/// How many times nodes have been used at the terminal point of a route /// How many times nodes have been used at the terminal point of a route

View File

@ -2,7 +2,7 @@ use super::*;
/// The core representation of the RouteSpecStore that can be serialized /// The core representation of the RouteSpecStore that can be serialized
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RouteSpecStoreContent { pub(super) struct RouteSpecStoreContent {
/// All of the route sets we have allocated so far indexed by key /// All of the route sets we have allocated so far indexed by key
id_by_key: HashMap<PublicKey, RouteId>, id_by_key: HashMap<PublicKey, RouteId>,
/// All of the route sets we have allocated so far /// All of the route sets we have allocated so far

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone, Debug, Default, Serialize, Deserialize)] #[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct RouteStats { pub(crate) struct RouteStats {
/// Consecutive failed to send count /// Consecutive failed to send count
#[serde(skip)] #[serde(skip)]
pub failed_to_send: u32, pub failed_to_send: u32,
@ -94,6 +94,7 @@ impl RouteStats {
} }
/// Get the transfer stats /// Get the transfer stats
#[allow(dead_code)]
pub fn transfer_stats(&self) -> &TransferStatsDownUp { pub fn transfer_stats(&self) -> &TransferStatsDownUp {
&self.transfer_stats_down_up &self.transfer_stats_down_up
} }

View File

@ -27,7 +27,7 @@ enum RoutingDomainChange {
}, },
} }
pub struct RoutingDomainEditor { pub(crate) struct RoutingDomainEditor {
routing_table: RoutingTable, routing_table: RoutingTable,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
changes: Vec<RoutingDomainChange>, changes: Vec<RoutingDomainChange>,

View File

@ -2,7 +2,7 @@ use super::*;
/// Mechanism required to contact another node /// Mechanism required to contact another node
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum ContactMethod { pub(crate) enum ContactMethod {
/// Node is not reachable by any means /// Node is not reachable by any means
Unreachable, Unreachable,
/// Connection should have already existed /// Connection should have already existed
@ -20,7 +20,7 @@ pub enum ContactMethod {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct RoutingDomainDetailCommon { pub(crate) struct RoutingDomainDetailCommon {
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
network_class: Option<NetworkClass>, network_class: Option<NetworkClass>,
outbound_protocols: ProtocolTypeSet, outbound_protocols: ProtocolTypeSet,
@ -216,6 +216,7 @@ impl RoutingDomainDetailCommon {
f(cpi.as_ref().unwrap()) f(cpi.as_ref().unwrap())
} }
#[allow(dead_code)]
pub fn inbound_dial_info_filter(&self) -> DialInfoFilter { pub fn inbound_dial_info_filter(&self) -> DialInfoFilter {
DialInfoFilter::all() DialInfoFilter::all()
.with_protocol_type_set(self.inbound_protocols) .with_protocol_type_set(self.inbound_protocols)
@ -233,7 +234,7 @@ impl RoutingDomainDetailCommon {
} }
/// General trait for all routing domains /// General trait for all routing domains
pub trait RoutingDomainDetail { pub(crate) trait RoutingDomainDetail {
// Common accessors // Common accessors
fn common(&self) -> &RoutingDomainDetailCommon; fn common(&self) -> &RoutingDomainDetailCommon;
fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon; fn common_mut(&mut self) -> &mut RoutingDomainDetailCommon;

View File

@ -102,6 +102,7 @@ impl RoutingTableInner {
self.with_routing_domain(domain, |rd| rd.common().relay_node_last_keepalive()) self.with_routing_domain(domain, |rd| rd.common().relay_node_last_keepalive())
} }
#[allow(dead_code)]
pub fn has_dial_info(&self, domain: RoutingDomain) -> bool { pub fn has_dial_info(&self, domain: RoutingDomain) -> bool {
self.with_routing_domain(domain, |rd| !rd.common().dial_info_details().is_empty()) self.with_routing_domain(domain, |rd| !rd.common().dial_info_details().is_empty())
} }
@ -233,7 +234,7 @@ impl RoutingTableInner {
let cur_ts = get_aligned_timestamp(); let cur_ts = get_aligned_timestamp();
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| { self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, v| {
v.with_mut(rti, |_rti, e| { v.with_mut(rti, |_rti, e| {
e.set_updated_since_last_network_change(false) e.reset_updated_since_last_network_change();
}); });
Option::<()>::None Option::<()>::None
}); });
@ -265,6 +266,7 @@ impl RoutingTableInner {
} }
/// Return the domain's filter for what we can receivein the form of a dial info filter /// Return the domain's filter for what we can receivein the form of a dial info filter
#[allow(dead_code)]
pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter { pub fn get_inbound_dial_info_filter(&self, routing_domain: RoutingDomain) -> DialInfoFilter {
self.with_routing_domain(routing_domain, |rdd| { self.with_routing_domain(routing_domain, |rdd| {
rdd.common().inbound_dial_info_filter() rdd.common().inbound_dial_info_filter()
@ -272,6 +274,7 @@ impl RoutingTableInner {
} }
/// Return the domain's filter for what we can receive in the form of a node ref filter /// Return the domain's filter for what we can receive in the form of a node ref filter
#[allow(dead_code)]
pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter { pub fn get_inbound_node_ref_filter(&self, routing_domain: RoutingDomain) -> NodeRefFilter {
let dif = self.get_inbound_dial_info_filter(routing_domain); let dif = self.get_inbound_dial_info_filter(routing_domain);
NodeRefFilter::new() NodeRefFilter::new()
@ -336,7 +339,7 @@ impl RoutingTableInner {
self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| { self.with_entries_mut(cur_ts, BucketEntryState::Dead, |rti, e| {
e.with_mut(rti, |_rti, e| { e.with_mut(rti, |_rti, e| {
e.clear_signed_node_info(RoutingDomain::LocalNetwork); e.clear_signed_node_info(RoutingDomain::LocalNetwork);
e.set_updated_since_last_network_change(false); e.reset_updated_since_last_network_change();
}); });
Option::<()>::None Option::<()>::None
}); });
@ -462,32 +465,6 @@ impl RoutingTableInner {
count count
} }
/// Count entries per crypto kind that match some criteria
pub fn get_entry_count_per_crypto_kind(
&self,
routing_domain_set: RoutingDomainSet,
min_state: BucketEntryState,
) -> BTreeMap<CryptoKind, usize> {
let mut counts = BTreeMap::new();
let cur_ts = get_aligned_timestamp();
self.with_entries(cur_ts, min_state, |rti, e| {
if let Some(crypto_kinds) = e.with_inner(|e| {
if e.best_routing_domain(rti, routing_domain_set).is_some() {
Some(e.crypto_kinds())
} else {
None
}
}) {
// Got crypto kinds, add to map
for ck in crypto_kinds {
counts.entry(ck).and_modify(|x| *x += 1).or_insert(1);
}
}
Option::<()>::None
});
counts
}
/// Iterate entries with a filter /// Iterate entries with a filter
pub fn with_entries<T, F: FnMut(&RoutingTableInner, Arc<BucketEntry>) -> Option<T>>( pub fn with_entries<T, F: FnMut(&RoutingTableInner, Arc<BucketEntry>) -> Option<T>>(
&self, &self,
@ -527,7 +504,7 @@ impl RoutingTableInner {
None None
} }
pub fn get_nodes_needing_ping( pub(super) fn get_nodes_needing_ping(
&self, &self,
outer_self: RoutingTable, outer_self: RoutingTable,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
@ -575,6 +552,7 @@ impl RoutingTableInner {
node_refs node_refs
} }
#[allow(dead_code)]
pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec<NodeRef> { pub fn get_all_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec<NodeRef> {
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count()); let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count());
self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| { self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| {

View File

@ -109,10 +109,8 @@ impl RoutingTable {
unord.push( unord.push(
async move { async move {
let out = rpc rpc.rpc_call_status(Destination::direct(relay_nr_filtered), true)
.rpc_call_status(Destination::direct(relay_nr_filtered), true) .await
.await;
out
} }
.instrument(Span::current()) .instrument(Span::current())
.boxed(), .boxed(),

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
use core::convert::TryInto; use core::convert::TryInto;
pub fn encode_address( pub(crate) fn encode_address(
address: &Address, address: &Address,
builder: &mut veilid_capnp::address::Builder, builder: &mut veilid_capnp::address::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -37,7 +37,7 @@ pub fn encode_address(
Ok(()) Ok(())
} }
pub fn decode_address(reader: &veilid_capnp::address::Reader) -> Result<Address, RPCError> { pub(crate) fn decode_address(reader: &veilid_capnp::address::Reader) -> Result<Address, RPCError> {
match reader.reborrow().which() { match reader.reborrow().which() {
Ok(veilid_capnp::address::Which::Ipv4(Ok(v4))) => { Ok(veilid_capnp::address::Which::Ipv4(Ok(v4))) => {
let v4b = v4.get_addr().to_be_bytes(); let v4b = v4.get_addr().to_be_bytes();

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub fn encode_address_type_set( pub(crate) fn encode_address_type_set(
address_type_set: &AddressTypeSet, address_type_set: &AddressTypeSet,
builder: &mut veilid_capnp::address_type_set::Builder, builder: &mut veilid_capnp::address_type_set::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -10,7 +10,7 @@ pub fn encode_address_type_set(
Ok(()) Ok(())
} }
pub fn decode_address_type_set( pub(crate) fn decode_address_type_set(
reader: &veilid_capnp::address_type_set::Reader, reader: &veilid_capnp::address_type_set::Reader,
) -> Result<AddressTypeSet, RPCError> { ) -> Result<AddressTypeSet, RPCError> {
let mut out = AddressTypeSet::new(); let mut out = AddressTypeSet::new();

View File

@ -1,7 +1,9 @@
use super::*; use super::*;
use core::convert::TryInto; use core::convert::TryInto;
pub fn decode_dial_info(reader: &veilid_capnp::dial_info::Reader) -> Result<DialInfo, RPCError> { pub(crate) fn decode_dial_info(
reader: &veilid_capnp::dial_info::Reader,
) -> Result<DialInfo, RPCError> {
match reader match reader
.reborrow() .reborrow()
.which() .which()
@ -60,7 +62,7 @@ pub fn decode_dial_info(reader: &veilid_capnp::dial_info::Reader) -> Result<Dial
} }
} }
pub fn encode_dial_info( pub(crate) fn encode_dial_info(
dial_info: &DialInfo, dial_info: &DialInfo,
builder: &mut veilid_capnp::dial_info::Builder, builder: &mut veilid_capnp::dial_info::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {

View File

@ -1,6 +1,8 @@
use super::*; use super::*;
pub fn encode_dial_info_class(dial_info_class: DialInfoClass) -> veilid_capnp::DialInfoClass { pub(crate) fn encode_dial_info_class(
dial_info_class: DialInfoClass,
) -> veilid_capnp::DialInfoClass {
match dial_info_class { match dial_info_class {
DialInfoClass::Direct => veilid_capnp::DialInfoClass::Direct, DialInfoClass::Direct => veilid_capnp::DialInfoClass::Direct,
DialInfoClass::Mapped => veilid_capnp::DialInfoClass::Mapped, DialInfoClass::Mapped => veilid_capnp::DialInfoClass::Mapped,
@ -11,7 +13,9 @@ pub fn encode_dial_info_class(dial_info_class: DialInfoClass) -> veilid_capnp::D
} }
} }
pub fn decode_dial_info_class(dial_info_class: veilid_capnp::DialInfoClass) -> DialInfoClass { pub(crate) fn decode_dial_info_class(
dial_info_class: veilid_capnp::DialInfoClass,
) -> DialInfoClass {
match dial_info_class { match dial_info_class {
veilid_capnp::DialInfoClass::Direct => DialInfoClass::Direct, veilid_capnp::DialInfoClass::Direct => DialInfoClass::Direct,
veilid_capnp::DialInfoClass::Mapped => DialInfoClass::Mapped, veilid_capnp::DialInfoClass::Mapped => DialInfoClass::Mapped,

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub fn encode_dial_info_detail( pub(crate) fn encode_dial_info_detail(
dial_info_detail: &DialInfoDetail, dial_info_detail: &DialInfoDetail,
builder: &mut veilid_capnp::dial_info_detail::Builder, builder: &mut veilid_capnp::dial_info_detail::Builder,
) -> Result<(), RPCError> { ) -> Result<(), RPCError> {
@ -11,7 +11,7 @@ pub fn encode_dial_info_detail(
Ok(()) Ok(())
} }
pub fn decode_dial_info_detail( pub(crate) fn decode_dial_info_detail(
reader: &veilid_capnp::dial_info_detail::Reader, reader: &veilid_capnp::dial_info_detail::Reader,
) -> Result<DialInfoDetail, RPCError> { ) -> Result<DialInfoDetail, RPCError> {
let dial_info = decode_dial_info( let dial_info = decode_dial_info(

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
use core::convert::TryInto; use core::convert::TryInto;
pub fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey { pub(crate) fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey {
let u0 = public_key.get_u0().to_be_bytes(); let u0 = public_key.get_u0().to_be_bytes();
let u1 = public_key.get_u1().to_be_bytes(); let u1 = public_key.get_u1().to_be_bytes();
let u2 = public_key.get_u2().to_be_bytes(); let u2 = public_key.get_u2().to_be_bytes();
@ -16,7 +16,7 @@ pub fn decode_key256(public_key: &veilid_capnp::key256::Reader) -> PublicKey {
PublicKey::new(x) PublicKey::new(x)
} }
pub fn encode_key256(key: &PublicKey, builder: &mut veilid_capnp::key256::Builder) { pub(crate) fn encode_key256(key: &PublicKey, builder: &mut veilid_capnp::key256::Builder) {
builder.set_u0(u64::from_be_bytes( builder.set_u0(u64::from_be_bytes(
key.bytes[0..8] key.bytes[0..8]
.try_into() .try_into()

View File

@ -27,21 +27,22 @@ mod tunnel;
mod typed_key; mod typed_key;
mod typed_signature; mod typed_signature;
pub use address::*; pub(in crate::rpc_processor) use operations::*;
pub use address_type_set::*;
pub use dial_info::*; pub(crate) use address::*;
pub use dial_info_class::*; pub(crate) use address_type_set::*;
pub use dial_info_detail::*; pub(crate) use dial_info::*;
pub use key256::*; pub(crate) use dial_info_class::*;
pub use network_class::*; pub(crate) use dial_info_detail::*;
pub use node_info::*; pub(crate) use key256::*;
pub use node_status::*; pub(crate) use network_class::*;
pub use nonce::*; pub(crate) use node_info::*;
pub use operations::*; pub(crate) use node_status::*;
pub use peer_info::*; pub(crate) use nonce::*;
pub use private_safety_route::*; pub(crate) use peer_info::*;
pub use protocol_type_set::*; pub(crate) use private_safety_route::*;
pub use sender_info::*; pub(crate) use protocol_type_set::*;
pub(crate) use sender_info::*;
pub use sequencing::*; pub use sequencing::*;
pub use signal_info::*; pub use signal_info::*;
pub use signature512::*; pub use signature512::*;
@ -59,14 +60,14 @@ pub use typed_signature::*;
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum QuestionContext { pub(in crate::rpc_processor) enum QuestionContext {
GetValue(ValidateGetValueContext), GetValue(ValidateGetValueContext),
SetValue(ValidateSetValueContext), SetValue(ValidateSetValueContext),
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RPCValidateContext { pub(in crate::rpc_processor) struct RPCValidateContext {
pub crypto: Crypto, pub crypto: Crypto,
pub rpc_processor: RPCProcessor, // pub rpc_processor: RPCProcessor,
pub question_context: Option<QuestionContext>, pub question_context: Option<QuestionContext>,
} }

View File

@ -29,34 +29,34 @@ mod operation_complete_tunnel;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
mod operation_start_tunnel; mod operation_start_tunnel;
pub use answer::*; pub(in crate::rpc_processor) use answer::*;
pub use operation::*; pub(in crate::rpc_processor) use operation::*;
pub use operation_app_call::*; pub(in crate::rpc_processor) use operation_app_call::*;
pub use operation_app_message::*; pub(in crate::rpc_processor) use operation_app_message::*;
pub use operation_find_node::*; pub(in crate::rpc_processor) use operation_find_node::*;
pub use operation_get_value::*; pub(in crate::rpc_processor) use operation_get_value::*;
pub use operation_return_receipt::*; pub(in crate::rpc_processor) use operation_return_receipt::*;
pub use operation_route::*; pub(in crate::rpc_processor) use operation_route::*;
pub use operation_set_value::*; pub(in crate::rpc_processor) use operation_set_value::*;
pub use operation_signal::*; pub(in crate::rpc_processor) use operation_signal::*;
pub use operation_status::*; pub(in crate::rpc_processor) use operation_status::*;
pub use operation_validate_dial_info::*; pub(in crate::rpc_processor) use operation_validate_dial_info::*;
pub use operation_value_changed::*; pub(in crate::rpc_processor) use operation_value_changed::*;
pub use operation_watch_value::*; pub(in crate::rpc_processor) use operation_watch_value::*;
pub use question::*; pub(in crate::rpc_processor) use question::*;
pub use respond_to::*; pub(in crate::rpc_processor) use respond_to::*;
pub use statement::*; pub(in crate::rpc_processor) use statement::*;
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
pub use operation_find_block::*; pub(in crate::rpc_processor) use operation_find_block::*;
#[cfg(feature = "unstable-blockstore")] #[cfg(feature = "unstable-blockstore")]
pub use operation_supply_block::*; pub(in crate::rpc_processor) use operation_supply_block::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
pub use operation_cancel_tunnel::*; pub(in crate::rpc_processor) use operation_cancel_tunnel::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
pub use operation_complete_tunnel::*; pub(in crate::rpc_processor) use operation_complete_tunnel::*;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
pub use operation_start_tunnel::*; pub(in crate::rpc_processor) use operation_start_tunnel::*;
use super::*; use super::*;

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct RoutedOperation { pub(in crate::rpc_processor) struct RoutedOperation {
sequencing: Sequencing, sequencing: Sequencing,
signatures: Vec<Signature>, signatures: Vec<Signature>,
nonce: Nonce, nonce: Nonce,
@ -106,7 +106,7 @@ impl RoutedOperation {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct RPCOperationRoute { pub(in crate::rpc_processor) struct RPCOperationRoute {
safety_route: SafetyRoute, safety_route: SafetyRoute,
operation: RoutedOperation, operation: RoutedOperation,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationStatusQ { pub(in crate::rpc_processor) struct RPCOperationStatusQ {
node_status: Option<NodeStatus>, node_status: Option<NodeStatus>,
} }
@ -43,7 +43,7 @@ impl RPCOperationStatusQ {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationStatusA { pub(in crate::rpc_processor) struct RPCOperationStatusA {
node_status: Option<NodeStatus>, node_status: Option<NodeStatus>,
sender_info: Option<SenderInfo>, sender_info: Option<SenderInfo>,
} }

View File

@ -1,7 +1,7 @@
use super::*; use super::*;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationValidateDialInfo { pub(in crate::rpc_processor) struct RPCOperationValidateDialInfo {
dial_info: DialInfo, dial_info: DialInfo,
receipt: Vec<u8>, receipt: Vec<u8>,
redirect: bool, redirect: bool,

View File

@ -2,7 +2,7 @@ use super::*;
use crate::storage_manager::SignedValueData; use crate::storage_manager::SignedValueData;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCOperationValueChanged { pub(in crate::rpc_processor) struct RPCOperationValueChanged {
key: TypedKey, key: TypedKey,
subkeys: ValueSubkeyRangeSet, subkeys: ValueSubkeyRangeSet,
count: u32, count: u32,

View File

@ -2,7 +2,7 @@ use super::*;
/// Where to send an RPC message /// Where to send an RPC message
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Destination { pub(crate) enum Destination {
/// Send to node directly /// Send to node directly
Direct { Direct {
/// The node to send to /// The node to send to

View File

@ -8,14 +8,14 @@ where
result: Option<Result<R, RPCError>>, result: Option<Result<R, RPCError>>,
} }
pub type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>; pub(crate) type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>;
pub type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>; pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;
pub fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter { pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter {
Arc::new(|_, _| true) Arc::new(|_, _| true)
} }
pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter { pub(crate) fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeInfoFilter {
Arc::new(move |_, ni| ni.has_capabilities(&caps)) Arc::new(move |_, ni| ni.has_capabilities(&caps))
} }
@ -34,7 +34,7 @@ pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeIn
/// If the algorithm times out, a Timeout result is returned, however operations will still have been performed and a /// If the algorithm times out, a Timeout result is returned, however operations will still have been performed and a
/// timeout is not necessarily indicative of an algorithmic 'failure', just that no definitive stopping condition was found /// timeout is not necessarily indicative of an algorithmic 'failure', just that no definitive stopping condition was found
/// in the given time /// in the given time
pub struct FanoutCall<R, F, C, D> pub(crate) struct FanoutCall<R, F, C, D>
where where
R: Unpin, R: Unpin,
F: Future<Output = FanoutCallReturnType>, F: Future<Output = FanoutCallReturnType>,

View File

@ -1,6 +1,6 @@
use super::*; use super::*;
pub struct FanoutQueue { pub(in crate::rpc_processor) struct FanoutQueue {
crypto_kind: CryptoKind, crypto_kind: CryptoKind,
current_nodes: VecDeque<NodeRef>, current_nodes: VecDeque<NodeRef>,
returned_nodes: HashSet<TypedKey>, returned_nodes: HashSet<TypedKey>,

View File

@ -29,13 +29,12 @@ mod rpc_complete_tunnel;
#[cfg(feature = "unstable-tunnels")] #[cfg(feature = "unstable-tunnels")]
mod rpc_start_tunnel; mod rpc_start_tunnel;
pub use coders::*; pub(crate) use coders::*;
pub use destination::*; pub(crate) use destination::*;
pub use fanout_call::*; pub(crate) use operation_waiter::*;
pub use fanout_queue::*; pub(crate) use rpc_error::*;
pub use operation_waiter::*; pub(crate) use rpc_status::*;
pub use rpc_error::*; pub(crate) use fanout_call::*;
pub use rpc_status::*;
use super::*; use super::*;
@ -43,6 +42,7 @@ use crypto::*;
use futures_util::StreamExt; use futures_util::StreamExt;
use network_manager::*; use network_manager::*;
use routing_table::*; use routing_table::*;
use fanout_queue::*;
use stop_token::future::FutureExt; use stop_token::future::FutureExt;
use storage_manager::*; use storage_manager::*;
@ -268,13 +268,13 @@ enum RPCKind {
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
pub struct RPCProcessorInner { struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>, send_channel: Option<flume::Sender<(Option<Id>, RPCMessageEncoded)>>,
stop_source: Option<StopSource>, stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>, worker_join_handles: Vec<MustJoinHandle<()>>,
} }
pub struct RPCProcessorUnlockedInner { struct RPCProcessorUnlockedInner {
timeout_us: TimestampDuration, timeout_us: TimestampDuration,
queue_size: u32, queue_size: u32,
concurrency: u32, concurrency: u32,
@ -286,7 +286,7 @@ pub struct RPCProcessorUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct RPCProcessor { pub(crate) struct RPCProcessor {
crypto: Crypto, crypto: Crypto,
config: VeilidConfig, config: VeilidConfig,
network_manager: NetworkManager, network_manager: NetworkManager,
@ -974,11 +974,16 @@ impl RPCProcessor {
safety_route: Option<PublicKey>, safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>, remote_private_route: Option<PublicKey>,
) { ) {
let wants_answer = matches!(rpc_kind, RPCKind::Question);
// Record for node if this was not sent via a route // Record for node if this was not sent via a route
if safety_route.is_none() && remote_private_route.is_none() { if safety_route.is_none() && remote_private_route.is_none() {
node_ref.stats_question_sent(send_ts, bytes, wants_answer); let wants_answer = matches!(rpc_kind, RPCKind::Question);
let is_answer = matches!(rpc_kind, RPCKind::Answer);
if is_answer {
node_ref.stats_answer_sent(bytes);
} else {
node_ref.stats_question_sent(send_ts, bytes, wants_answer);
}
return; return;
} }
@ -1422,7 +1427,7 @@ impl RPCProcessor {
// Validate the RPC operation // Validate the RPC operation
let validate_context = RPCValidateContext { let validate_context = RPCValidateContext {
crypto: self.crypto.clone(), crypto: self.crypto.clone(),
rpc_processor: self.clone(), // rpc_processor: self.clone(),
question_context, question_context,
}; };
operation.validate(&validate_context)?; operation.validate(&validate_context)?;

View File

@ -272,7 +272,8 @@ impl RPCProcessor {
feature = "verbose-tracing", feature = "verbose-tracing",
instrument(level = "trace", skip_all, err) instrument(level = "trace", skip_all, err)
)] )]
pub(crate) async fn process_private_route_first_hop(
async fn process_private_route_first_hop(
&self, &self,
mut routed_operation: RoutedOperation, mut routed_operation: RoutedOperation,
sr_pubkey: TypedKey, sr_pubkey: TypedKey,
@ -336,7 +337,7 @@ impl RPCProcessor {
} }
/// Decrypt route hop data and sign routed operation /// Decrypt route hop data and sign routed operation
pub(crate) fn decrypt_private_route_hop_data( fn decrypt_private_route_hop_data(
&self, &self,
route_hop_data: &RouteHopData, route_hop_data: &RouteHopData,
pr_pubkey: &TypedKey, pr_pubkey: &TypedKey,

View File

@ -115,12 +115,7 @@ impl RPCProcessor {
if protect { if protect {
self.network_manager() self.network_manager()
.connection_manager() .connection_manager()
.protect_connection( .protect_connection(waitable_reply.send_data_method.connection_descriptor);
waitable_reply
.send_data_method
.connection_descriptor
.clone(),
);
} }
// Note what kind of ping this was and to what peer scope // Note what kind of ping this was and to what peer scope

View File

@ -44,7 +44,7 @@ struct StorageManagerUnlockedInner {
} }
#[derive(Clone)] #[derive(Clone)]
pub struct StorageManager { pub(crate) struct StorageManager {
unlocked_inner: Arc<StorageManagerUnlockedInner>, unlocked_inner: Arc<StorageManagerUnlockedInner>,
inner: Arc<AsyncMutex<StorageManagerInner>>, inner: Arc<AsyncMutex<StorageManagerInner>>,
} }