[skip ci] continue refactor

This commit is contained in:
Christien Rioux 2025-01-30 20:49:38 -05:00
parent d196c934cd
commit 6319db2b06
54 changed files with 988 additions and 1168 deletions

View File

@ -12,23 +12,26 @@ impl<T: Send + Sync + 'static> AsAnyArcSendSync for T {
}
}
pub trait VeilidComponent: AsAnyArcSendSync + core::fmt::Debug {
fn registry(&self) -> VeilidComponentRegistry;
pub trait VeilidComponent:
AsAnyArcSendSync + VeilidComponentRegistryAccessor + core::fmt::Debug
{
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>;
fn post_init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>>;
fn pre_terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>;
fn terminate(&self) -> SendPinBoxFutureLifetime<'_, ()>;
}
pub trait VeilidComponentRegistryAccessor {
fn registry(&self) -> VeilidComponentRegistry;
// Registry shortcuts
fn config(&self) -> VeilidConfig {
self.registry().config()
self.registry().config.clone()
}
fn update_callback(&self) -> UpdateCallback {
self.registry().update_callback()
self.registry().config.update_callback()
}
fn event_bus(&self) -> EventBus {
self.registry().event_bus()
self.registry().event_bus.clone()
}
}
@ -52,6 +55,7 @@ where
struct VeilidComponentRegistryInner {
type_map: HashMap<core::any::TypeId, Arc<dyn VeilidComponent + Send + Sync>>,
init_order: Vec<core::any::TypeId>,
mock: bool,
}
#[derive(Clone, Debug)]
@ -68,6 +72,7 @@ impl VeilidComponentRegistry {
inner: Arc::new(Mutex::new(VeilidComponentRegistryInner {
type_map: HashMap::new(),
init_order: Vec::new(),
mock: false,
})),
config,
event_bus: EventBus::new(),
@ -75,6 +80,11 @@ impl VeilidComponentRegistry {
}
}
pub fn enable_mock(&self) {
let mut inner = self.inner.lock();
inner.mock = true;
}
pub fn register<
T: VeilidComponent + Send + Sync + 'static,
F: FnOnce(VeilidComponentRegistry) -> T,
@ -191,16 +201,6 @@ impl VeilidComponentRegistry {
//////////////////////////////////////////////////////////////
pub fn config(&self) -> VeilidConfig {
self.config.clone()
}
pub fn update_callback(&self) -> UpdateCallback {
self.config.update_callback()
}
pub fn event_bus(&self) -> EventBus {
self.event_bus.clone()
}
pub fn lookup<'a, T: VeilidComponent + Send + Sync + 'static>(
&self,
) -> Option<VeilidComponentGuard<'a, T>> {
@ -218,13 +218,33 @@ impl VeilidComponentRegistry {
}
}
macro_rules! impl_veilid_component {
($component_name:ident) => {
impl VeilidComponent for $component_name {
impl VeilidComponentRegistryAccessor for VeilidComponentRegistry {
fn registry(&self) -> VeilidComponentRegistry {
self.clone()
}
}
////////////////////////////////////////////////////////////////////
macro_rules! impl_veilid_component_registry_accessor {
($struct_name:ident) => {
impl VeilidComponentRegistryAccessor for $struct_name {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
};
}
pub(crate) use impl_veilid_component_registry_accessor;
/////////////////////////////////////////////////////////////////////
macro_rules! impl_veilid_component {
($component_name:ident) => {
impl_veilid_component_registry_accessor!($component_name);
impl VeilidComponent for $component_name {
fn init(&self) -> SendPinBoxFutureLifetime<'_, EyreResult<()>> {
Box::pin(async { self.init_async().await })
}
@ -245,3 +265,42 @@ macro_rules! impl_veilid_component {
}
pub(crate) use impl_veilid_component;
/////////////////////////////////////////////////////////////////////
// Utility macro for setting up a background TickTask
// Should be called during new/construction of a component with background tasks
// and before any post-init 'tick' operations are started
macro_rules! impl_setup_task {
($this:expr, $this_type:ty, $task_name:ident, $task_routine:ident ) => {{
let registry = $this.registry();
$this.$task_name.set_routine(move |s, l, t| {
let registry = registry.clone();
Box::pin(async move {
let this = registry.lookup::<$this_type>().unwrap();
this.$task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}};
}
pub(crate) use impl_setup_task;
// Utility macro for setting up an event bus handler
// Should be called after init, during post-init or later
// Subscription should be unsubscribed before termination
macro_rules! impl_subscribe_event_bus {
($this:expr, $this_type:ty, $event_handler:ident ) => {{
let registry = $this.registry();
$this.event_bus().subscribe(move |evt| {
let registry = registry.clone();
Box::pin(async move {
let this = registry.lookup::<$this_type>().unwrap();
this.$event_handler(evt);
})
})
}};
}
pub(crate) use impl_subscribe_event_bus;

View File

@ -1,7 +1,10 @@
use crate::attachment_manager::*;
use crate::attachment_manager::AttachmentManager;
use crate::crypto::Crypto;
use crate::logging::*;
use crate::storage_manager::*;
use crate::network_manager::NetworkManager;
use crate::routing_table::RoutingTable;
use crate::rpc_processor::RPCProcessor;
use crate::storage_manager::StorageManager;
use crate::veilid_api::*;
use crate::veilid_config::*;
use crate::*;
@ -70,6 +73,9 @@ impl VeilidCoreContext {
#[cfg(feature = "unstable-blockstore")]
registry.register(BlockStore::new);
registry.register(StorageManager::new);
registry.register(RoutingTable::new);
registry.register(NetworkManager::new);
registry.register(RPCProcessor::new);
registry.register(AttachmentManager::new);
// Run initialization
@ -83,7 +89,7 @@ impl VeilidCoreContext {
// current subsystem, which is not available until after init succeeds
if let Err(e) = registry.post_init().await {
registry.terminate().await;
return VeilidAPIError::internal(e);
return Err(VeilidAPIError::internal(e));
}
info!("Veilid API startup complete");
@ -129,7 +135,18 @@ impl VeilidCoreContext {
/////////////////////////////////////////////////////////////////////////////
pub trait RegisteredComponents: VeilidComponent {
pub trait RegisteredComponents {
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore>;
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto>;
fn table_store(&self) -> VeilidComponentGuard<'_, TableStore>;
fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager>;
fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable>;
fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager>;
fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor>;
fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager>;
}
impl<T: VeilidComponentRegistryAccessor> RegisteredComponents for T {
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> {
self.registry().lookup::<ProtectedStore>().unwrap()
}
@ -142,8 +159,19 @@ pub trait RegisteredComponents: VeilidComponent {
fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager> {
self.registry().lookup::<StorageManager>().unwrap()
}
fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable> {
self.registry().lookup::<RoutingTable>().unwrap()
}
fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager> {
self.registry().lookup::<NetworkManager>().unwrap()
}
fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor> {
self.registry().lookup::<RPCProcessor>().unwrap()
}
fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager> {
self.registry().lookup::<AttachmentManager>().unwrap()
}
}
impl<T: VeilidComponent> RegisteredComponents for T {}
/////////////////////////////////////////////////////////////////////////////

View File

@ -107,6 +107,11 @@ impl ProtectedStore {
Ok(())
}
async fn post_init_async(&self) -> EyreResult<()> {
Ok(())
}
async fn pre_terminate_async(&self) {}
#[instrument(level = "debug", skip(self))]
async fn terminate_async(&self) {
*self.inner.lock() = Self::new_inner();

View File

@ -28,12 +28,15 @@ impl ProtectedStore {
}
#[instrument(level = "debug", skip(self), err)]
pub async fn init(&self) -> EyreResult<()> {
pub(crate) async fn init_async(&self) -> EyreResult<()> {
Ok(())
}
pub(crate) async fn post_init_async(&self) -> EyreResult<()> {}
pub(crate) async fn pre_terminate_async(&self) {}
#[instrument(level = "debug", skip(self))]
pub async fn terminate(&self) {}
pub(crate) async fn terminate_async(&self) {}
fn browser_key_name(&self, key: &str) -> String {
let c = self.config();

View File

@ -23,6 +23,7 @@ pub const ADDRESS_CHECK_CACHE_SIZE: usize = 10;
// TimestampDuration::new(3_600_000_000_u64); // 60 minutes
/// Address checker config
#[derive(Debug)]
pub struct AddressCheckConfig {
pub detect_address_changes: bool,
pub ip6_prefix_size: usize,
@ -44,6 +45,22 @@ pub struct AddressCheck {
address_consistency_table: BTreeMap<AddressCheckCacheKey, LruCache<IpAddr, SocketAddress>>,
}
impl fmt::Debug for AddressCheck {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AddressCheck")
.field("config", &self.config)
//.field("net", &self.net)
.field("current_network_class", &self.current_network_class)
.field("current_addresses", &self.current_addresses)
.field(
"address_inconsistency_table",
&self.address_inconsistency_table,
)
.field("address_consistency_table", &self.address_consistency_table)
.finish()
}
}
impl AddressCheck {
pub fn new(config: AddressCheckConfig, net: Network) -> Self {
Self {

View File

@ -39,7 +39,6 @@ struct AddressFilterUnlockedInner {
max_connection_frequency_per_min: usize,
punishment_duration_min: usize,
dial_info_failure_duration_min: usize,
routing_table: RoutingTable,
}
impl fmt::Debug for AddressFilterUnlockedInner {
@ -69,14 +68,19 @@ impl fmt::Debug for AddressFilterUnlockedInner {
#[derive(Clone, Debug)]
pub(crate) struct AddressFilter {
registry: VeilidComponentRegistry,
unlocked_inner: Arc<AddressFilterUnlockedInner>,
inner: Arc<Mutex<AddressFilterInner>>,
}
impl_veilid_component_registry_accessor!(AddressFilter);
impl AddressFilter {
pub fn new(config: VeilidConfig, routing_table: RoutingTable) -> Self {
pub fn new(registry: VeilidComponentRegistry) -> Self {
let config = registry.config();
let c = config.get();
Self {
registry,
unlocked_inner: Arc::new(AddressFilterUnlockedInner {
max_connections_per_ip4: c.network.max_connections_per_ip4 as usize,
max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize,
@ -86,7 +90,6 @@ impl AddressFilter {
as usize,
punishment_duration_min: PUNISHMENT_DURATION_MIN,
dial_info_failure_duration_min: DIAL_INFO_FAILURE_DURATION_MIN,
routing_table,
}),
inner: Arc::new(Mutex::new(AddressFilterInner {
conn_count_by_ip4: BTreeMap::new(),
@ -192,7 +195,7 @@ impl AddressFilter {
warn!("Forgiving: {}", key);
inner.punishments_by_node_id.remove(&key);
// make the entry alive again if it's still here
if let Ok(Some(nr)) = self.unlocked_inner.routing_table.lookup_node_ref(key) {
if let Ok(Some(nr)) = self.routing_table().lookup_node_ref(key) {
nr.operate_mut(|_rti, e| e.set_punished(None));
}
}

View File

@ -124,24 +124,21 @@ pub enum StartupDisposition {
}
// The mutable state of the network manager
#[derive(Debug)]
struct NetworkManagerInner {
stats: NetworkManagerStats,
client_allowlist: LruCache<TypedKey, ClientAllowlistEntry>,
node_contact_method_cache: LruCache<NodeContactMethodCacheKey, NodeContactMethod>,
address_check: Option<AddressCheck>,
peer_info_change_subscription: Option<EventBusSubscription>,
socket_address_change_subscription: Option<EventBusSubscription>,
}
struct NetworkManagerUnlockedInner {
// Handles
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")]
block_store: BlockStore,
crypto: Crypto,
pub(crate) struct NetworkManager {
registry: VeilidComponentRegistry,
inner: Arc<Mutex<NetworkManagerInner>>,
// Accessors
routing_table: RwLock<Option<RoutingTable>>,
address_filter: RwLock<Option<AddressFilter>>,
components: RwLock<Option<NetworkComponents>>,
update_callback: RwLock<Option<UpdateCallback>>,
@ -154,10 +151,22 @@ struct NetworkManagerUnlockedInner {
startup_lock: StartupLock,
}
#[derive(Clone)]
pub(crate) struct NetworkManager {
inner: Arc<Mutex<NetworkManagerInner>>,
unlocked_inner: Arc<NetworkManagerUnlockedInner>,
impl_veilid_component!(NetworkManager);
impl fmt::Debug for NetworkManager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NetworkManager")
//.field("registry", &self.registry)
.field("inner", &self.inner)
.field("address_filter", &self.address_filter)
// .field("components", &self.components)
// .field("update_callback", &self.update_callback)
// .field("rolling_transfers_task", &self.rolling_transfers_task)
// .field("address_filter_task", &self.address_filter_task)
.field("network_key", &self.network_key)
.field("startup_lock", &self.startup_lock)
.finish()
}
}
impl NetworkManager {
@ -167,52 +176,17 @@ impl NetworkManager {
client_allowlist: LruCache::new_unbounded(),
node_contact_method_cache: LruCache::new(NODE_CONTACT_METHOD_CACHE_SIZE),
address_check: None,
}
}
fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto,
network_key: Option<SharedSecret>,
) -> NetworkManagerUnlockedInner {
NetworkManagerUnlockedInner {
event_bus,
config: config.clone(),
storage_manager,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
crypto,
address_filter: RwLock::new(None),
routing_table: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
address_filter_task: TickTask::new(
"address_filter_task",
ADDRESS_FILTER_TASK_INTERVAL_SECS,
),
network_key,
startup_lock: StartupLock::new(),
peer_info_change_subscription: None,
socket_address_change_subscription: None,
}
}
pub fn new(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto,
) -> Self {
pub fn new(registry: VeilidComponentRegistry) -> Self {
// Make the network key
let network_key = {
let config = registry.config();
let crypto = registry.crypto();
let c = config.get();
let network_key_password = c.network.network_key_password.clone();
let network_key = if let Some(network_key_password) = network_key_password {
@ -239,109 +213,49 @@ impl NetworkManager {
};
let this = Self {
registry,
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(
event_bus,
config,
storage_manager,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
crypto,
network_key,
)),
address_filter: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
address_filter_task: TickTask::new(
"address_filter_task",
ADDRESS_FILTER_TASK_INTERVAL_SECS,
),
network_key,
startup_lock: StartupLock::new(),
};
this.setup_tasks();
this
}
pub fn event_bus(&self) -> EventBus {
self.unlocked_inner.event_bus.clone()
}
pub fn config(&self) -> VeilidConfig {
self.unlocked_inner.config.clone()
}
pub fn with_config<F, R>(&self, f: F) -> R
where
F: FnOnce(&VeilidConfigInner) -> R,
{
f(&self.unlocked_inner.config.get())
}
pub fn storage_manager(&self) -> StorageManager {
self.unlocked_inner.storage_manager.clone()
}
pub fn table_store(&self) -> TableStore {
self.unlocked_inner.table_store.clone()
}
#[cfg(feature = "unstable-blockstore")]
pub fn block_store(&self) -> BlockStore {
self.unlocked_inner.block_store.clone()
}
pub fn crypto(&self) -> Crypto {
self.unlocked_inner.crypto.clone()
}
pub fn address_filter(&self) -> AddressFilter {
self.unlocked_inner
.address_filter
.read()
.as_ref()
.unwrap()
.clone()
}
pub fn routing_table(&self) -> RoutingTable {
self.unlocked_inner
.routing_table
.read()
.as_ref()
.unwrap()
.clone()
self.address_filter.read().as_ref().unwrap().clone()
}
fn net(&self) -> Network {
self.unlocked_inner
.components
.read()
.as_ref()
.unwrap()
.net
.clone()
self.components.read().as_ref().unwrap().net.clone()
}
fn opt_net(&self) -> Option<Network> {
self.unlocked_inner
.components
.read()
.as_ref()
.map(|x| x.net.clone())
self.components.read().as_ref().map(|x| x.net.clone())
}
fn receipt_manager(&self) -> ReceiptManager {
self.unlocked_inner
.components
self.components
.read()
.as_ref()
.unwrap()
.receipt_manager
.clone()
}
pub fn rpc_processor(&self) -> RPCProcessor {
self.unlocked_inner
.components
.read()
.as_ref()
.unwrap()
.rpc_processor
.clone()
}
pub fn opt_rpc_processor(&self) -> Option<RPCProcessor> {
self.unlocked_inner
.components
.read()
.as_ref()
.map(|x| x.rpc_processor.clone())
}
pub fn connection_manager(&self) -> ConnectionManager {
self.unlocked_inner
.components
self.components
.read()
.as_ref()
.unwrap()
@ -349,58 +263,26 @@ impl NetworkManager {
.clone()
}
pub fn opt_connection_manager(&self) -> Option<ConnectionManager> {
self.unlocked_inner
.components
self.components
.read()
.as_ref()
.map(|x| x.connection_manager.clone())
}
pub fn update_callback(&self) -> UpdateCallback {
self.unlocked_inner
.update_callback
.read()
.as_ref()
.unwrap()
.clone()
}
#[instrument(level = "debug", skip_all, err)]
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
let routing_table = RoutingTable::new(self.clone());
routing_table.init().await?;
let address_filter = AddressFilter::new(self.config(), routing_table.clone());
*self.unlocked_inner.routing_table.write() = Some(routing_table.clone());
*self.unlocked_inner.address_filter.write() = Some(address_filter);
*self.unlocked_inner.update_callback.write() = Some(update_callback);
// Register event handlers
let this = self.clone();
self.event_bus().subscribe(move |evt| {
let this = this.clone();
Box::pin(async move {
this.peer_info_change_event_handler(evt);
})
});
let this = self.clone();
self.event_bus().subscribe(move |evt| {
let this = this.clone();
Box::pin(async move {
this.socket_address_change_event_handler(evt);
})
});
async fn init_async(&self) -> EyreResult<()> {
let address_filter = AddressFilter::new(self.registry());
*self.address_filter.write() = Some(address_filter);
Ok(())
}
async fn post_init_async(&self) -> EyreResult<()> {}
async fn pre_terminate_async(&self) {}
#[instrument(level = "debug", skip_all)]
pub async fn terminate(&self) {
let routing_table = self.unlocked_inner.routing_table.write().take();
if let Some(routing_table) = routing_table {
routing_table.terminate().await;
}
*self.unlocked_inner.update_callback.write() = None;
}
async fn terminate_async(&self) {}
#[instrument(level = "debug", skip_all, err)]
pub async fn internal_startup(&self) -> EyreResult<StartupDisposition> {
@ -456,7 +338,20 @@ impl NetworkManager {
ip6_prefix_size,
};
let address_check = AddressCheck::new(address_check_config, net.clone());
self.inner.lock().address_check = Some(address_check);
// Register event handlers
let peer_info_change_subscription =
impl_subscribe_event_bus!(self, Self, peer_info_change_event_handler);
let socket_address_change_subscription =
impl_subscribe_event_bus!(self, Self, socket_address_change_event_handler);
{
let mut inner = self.inner.lock();
inner.address_check = Some(address_check);
inner.peer_info_change_subscription = Some(peer_info_change_subscription);
inner.socket_address_change_subscription = Some(socket_address_change_subscription);
}
rpc_processor.startup().await?;
receipt_manager.startup().await?;
@ -495,8 +390,17 @@ impl NetworkManager {
// Cancel all tasks
self.cancel_tasks().await;
// Shutdown address check
self.inner.lock().address_check = Option::<AddressCheck>::None;
// Shutdown event bus subscriptions and address check
{
let mut inner = self.inner.lock();
if let Some(sub) = inner.socket_address_change_subscription.take() {
self.event_bus().unsubscribe(sub);
}
if let Some(sub) = inner.peer_info_change_subscription.take() {
self.event_bus().unsubscribe(sub);
}
inner.address_check = None;
}
// Shutdown network components if they started up
log_net!(debug "shutting down network components");

View File

@ -44,8 +44,6 @@ struct DiscoveryContextInner {
}
struct DiscoveryContextUnlockedInner {
routing_table: RoutingTable,
net: Network,
config: DiscoveryContextConfig,
// per-protocol
@ -54,25 +52,26 @@ struct DiscoveryContextUnlockedInner {
#[derive(Clone)]
pub(super) struct DiscoveryContext {
registry: VeilidComponentRegistry,
unlocked_inner: Arc<DiscoveryContextUnlockedInner>,
inner: Arc<Mutex<DiscoveryContextInner>>,
}
impl VeilidComponentRegistryAccessor for DiscoveryContext {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
impl DiscoveryContext {
pub fn new(routing_table: RoutingTable, net: Network, config: DiscoveryContextConfig) -> Self {
let intf_addrs = Self::get_local_addresses(
routing_table.clone(),
config.protocol_type,
config.address_type,
);
pub fn new(registry: VeilidComponentRegistry, config: DiscoveryContextConfig) -> Self {
let routing_table = registry.routing_table();
let intf_addrs =
Self::get_local_addresses(&routing_table, config.protocol_type, config.address_type);
Self {
unlocked_inner: Arc::new(DiscoveryContextUnlockedInner {
routing_table,
net,
config,
intf_addrs,
}),
registry,
unlocked_inner: Arc::new(DiscoveryContextUnlockedInner { config, intf_addrs }),
inner: Arc::new(Mutex::new(DiscoveryContextInner {
external_info: Vec::new(),
})),
@ -85,7 +84,7 @@ impl DiscoveryContext {
// This pulls the already-detected local interface dial info from the routing table
#[instrument(level = "trace", skip(routing_table), ret)]
fn get_local_addresses(
routing_table: RoutingTable,
routing_table: &RoutingTable,
protocol_type: ProtocolType,
address_type: AddressType,
) -> Vec<SocketAddress> {
@ -109,7 +108,7 @@ impl DiscoveryContext {
// This is done over the normal port using RPC
#[instrument(level = "trace", skip(self), ret)]
async fn request_public_address(&self, node_ref: FilteredNodeRef) -> Option<SocketAddress> {
let rpc = self.unlocked_inner.routing_table.rpc_processor();
let rpc = self.rpc_processor();
let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await {
Ok(v) => v,
@ -138,7 +137,7 @@ impl DiscoveryContext {
#[instrument(level = "trace", skip(self), ret)]
async fn discover_external_addresses(&self) -> bool {
let node_count = {
let config = self.unlocked_inner.routing_table.network_manager().config();
let config = self.registry.config();
let c = config.get();
c.network.dht.max_find_node_count as usize
};
@ -188,10 +187,11 @@ impl DiscoveryContext {
]);
// Find public nodes matching this filter
let nodes = self
.unlocked_inner
.routing_table
.find_fast_non_local_nodes_filtered(routing_domain, node_count, filters);
let nodes = self.routing_table().find_fast_non_local_nodes_filtered(
routing_domain,
node_count,
filters,
);
if nodes.is_empty() {
log_net!(debug
"no external address detection peers of type {:?}:{:?}",

View File

@ -3,7 +3,7 @@ use super::*;
impl Network {
#[instrument(level = "trace", target = "net", skip_all, err)]
pub(super) async fn network_interfaces_task_routine(
self,
&self,
_stop_token: StopToken,
_l: Timestamp,
_t: Timestamp,

View File

@ -8,7 +8,7 @@ type InboundProtocolMap = HashMap<(AddressType, LowLevelProtocolType, u16), Vec<
impl Network {
#[instrument(parent = None, level = "trace", skip(self), err)]
pub async fn update_network_class_task_routine(
self,
&self,
stop_token: StopToken,
l: Timestamp,
t: Timestamp,
@ -156,7 +156,7 @@ impl Network {
port,
};
context_configs.insert(dcc);
let discovery_context = DiscoveryContext::new(self.routing_table(), self.clone(), dcc);
let discovery_context = DiscoveryContext::new(self.registry(), dcc);
discovery_context.discover(&mut unord).await;
}

View File

@ -1,7 +1,7 @@
use super::*;
// Statistics per address
#[derive(Clone, Default)]
#[derive(Clone, Debug, Default)]
pub struct PerAddressStats {
pub last_seen_ts: Timestamp,
pub transfer_stats_accounting: TransferStatsAccounting,
@ -18,7 +18,7 @@ impl Default for PerAddressStatsKey {
}
// Statistics about the low-level network
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct NetworkManagerStats {
pub self_stats: PerAddressStats,
pub per_address_stats: LruCache<PerAddressStatsKey, PerAddressStats>,

View File

@ -640,7 +640,7 @@ impl BucketEntryInner {
only_live: bool,
filter: NodeRefFilter,
) -> Vec<(Flow, Timestamp)> {
let opt_connection_manager = rti.unlocked_inner.network_manager.opt_connection_manager();
let opt_connection_manager = rti.network_manager().opt_connection_manager();
let mut out: Vec<(Flow, Timestamp)> = self
.last_flows

View File

@ -35,7 +35,6 @@ impl RoutingTable {
let valid_envelope_versions = VALID_ENVELOPE_VERSIONS.map(|x| x.to_string()).join(",");
let node_ids = self
.unlocked_inner
.node_ids()
.iter()
.map(|x| x.to_string())
@ -57,7 +56,7 @@ impl RoutingTable {
pub fn debug_info_nodeid(&self) -> String {
let mut out = String::new();
for nid in self.unlocked_inner.node_ids().iter() {
for nid in self.node_ids().iter() {
out += &format!("{}\n", nid);
}
out
@ -66,7 +65,7 @@ impl RoutingTable {
pub fn debug_info_nodeinfo(&self) -> String {
let mut out = String::new();
let inner = self.inner.read();
out += &format!("Node Ids: {}\n", self.unlocked_inner.node_ids());
out += &format!("Node Ids: {}\n", self.node_ids());
out += &format!(
"Self Transfer Stats:\n{}",
indent_all_string(&inner.self_transfer_stats)
@ -250,7 +249,7 @@ impl RoutingTable {
out += &format!("{:?}: {}: {}\n", routing_domain, crypto_kind, count);
}
for ck in &VALID_CRYPTO_KINDS {
let our_node_id = self.unlocked_inner.node_id(*ck);
let our_node_id = self.node_id(*ck);
let mut filtered_total = 0;
let mut b = 0;
@ -319,7 +318,7 @@ impl RoutingTable {
) -> String {
let cur_ts = Timestamp::now();
let relay_node_filter = self.make_public_internet_relay_node_filter();
let our_node_ids = self.unlocked_inner.node_ids();
let our_node_ids = self.node_ids();
let mut relay_count = 0usize;
let mut relaying_count = 0usize;
@ -340,7 +339,7 @@ impl RoutingTable {
node_count,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
NodeRef::new(self.clone(), entry.unwrap().clone())
NodeRef::new(self.registry(), entry.unwrap().clone())
},
);
let mut out = String::new();

View File

@ -42,10 +42,9 @@ impl RoutingTable {
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let node_count = {
let c = self.config.get();
c.network.dht.max_find_node_count as usize
};
let node_count = self
.config()
.with(|c| c.network.dht.max_find_node_count as usize);
let closest_nodes = match self.find_preferred_closest_nodes(
node_count,
@ -82,11 +81,13 @@ impl RoutingTable {
// find N nodes closest to the target node in our routing table
// ensure the nodes returned are only the ones closer to the target node than ourself
let Some(vcrypto) = self.crypto().get(crypto_kind) else {
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(crypto_kind) else {
return NetworkResult::invalid_message("unsupported cryptosystem");
};
let vcrypto = &*vcrypto;
let own_distance = vcrypto.distance(&own_node_id.value, &key.value);
let vcrypto2 = vcrypto.clone();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
@ -121,10 +122,9 @@ impl RoutingTable {
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let node_count = {
let c = self.config.get();
c.network.dht.max_find_node_count as usize
};
let node_count = self
.config()
.with(|c| c.network.dht.max_find_node_count as usize);
//
let closest_nodes = match self.find_preferred_closest_nodes(
@ -147,7 +147,7 @@ impl RoutingTable {
// Validate peers returned are, in fact, closer to the key than the node we sent this to
// This same test is used on the other side so we vet things here
let valid = match Self::verify_peers_closer(vcrypto2, own_node_id, key, &closest_nodes) {
let valid = match Self::verify_peers_closer(vcrypto, own_node_id, key, &closest_nodes) {
Ok(v) => v,
Err(e) => {
panic!("missing cryptosystem in peers node ids: {}", e);
@ -165,8 +165,8 @@ impl RoutingTable {
/// Determine if set of peers is closer to key_near than key_far is to key_near
#[instrument(level = "trace", target = "rtab", skip_all, err)]
pub fn verify_peers_closer(
vcrypto: CryptoSystemVersion,
pub fn verify_peers_closer<'a>(
vcrypto: &'a (dyn CryptoSystem + Send + Sync),
key_far: TypedKey,
key_near: TypedKey,
peers: &[Arc<PeerInfo>],

View File

@ -91,11 +91,9 @@ pub struct RecentPeersEntry {
pub last_connection: Flow,
}
pub(crate) struct RoutingTableUnlockedInner {
// Accessors
event_bus: EventBus,
config: VeilidConfig,
network_manager: NetworkManager,
pub(crate) struct RoutingTable {
registry: VeilidComponentRegistry,
inner: Arc<RwLock<RoutingTableInner>>,
/// The current node's public DHT keys
node_id: TypedKeyGroup,
@ -131,26 +129,155 @@ pub(crate) struct RoutingTableUnlockedInner {
private_route_management_task: TickTask<EyreReport>,
}
impl RoutingTableUnlockedInner {
pub fn network_manager(&self) -> NetworkManager {
self.network_manager.clone()
impl fmt::Debug for RoutingTable {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RoutingTable")
// .field("inner", &self.inner)
// .field("unlocked_inner", &self.unlocked_inner)
.finish()
}
pub fn crypto(&self) -> Crypto {
self.network_manager().crypto()
}
impl_veilid_component!(RoutingTable);
impl RoutingTable {
pub fn new(registry: VeilidComponentRegistry) -> Self {
let config = registry.config();
let c = config.get();
let inner = Arc::new(RwLock::new(RoutingTableInner::new(registry.clone())));
let this = Self {
registry,
inner,
node_id: c.network.routing_table.node_id.clone(),
node_id_secret: c.network.routing_table.node_id_secret.clone(),
kick_queue: Mutex::new(BTreeSet::default()),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
update_state_stats_task: TickTask::new(
"update_state_stats_task",
UPDATE_STATE_STATS_INTERVAL_SECS,
),
rolling_answers_task: TickTask::new(
"rolling_answers_task",
ROLLING_ANSWER_INTERVAL_SECS,
),
kick_buckets_task: TickTask::new("kick_buckets_task", 1),
bootstrap_task: TickTask::new("bootstrap_task", 1),
peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1),
closest_peers_refresh_task: TickTask::new_ms(
"closest_peers_refresh_task",
c.network.dht.min_peer_refresh_time_ms,
),
ping_validator_public_internet_task: TickTask::new(
"ping_validator_public_internet_task",
1,
),
ping_validator_local_network_task: TickTask::new(
"ping_validator_local_network_task",
1,
),
ping_validator_public_internet_relay_task: TickTask::new(
"ping_validator_public_internet_relay_task",
1,
),
ping_validator_active_watch_task: TickTask::new("ping_validator_active_watch_task", 1),
relay_management_task: TickTask::new(
"relay_management_task",
RELAY_MANAGEMENT_INTERVAL_SECS,
),
private_route_management_task: TickTask::new(
"private_route_management_task",
PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS,
),
};
this.setup_tasks();
this
}
pub fn rpc_processor(&self) -> RPCProcessor {
self.network_manager().rpc_processor()
/////////////////////////////////////
/// Initialization
/// Called to initialize the routing table after it is created
async fn init_async(&self) -> EyreResult<()> {
log_rtab!(debug "starting routing table init");
// Set up routing buckets
{
let mut inner = self.inner.write();
inner.init_buckets();
}
// Load bucket entries from table db if possible
log_rtab!(debug "loading routing table entries");
if let Err(e) = self.load_buckets().await {
log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e);
let mut inner = self.inner.write();
inner.init_buckets();
}
// Set up routespecstore
log_rtab!(debug "starting route spec store init");
let route_spec_store = match RouteSpecStore::load(self.registry()).await {
Ok(v) => v,
Err(e) => {
log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e);
RouteSpecStore::new(self.registry())
}
};
log_rtab!(debug "finished route spec store init");
{
let mut inner = self.inner.write();
inner.route_spec_store = Some(route_spec_store);
}
log_rtab!(debug "finished routing table init");
Ok(())
}
pub fn update_callback(&self) -> UpdateCallback {
self.network_manager().update_callback()
async fn post_init_async(&self) -> EyreResult<()> {
Ok(())
}
pub fn with_config<F, R>(&self, f: F) -> R
where
F: FnOnce(&VeilidConfigInner) -> R,
{
f(&self.config.get())
async fn pre_terminate_async(&self) {
// Stop tasks
self.cancel_tasks().await;
}
/// Called to shut down the routing table
async fn terminate_async(&self) {
log_rtab!(debug "starting routing table terminate");
// Load bucket entries from table db if possible
log_rtab!(debug "saving routing table entries");
if let Err(e) = self.save_buckets().await {
error!("failed to save routing table entries: {}", e);
}
log_rtab!(debug "saving route spec store");
let rss = {
let mut inner = self.inner.write();
inner.route_spec_store.take()
};
if let Some(rss) = rss {
if let Err(e) = rss.save().await {
error!("couldn't save route spec store: {}", e);
}
}
log_rtab!(debug "shutting down routing table");
let mut inner = self.inner.write();
*inner = RoutingTableInner::new(self.registry());
log_rtab!(debug "finished routing table terminate");
}
///////////////////////////////////////////////////////////////////
pub fn node_id(&self, kind: CryptoKind) -> TypedKey {
self.node_id.get(kind).unwrap()
}
@ -206,169 +333,6 @@ impl RoutingTableUnlockedInner {
.unwrap(),
)
}
}
#[derive(Clone)]
pub(crate) struct RoutingTable {
inner: Arc<RwLock<RoutingTableInner>>,
unlocked_inner: Arc<RoutingTableUnlockedInner>,
}
impl RoutingTable {
fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig,
network_manager: NetworkManager,
) -> RoutingTableUnlockedInner {
let c = config.get();
RoutingTableUnlockedInner {
event_bus,
config: config.clone(),
network_manager,
node_id: c.network.routing_table.node_id.clone(),
node_id_secret: c.network.routing_table.node_id_secret.clone(),
kick_queue: Mutex::new(BTreeSet::default()),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
ROLLING_TRANSFERS_INTERVAL_SECS,
),
update_state_stats_task: TickTask::new(
"update_state_stats_task",
UPDATE_STATE_STATS_INTERVAL_SECS,
),
rolling_answers_task: TickTask::new(
"rolling_answers_task",
ROLLING_ANSWER_INTERVAL_SECS,
),
kick_buckets_task: TickTask::new("kick_buckets_task", 1),
bootstrap_task: TickTask::new("bootstrap_task", 1),
peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1),
closest_peers_refresh_task: TickTask::new_ms(
"closest_peers_refresh_task",
c.network.dht.min_peer_refresh_time_ms,
),
ping_validator_public_internet_task: TickTask::new(
"ping_validator_public_internet_task",
1,
),
ping_validator_local_network_task: TickTask::new(
"ping_validator_local_network_task",
1,
),
ping_validator_public_internet_relay_task: TickTask::new(
"ping_validator_public_internet_relay_task",
1,
),
ping_validator_active_watch_task: TickTask::new("ping_validator_active_watch_task", 1),
relay_management_task: TickTask::new(
"relay_management_task",
RELAY_MANAGEMENT_INTERVAL_SECS,
),
private_route_management_task: TickTask::new(
"private_route_management_task",
PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS,
),
}
}
pub fn new(network_manager: NetworkManager) -> Self {
let event_bus = network_manager.event_bus();
let config = network_manager.config();
let unlocked_inner = Arc::new(Self::new_unlocked_inner(event_bus, config, network_manager));
let inner = Arc::new(RwLock::new(RoutingTableInner::new(unlocked_inner.clone())));
let this = Self {
inner,
unlocked_inner,
};
this.setup_tasks();
this
}
/////////////////////////////////////
/// Initialization
/// Called to initialize the routing table after it is created
pub async fn init(&self) -> EyreResult<()> {
log_rtab!(debug "starting routing table init");
// Set up routing buckets
{
let mut inner = self.inner.write();
inner.init_buckets();
}
// Load bucket entries from table db if possible
log_rtab!(debug "loading routing table entries");
if let Err(e) = self.load_buckets().await {
log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e);
let mut inner = self.inner.write();
inner.init_buckets();
}
// Set up routespecstore
log_rtab!(debug "starting route spec store init");
let route_spec_store = match RouteSpecStore::load(self.clone()).await {
Ok(v) => v,
Err(e) => {
log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e);
RouteSpecStore::new(self.clone())
}
};
log_rtab!(debug "finished route spec store init");
{
let mut inner = self.inner.write();
inner.route_spec_store = Some(route_spec_store);
}
// Inform storage manager we are up
self.network_manager
.storage_manager()
.set_routing_table(Some(self.clone()))
.await;
log_rtab!(debug "finished routing table init");
Ok(())
}
/// Called to shut down the routing table
pub async fn terminate(&self) {
log_rtab!(debug "starting routing table terminate");
// Stop storage manager from using us
self.network_manager
.storage_manager()
.set_routing_table(None)
.await;
// Stop tasks
self.cancel_tasks().await;
// Load bucket entries from table db if possible
log_rtab!(debug "saving routing table entries");
if let Err(e) = self.save_buckets().await {
error!("failed to save routing table entries: {}", e);
}
log_rtab!(debug "saving route spec store");
let rss = {
let mut inner = self.inner.write();
inner.route_spec_store.take()
};
if let Some(rss) = rss {
if let Err(e) = rss.save().await {
error!("couldn't save route spec store: {}", e);
}
}
log_rtab!(debug "shutting down routing table");
let mut inner = self.inner.write();
*inner = RoutingTableInner::new(self.unlocked_inner.clone());
log_rtab!(debug "finished routing table terminate");
}
/// Serialize the routing table.
fn serialized_buckets(&self) -> (SerializedBucketMap, SerializedBuckets) {
@ -406,7 +370,7 @@ impl RoutingTable {
async fn save_buckets(&self) -> EyreResult<()> {
let (serialized_bucket_map, all_entry_bytes) = self.serialized_buckets();
let table_store = self.unlocked_inner.network_manager().table_store();
let table_store = self.table_store();
let tdb = table_store.open(ROUTING_TABLE, 1).await?;
let dbx = tdb.transact();
if let Err(e) = dbx.store_json(0, SERIALIZED_BUCKET_MAP, &serialized_bucket_map) {
@ -420,12 +384,14 @@ impl RoutingTable {
dbx.commit().await?;
Ok(())
}
/// Deserialize routing table from table store
async fn load_buckets(&self) -> EyreResult<()> {
// Make a cache validity key of all our node ids and our bootstrap choice
let mut cache_validity_key: Vec<u8> = Vec::new();
{
let c = self.unlocked_inner.config.get();
let config = self.config();
let c = config.get();
for ck in VALID_CRYPTO_KINDS {
if let Some(nid) = c.network.routing_table.node_id.get(ck) {
cache_validity_key.append(&mut nid.value.bytes.to_vec());
@ -446,7 +412,7 @@ impl RoutingTable {
};
// Deserialize bucket map and all entries from the table store
let table_store = self.unlocked_inner.network_manager().table_store();
let table_store = self.table_store();
let db = table_store.open(ROUTING_TABLE, 1).await?;
let caches_valid = match db.load(0, CACHE_VALIDITY_KEY).await? {
@ -479,14 +445,13 @@ impl RoutingTable {
// Reconstruct all entries
let inner = &mut *self.inner.write();
self.populate_routing_table(inner, serialized_bucket_map, all_entry_bytes)?;
Self::populate_routing_table_inner(inner, serialized_bucket_map, all_entry_bytes)?;
Ok(())
}
/// Write the deserialized table store data to the routing table.
pub fn populate_routing_table(
&self,
pub fn populate_routing_table_inner(
inner: &mut RoutingTableInner,
serialized_bucket_map: SerializedBucketMap,
all_entry_bytes: SerializedBuckets,
@ -542,16 +507,14 @@ impl RoutingTable {
self.inner.read().routing_domain_for_address(address)
}
pub fn route_spec_store(&self) -> RwLockReadGuard<'_, RouteSpecStore> {
self.inner.read().route_spec_store.as_ref().unwrap().clone()
}
pub fn route_spec_store_mut(&self) -> RwLockReadGuard<'_, RouteSpecStore> {
self.inner
.write()
.route_spec_store
.as_ref()
.unwrap()
.clone()
pub fn route_spec_store(&self) -> Option<MappedRwLockReadGuard<'_, RouteSpecStore>> {
let inner = self.inner.read();
if inner.route_spec_store.is_none() {
return None;
}
Some(RwLockReadGuard::map(inner, |x| {
x.route_spec_store.as_ref().unwrap()
}))
}
pub fn relay_node(&self, domain: RoutingDomain) -> Option<FilteredNodeRef> {
@ -664,7 +627,7 @@ impl RoutingTable {
) -> Vec<FilteredNodeRef> {
self.inner
.read()
.get_nodes_needing_ping(self.clone(), routing_domain, cur_ts)
.get_nodes_needing_ping(self.registry(), routing_domain, cur_ts)
}
fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) {
@ -675,21 +638,19 @@ impl RoutingTable {
}
// Put it in the kick queue
let x = self.unlocked_inner.calculate_bucket_index(node_id);
self.unlocked_inner.kick_queue.lock().insert(x);
let x = self.calculate_bucket_index(node_id);
self.kick_queue.lock().insert(x);
}
}
/// Resolve an existing routing table entry using any crypto kind and return a reference to it
pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult<Option<NodeRef>> {
self.inner
.read()
.lookup_any_node_ref(self.clone(), node_id_key)
self.inner.read().lookup_any_node_ref(node_id_key)
}
/// Resolve an existing routing table entry and return a reference to it
pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult<Option<NodeRef>> {
self.inner.read().lookup_node_ref(self.clone(), node_id)
self.inner.read().lookup_node_ref(node_id)
}
/// Resolve an existing routing table entry and return a filtered reference to it
@ -700,12 +661,9 @@ impl RoutingTable {
routing_domain_set: RoutingDomainSet,
dial_info_filter: DialInfoFilter,
) -> EyreResult<Option<FilteredNodeRef>> {
self.inner.read().lookup_and_filter_noderef(
self.clone(),
node_id,
routing_domain_set,
dial_info_filter,
)
self.inner
.read()
.lookup_and_filter_noderef(node_id, routing_domain_set, dial_info_filter)
}
/// Shortcut function to add a node to our routing table if it doesn't exist
@ -719,7 +677,7 @@ impl RoutingTable {
) -> EyreResult<FilteredNodeRef> {
self.inner
.write()
.register_node_with_peer_info(self.clone(), peer_info, allow_invalid)
.register_node_with_peer_info(peer_info, allow_invalid)
}
/// Shortcut function to add a node to our routing table if it doesn't exist
@ -734,7 +692,7 @@ impl RoutingTable {
) -> EyreResult<FilteredNodeRef> {
self.inner
.write()
.register_node_with_id(self.clone(), routing_domain, node_id, timestamp)
.register_node_with_id(routing_domain, node_id, timestamp)
}
//////////////////////////////////////////////////////////////////////
@ -893,7 +851,7 @@ impl RoutingTable {
filters: VecDeque<RoutingTableEntryFilter>,
) -> Vec<NodeRef> {
self.inner.read().find_fast_non_local_nodes_filtered(
self.clone(),
self.registry(),
routing_domain,
node_count,
filters,
@ -979,7 +937,7 @@ impl RoutingTable {
protocol_types_len * 2 * max_per_type,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
NodeRef::new(self.clone(), entry.unwrap().clone())
NodeRef::new(self.registry(), entry.unwrap().clone())
},
)
}
@ -1081,7 +1039,6 @@ impl RoutingTable {
let res = network_result_try!(
rpc_processor
.clone()
.rpc_call_find_node(
Destination::direct(node_ref.default_filtered()),
node_id,
@ -1170,11 +1127,3 @@ impl RoutingTable {
}
}
}
impl core::ops::Deref for RoutingTable {
type Target = RoutingTableUnlockedInner;
fn deref(&self) -> &Self::Target {
&self.unlocked_inner
}
}

View File

@ -1,7 +1,7 @@
use super::*;
pub(crate) struct FilteredNodeRef {
routing_table: RoutingTable,
registry: VeilidComponentRegistry,
entry: Arc<BucketEntry>,
filter: NodeRefFilter,
sequencing: Sequencing,
@ -9,9 +9,15 @@ pub(crate) struct FilteredNodeRef {
track_id: usize,
}
impl VeilidComponentRegistryAccessor for FilteredNodeRef {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
impl FilteredNodeRef {
pub fn new(
routing_table: RoutingTable,
registry: VeilidComponentRegistry,
entry: Arc<BucketEntry>,
filter: NodeRefFilter,
sequencing: Sequencing,
@ -19,7 +25,7 @@ impl FilteredNodeRef {
entry.ref_count.fetch_add(1u32, Ordering::AcqRel);
Self {
routing_table,
registry,
entry,
filter,
sequencing,
@ -29,7 +35,7 @@ impl FilteredNodeRef {
}
pub fn unfiltered(&self) -> NodeRef {
NodeRef::new(self.routing_table.clone(), self.entry.clone())
NodeRef::new(self.registry(), self.entry.clone())
}
pub fn filtered_clone(&self, filter: NodeRefFilter) -> FilteredNodeRef {
@ -40,7 +46,7 @@ impl FilteredNodeRef {
pub fn sequencing_clone(&self, sequencing: Sequencing) -> FilteredNodeRef {
FilteredNodeRef::new(
self.routing_table.clone(),
self.registry.clone(),
self.entry.clone(),
self.filter(),
sequencing,
@ -70,9 +76,6 @@ impl FilteredNodeRef {
}
impl NodeRefAccessorsTrait for FilteredNodeRef {
fn routing_table(&self) -> RoutingTable {
self.routing_table.clone()
}
fn entry(&self) -> Arc<BucketEntry> {
self.entry.clone()
}
@ -105,7 +108,8 @@ impl NodeRefOperateTrait for FilteredNodeRef {
where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
{
let inner = &*self.routing_table.inner.read();
let routing_table = self.registry.routing_table();
let inner = &*routing_table.inner.read();
self.entry.with(inner, f)
}
@ -113,7 +117,8 @@ impl NodeRefOperateTrait for FilteredNodeRef {
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
{
let inner = &mut *self.routing_table.inner.write();
let routing_table = self.registry.routing_table();
let inner = &mut *routing_table.inner.write();
self.entry.with_mut(inner, f)
}
}
@ -125,7 +130,7 @@ impl Clone for FilteredNodeRef {
self.entry.ref_count.fetch_add(1u32, Ordering::AcqRel);
Self {
routing_table: self.routing_table.clone(),
registry: self.registry.clone(),
entry: self.entry.clone(),
filter: self.filter,
sequencing: self.sequencing,
@ -162,7 +167,7 @@ impl Drop for FilteredNodeRef {
// get node ids with inner unlocked because nothing could be referencing this entry now
// and we don't know when it will get dropped, possibly inside a lock
let node_ids = self.entry.with_inner(|e| e.node_ids());
self.routing_table.queue_bucket_kicks(node_ids);
self.routing_table().queue_bucket_kicks(node_ids);
}
}
}

View File

@ -16,18 +16,24 @@ pub(crate) use traits::*;
// Default NodeRef
pub(crate) struct NodeRef {
routing_table: RoutingTable,
registry: VeilidComponentRegistry,
entry: Arc<BucketEntry>,
#[cfg(feature = "tracking")]
track_id: usize,
}
impl VeilidComponentRegistryAccessor for NodeRef {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
impl NodeRef {
pub fn new(routing_table: RoutingTable, entry: Arc<BucketEntry>) -> Self {
pub fn new(registry: VeilidComponentRegistry, entry: Arc<BucketEntry>) -> Self {
entry.ref_count.fetch_add(1u32, Ordering::AcqRel);
Self {
routing_table,
registry,
entry,
#[cfg(feature = "tracking")]
track_id: entry.track(),
@ -36,7 +42,7 @@ impl NodeRef {
pub fn default_filtered(&self) -> FilteredNodeRef {
FilteredNodeRef::new(
self.routing_table.clone(),
self.registry.clone(),
self.entry.clone(),
NodeRefFilter::new(),
Sequencing::default(),
@ -45,7 +51,7 @@ impl NodeRef {
pub fn sequencing_filtered(&self, sequencing: Sequencing) -> FilteredNodeRef {
FilteredNodeRef::new(
self.routing_table.clone(),
self.registry.clone(),
self.entry.clone(),
NodeRefFilter::new(),
sequencing,
@ -57,7 +63,7 @@ impl NodeRef {
routing_domain_set: R,
) -> FilteredNodeRef {
FilteredNodeRef::new(
self.routing_table.clone(),
self.registry.clone(),
self.entry.clone(),
NodeRefFilter::new().with_routing_domain_set(routing_domain_set.into()),
Sequencing::default(),
@ -66,7 +72,7 @@ impl NodeRef {
pub fn custom_filtered(&self, filter: NodeRefFilter) -> FilteredNodeRef {
FilteredNodeRef::new(
self.routing_table.clone(),
self.registry.clone(),
self.entry.clone(),
filter,
Sequencing::default(),
@ -76,7 +82,7 @@ impl NodeRef {
#[expect(dead_code)]
pub fn dial_info_filtered(&self, filter: DialInfoFilter) -> FilteredNodeRef {
FilteredNodeRef::new(
self.routing_table.clone(),
self.registry.clone(),
self.entry.clone(),
NodeRefFilter::new().with_dial_info_filter(filter),
Sequencing::default(),
@ -92,9 +98,6 @@ impl NodeRef {
}
impl NodeRefAccessorsTrait for NodeRef {
fn routing_table(&self) -> RoutingTable {
self.routing_table.clone()
}
fn entry(&self) -> Arc<BucketEntry> {
self.entry.clone()
}
@ -125,7 +128,8 @@ impl NodeRefOperateTrait for NodeRef {
where
F: FnOnce(&RoutingTableInner, &BucketEntryInner) -> T,
{
let inner = &*self.routing_table.inner.read();
let routing_table = self.routing_table();
let inner = &*routing_table.inner.read();
self.entry.with(inner, f)
}
@ -133,7 +137,8 @@ impl NodeRefOperateTrait for NodeRef {
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner) -> T,
{
let inner = &mut *self.routing_table.inner.write();
let routing_table = self.routing_table();
let inner = &mut *routing_table.inner.write();
self.entry.with_mut(inner, f)
}
}
@ -145,7 +150,7 @@ impl Clone for NodeRef {
self.entry.ref_count.fetch_add(1u32, Ordering::AcqRel);
Self {
routing_table: self.routing_table.clone(),
registry: self.registry.clone(),
entry: self.entry.clone(),
#[cfg(feature = "tracking")]
track_id: self.entry.write().track(),
@ -178,7 +183,7 @@ impl Drop for NodeRef {
// get node ids with inner unlocked because nothing could be referencing this entry now
// and we don't know when it will get dropped, possibly inside a lock
let node_ids = self.entry.with_inner(|e| e.node_ids());
self.routing_table.queue_bucket_kicks(node_ids);
self.routing_table().queue_bucket_kicks(node_ids);
}
}
}

View File

@ -15,6 +15,21 @@ pub(crate) struct NodeRefLock<
nr: N,
}
impl<
'a,
N: NodeRefAccessorsTrait
+ NodeRefOperateTrait
+ VeilidComponentRegistryAccessor
+ fmt::Debug
+ fmt::Display
+ Clone,
> VeilidComponentRegistryAccessor for NodeRefLock<'a, N>
{
fn registry(&self) -> VeilidComponentRegistry {
self.nr.registry()
}
}
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
NodeRefLock<'a, N>
{
@ -33,9 +48,6 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
NodeRefAccessorsTrait for NodeRefLock<'a, N>
{
fn routing_table(&self) -> RoutingTable {
self.nr.routing_table()
}
fn entry(&self) -> Arc<BucketEntry> {
self.nr.entry()
}

View File

@ -15,6 +15,21 @@ pub(crate) struct NodeRefLockMut<
nr: N,
}
impl<
'a,
N: NodeRefAccessorsTrait
+ NodeRefOperateTrait
+ VeilidComponentRegistryAccessor
+ fmt::Debug
+ fmt::Display
+ Clone,
> VeilidComponentRegistryAccessor for NodeRefLockMut<'a, N>
{
fn registry(&self) -> VeilidComponentRegistry {
self.nr.registry()
}
}
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
NodeRefLockMut<'a, N>
{
@ -34,9 +49,6 @@ impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Disp
impl<'a, N: NodeRefAccessorsTrait + NodeRefOperateTrait + fmt::Debug + fmt::Display + Clone>
NodeRefAccessorsTrait for NodeRefLockMut<'a, N>
{
fn routing_table(&self) -> RoutingTable {
self.nr.routing_table()
}
fn entry(&self) -> Arc<BucketEntry> {
self.nr.entry()
}

View File

@ -2,7 +2,6 @@ use super::*;
// Field accessors
pub(crate) trait NodeRefAccessorsTrait {
fn routing_table(&self) -> RoutingTable;
fn entry(&self) -> Arc<BucketEntry>;
fn sequencing(&self) -> Sequencing;
fn routing_domain_set(&self) -> RoutingDomainSet;
@ -125,12 +124,12 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
};
// If relay is ourselves, then return None, because we can't relay through ourselves
// and to contact this node we should have had an existing inbound connection
if rti.unlocked_inner.matches_own_node_id(rpi.node_ids()) {
if rti.routing_table().matches_own_node_id(rpi.node_ids()) {
bail!("Can't relay though ourselves");
}
// Register relay node and return noderef
let nr = rti.register_node_with_peer_info(self.routing_table(), rpi, false)?;
let nr = rti.register_node_with_peer_info(rpi, false)?;
Ok(Some(nr))
})
}

View File

@ -31,7 +31,7 @@ pub(crate) enum RouteNode {
}
impl RouteNode {
pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> {
pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> {
match self {
RouteNode::NodeId(_) => Ok(()),
RouteNode::PeerInfo(pi) => pi.validate(crypto),
@ -91,7 +91,7 @@ pub(crate) struct RouteHop {
pub next_hop: Option<RouteHopData>,
}
impl RouteHop {
pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> {
pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> {
self.node.validate(crypto)
}
}
@ -108,7 +108,7 @@ pub(crate) enum PrivateRouteHops {
}
impl PrivateRouteHops {
pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> {
pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> {
match self {
PrivateRouteHops::FirstHop(rh) => rh.validate(crypto),
PrivateRouteHops::Data(_) => Ok(()),
@ -138,7 +138,7 @@ impl PrivateRoute {
}
}
pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> {
pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> {
self.hops.validate(crypto)
}

View File

@ -34,53 +34,40 @@ struct RouteSpecStoreInner {
cache: RouteSpecStoreCache,
}
struct RouteSpecStoreUnlockedInner {
/// Handle to routing table
routing_table: RoutingTable,
/// The routing table's storage for private/safety routes
#[derive(Debug)]
pub(crate) struct RouteSpecStore {
registry: VeilidComponentRegistry,
inner: Arc<Mutex<RouteSpecStoreInner>>,
/// Maximum number of hops in a route
max_route_hop_count: usize,
/// Default number of hops in a route
default_route_hop_count: usize,
}
impl fmt::Debug for RouteSpecStoreUnlockedInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RouteSpecStoreUnlockedInner")
.field("max_route_hop_count", &self.max_route_hop_count)
.field("default_route_hop_count", &self.default_route_hop_count)
.finish()
}
}
/// The routing table's storage for private/safety routes
#[derive(Clone, Debug)]
pub(crate) struct RouteSpecStore {
inner: Arc<Mutex<RouteSpecStoreInner>>,
unlocked_inner: Arc<RouteSpecStoreUnlockedInner>,
}
impl_veilid_component_registry_accessor!(RouteSpecStore);
impl RouteSpecStore {
pub fn new(routing_table: RoutingTable) -> Self {
let config = routing_table.network_manager().config();
pub fn new(registry: VeilidComponentRegistry) -> Self {
let config = registry.config();
let c = config.get();
Self {
unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner {
max_route_hop_count: c.network.rpc.max_route_hop_count.into(),
default_route_hop_count: c.network.rpc.default_route_hop_count.into(),
routing_table,
}),
registry,
inner: Arc::new(Mutex::new(RouteSpecStoreInner {
content: RouteSpecStoreContent::new(),
cache: Default::default(),
})),
max_route_hop_count: c.network.rpc.max_route_hop_count.into(),
default_route_hop_count: c.network.rpc.default_route_hop_count.into(),
}
}
#[instrument(level = "trace", target = "route", skip(routing_table), err)]
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStore> {
#[instrument(level = "trace", target = "route", skip_all, err)]
pub async fn load(registry: VeilidComponentRegistry) -> EyreResult<RouteSpecStore> {
let (max_route_hop_count, default_route_hop_count) = {
let config = routing_table.network_manager().config();
let config = registry.config();
let c = config.get();
(
c.network.rpc.max_route_hop_count as usize,
@ -89,7 +76,10 @@ impl RouteSpecStore {
};
// Get frozen blob from table store
let content = RouteSpecStoreContent::load(routing_table.clone()).await?;
let table_store = registry.lookup::<TableStore>().unwrap();
let routing_table = registry.lookup::<RoutingTable>().unwrap();
let content = RouteSpecStoreContent::load(&table_store, &routing_table).await?;
let mut inner = RouteSpecStoreInner {
content,
@ -104,12 +94,10 @@ impl RouteSpecStore {
// Return the loaded RouteSpecStore
let rss = RouteSpecStore {
unlocked_inner: Arc::new(RouteSpecStoreUnlockedInner {
max_route_hop_count,
default_route_hop_count,
routing_table: routing_table.clone(),
}),
registry,
inner: Arc::new(Mutex::new(inner)),
max_route_hop_count,
default_route_hop_count,
};
Ok(rss)
@ -123,9 +111,8 @@ impl RouteSpecStore {
};
// Save our content
content
.save(self.unlocked_inner.routing_table.clone())
.await?;
let table_store = self.registry.lookup::<TableStore>().unwrap();
content.save(&table_store).await?;
Ok(())
}
@ -146,16 +133,17 @@ impl RouteSpecStore {
dead_remote_routes,
}));
let update_callback = self.unlocked_inner.routing_table.update_callback();
let update_callback = self.registry.update_callback();
update_callback(update);
}
/// Purge the route spec store
pub async fn purge(&self) -> VeilidAPIResult<()> {
// Briefly pause routing table ticker while changes are made
let _tick_guard = self.unlocked_inner.routing_table.pause_tasks().await;
self.unlocked_inner.routing_table.cancel_tasks().await;
let routing_table = self.registry.lookup::<RoutingTable>().unwrap();
let _tick_guard = routing_table.pause_tasks().await;
routing_table.cancel_tasks().await;
{
let inner = &mut *self.inner.lock();
inner.content = Default::default();
@ -181,7 +169,7 @@ impl RouteSpecStore {
automatic: bool,
) -> VeilidAPIResult<RouteId> {
let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone();
let routing_table = self.routing_table();
let rti = &mut *routing_table.inner.write();
self.allocate_route_inner(
@ -213,12 +201,10 @@ impl RouteSpecStore {
apibail_generic!("safety_spec.preferred_route must be empty when allocating new route");
}
let ip6_prefix_size = rti
.unlocked_inner
.config
.get()
.network
.max_connections_per_ip6_prefix_size as usize;
let ip6_prefix_size = self
.registry()
.config()
.with(|c| c.network.max_connections_per_ip6_prefix_size as usize);
if safety_spec.hop_count < 1 {
apibail_invalid_argument!(
@ -228,7 +214,7 @@ impl RouteSpecStore {
);
}
if safety_spec.hop_count > self.unlocked_inner.max_route_hop_count {
if safety_spec.hop_count > self.max_route_hop_count {
apibail_invalid_argument!(
"Not allocating route longer than max route hop count",
"hop_count",
@ -492,9 +478,8 @@ impl RouteSpecStore {
})
};
let routing_table = self.unlocked_inner.routing_table.clone();
let transform = |_rti: &RoutingTableInner, entry: Option<Arc<BucketEntry>>| -> NodeRef {
NodeRef::new(routing_table.clone(), entry.unwrap())
NodeRef::new(self.registry(), entry.unwrap())
};
// Pull the whole routing table in sorted order
@ -667,13 +652,9 @@ impl RouteSpecStore {
// Got a unique route, lets build the details, register it, and return it
let hop_node_refs: Vec<NodeRef> = route_nodes.iter().map(|k| nodes[*k].clone()).collect();
let mut route_set = BTreeMap::<PublicKey, RouteSpecDetail>::new();
let crypto = self.crypto();
for crypto_kind in crypto_kinds.iter().copied() {
let vcrypto = self
.unlocked_inner
.routing_table
.crypto()
.get(crypto_kind)
.unwrap();
let vcrypto = crypto.get(crypto_kind).unwrap();
let keypair = vcrypto.generate_keypair();
let hops: Vec<PublicKey> = route_nodes
.iter()
@ -734,7 +715,7 @@ impl RouteSpecStore {
R: fmt::Debug,
{
let inner = &*self.inner.lock();
let crypto = self.unlocked_inner.routing_table.crypto();
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(public_key.kind) else {
log_rpc!(debug "can't handle route with public key: {:?}", public_key);
return None;
@ -852,7 +833,7 @@ impl RouteSpecStore {
};
// Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let rpc_processor = self.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v,
_ => {
@ -886,7 +867,7 @@ impl RouteSpecStore {
// Get a safety route that is good enough
let safety_spec = SafetySpec {
preferred_route: None,
hop_count: self.unlocked_inner.default_route_hop_count,
hop_count: self.default_route_hop_count,
stability,
sequencing,
};
@ -900,8 +881,7 @@ impl RouteSpecStore {
};
// Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest).await? {
let _res = match self.rpc_processor().rpc_call_status(dest).await? {
NetworkResult::Value(v) => v,
_ => {
// Did not error, but did not come back, just return false
@ -1097,7 +1077,7 @@ impl RouteSpecStore {
) -> VeilidAPIResult<CompiledRoute> {
// let profile_start_ts = get_timestamp();
let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone();
let routing_table = self.routing_table();
let rti = &mut *routing_table.inner.write();
// Get useful private route properties
@ -1108,7 +1088,7 @@ impl RouteSpecStore {
};
let pr_pubkey = private_route.public_key.value;
let pr_hopcount = private_route.hop_count as usize;
let max_route_hop_count = self.unlocked_inner.max_route_hop_count;
let max_route_hop_count = self.max_route_hop_count;
// Check private route hop count isn't larger than the max route hop count plus one for the 'first hop' header
if pr_hopcount > (max_route_hop_count + 1) {
@ -1130,10 +1110,10 @@ impl RouteSpecStore {
let opt_first_hop = match pr_first_hop_node {
RouteNode::NodeId(id) => rti
.lookup_node_ref(routing_table.clone(), TypedKey::new(crypto_kind, id))
.lookup_node_ref(TypedKey::new(crypto_kind, id))
.map_err(VeilidAPIError::internal)?,
RouteNode::PeerInfo(pi) => Some(
rti.register_node_with_peer_info(routing_table.clone(), pi, false)
rti.register_node_with_peer_info(pi, false)
.map_err(VeilidAPIError::internal)?
.unfiltered(),
),
@ -1362,7 +1342,7 @@ impl RouteSpecStore {
avoid_nodes: &[TypedKey],
) -> VeilidAPIResult<PublicKey> {
// Ensure the total hop count isn't too long for our config
let max_route_hop_count = self.unlocked_inner.max_route_hop_count;
let max_route_hop_count = self.max_route_hop_count;
if safety_spec.hop_count == 0 {
apibail_invalid_argument!(
"safety route hop count is zero",
@ -1438,7 +1418,7 @@ impl RouteSpecStore {
avoid_nodes: &[TypedKey],
) -> VeilidAPIResult<PublicKey> {
let inner = &mut *self.inner.lock();
let routing_table = self.unlocked_inner.routing_table.clone();
let routing_table = self.routing_table();
let rti = &mut *routing_table.inner.write();
self.get_route_for_safety_spec_inner(
@ -1457,7 +1437,7 @@ impl RouteSpecStore {
rsd: &RouteSpecDetail,
optimized: bool,
) -> VeilidAPIResult<PrivateRoute> {
let routing_table = self.unlocked_inner.routing_table.clone();
let routing_table = self.routing_table();
let rti = &*routing_table.inner.read();
// Ensure we get the crypto for it
@ -1732,8 +1712,7 @@ impl RouteSpecStore {
cur_ts: Timestamp,
) -> VeilidAPIResult<()> {
let Some(our_node_info_ts) = self
.unlocked_inner
.routing_table
.routing_table()
.get_published_peer_info(RoutingDomain::PublicInternet)
.map(|pi| pi.signed_node_info().timestamp())
else {
@ -1767,11 +1746,7 @@ impl RouteSpecStore {
let inner = &mut *self.inner.lock();
// Check for stub route
if self
.unlocked_inner
.routing_table
.matches_own_node_id_key(key)
{
if self.routing_table().matches_own_node_id_key(key) {
return None;
}
@ -1869,7 +1844,7 @@ impl RouteSpecStore {
/// Convert binary blob to private route vector
pub fn blob_to_private_routes(&self, blob: Vec<u8>) -> VeilidAPIResult<Vec<PrivateRoute>> {
// Get crypto
let crypto = self.unlocked_inner.routing_table.crypto();
let crypto = self.crypto();
// Deserialize count
if blob.is_empty() {
@ -1904,7 +1879,7 @@ impl RouteSpecStore {
let private_route = decode_private_route(&decode_context, &pr_reader).map_err(|e| {
VeilidAPIError::invalid_argument("failed to decode private route", "e", e)
})?;
private_route.validate(crypto.clone()).map_err(|e| {
private_route.validate(&crypto).map_err(|e| {
VeilidAPIError::invalid_argument("failed to validate private route", "e", e)
})?;
@ -1920,7 +1895,7 @@ impl RouteSpecStore {
/// Generate RouteId from typed key set of route public keys
fn generate_allocated_route_id(&self, rssd: &RouteSetSpecDetail) -> VeilidAPIResult<RouteId> {
let route_set_keys = rssd.get_route_set_keys();
let crypto = self.unlocked_inner.routing_table.crypto();
let crypto = self.crypto();
let mut idbytes = Vec::with_capacity(PUBLIC_KEY_LENGTH * route_set_keys.len());
let mut best_kind: Option<CryptoKind> = None;
@ -1945,7 +1920,7 @@ impl RouteSpecStore {
&self,
private_routes: &[PrivateRoute],
) -> VeilidAPIResult<RouteId> {
let crypto = self.unlocked_inner.routing_table.crypto();
let crypto = self.crypto();
let mut idbytes = Vec::with_capacity(PUBLIC_KEY_LENGTH * private_routes.len());
let mut best_kind: Option<CryptoKind> = None;

View File

@ -17,9 +17,11 @@ impl RouteSpecStoreContent {
}
}
pub async fn load(routing_table: RoutingTable) -> EyreResult<RouteSpecStoreContent> {
pub async fn load(
table_store: &TableStore,
routing_table: &RoutingTable,
) -> EyreResult<RouteSpecStoreContent> {
// Deserialize what we can
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
let mut content: RouteSpecStoreContent =
rsstdb.load_json(0, b"content").await?.unwrap_or_default();
@ -59,10 +61,9 @@ impl RouteSpecStoreContent {
Ok(content)
}
pub async fn save(&self, routing_table: RoutingTable) -> EyreResult<()> {
pub async fn save(&self, table_store: &TableStore) -> EyreResult<()> {
// Save all the fields we care about to the frozen blob in table storage
// This skips #[with(Skip)] saving the secret keys, we save them in the protected store instead
let table_store = routing_table.network_manager().table_store();
let rsstdb = table_store.open("RouteSpecStore", 1).await?;
rsstdb.store_json(0, b"content", self).await?;

View File

@ -15,8 +15,8 @@ pub type EntryCounts = BTreeMap<(RoutingDomain, CryptoKind), usize>;
/// RoutingTable rwlock-internal data
pub struct RoutingTableInner {
/// Extra pointer to unlocked members to simplify access
pub(super) unlocked_inner: Arc<RoutingTableUnlockedInner>,
/// Convenience accessor for the global component registry
pub(super) registry: VeilidComponentRegistry,
/// Routing table buckets that hold references to entries, per crypto kind
pub(super) buckets: BTreeMap<CryptoKind, Vec<Bucket>>,
/// A weak set of all the entries we have in the buckets for faster iteration
@ -44,10 +44,12 @@ pub struct RoutingTableInner {
pub(super) opt_active_watch_keepalive_ts: Option<Timestamp>,
}
impl_veilid_component_registry_accessor!(RoutingTableInner);
impl RoutingTableInner {
pub(super) fn new(unlocked_inner: Arc<RoutingTableUnlockedInner>) -> RoutingTableInner {
pub(super) fn new(registry: VeilidComponentRegistry) -> RoutingTableInner {
RoutingTableInner {
unlocked_inner,
registry,
buckets: BTreeMap::new(),
public_internet_routing_domain: PublicInternetRoutingDomainDetail::default(),
local_network_routing_domain: LocalNetworkRoutingDomainDetail::default(),
@ -458,7 +460,7 @@ impl RoutingTableInner {
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
pub(super) fn get_nodes_needing_ping(
&self,
outer_self: RoutingTable,
registry: VeilidComponentRegistry,
routing_domain: RoutingDomain,
cur_ts: Timestamp,
) -> Vec<FilteredNodeRef> {
@ -559,7 +561,7 @@ impl RoutingTableInner {
let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
FilteredNodeRef::new(
outer_self.clone(),
self.registry.clone(),
v.unwrap().clone(),
NodeRefFilter::new().with_routing_domain(routing_domain),
Sequencing::default(),
@ -570,10 +572,10 @@ impl RoutingTableInner {
}
#[expect(dead_code)]
pub fn get_all_alive_nodes(&self, outer_self: RoutingTable, cur_ts: Timestamp) -> Vec<NodeRef> {
pub fn get_all_alive_nodes(&self, cur_ts: Timestamp) -> Vec<NodeRef> {
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count());
self.with_entries(cur_ts, BucketEntryState::Unreliable, |_rti, entry| {
node_refs.push(NodeRef::new(outer_self.clone(), entry));
node_refs.push(NodeRef::new(self.registry(), entry));
Option::<()>::None
});
node_refs
@ -687,15 +689,16 @@ impl RoutingTableInner {
#[instrument(level = "trace", skip_all, err)]
fn create_node_ref<F>(
&mut self,
outer_self: RoutingTable,
node_ids: &TypedKeyGroup,
update_func: F,
) -> EyreResult<NodeRef>
where
F: FnOnce(&mut RoutingTableInner, &mut BucketEntryInner),
{
let routing_table = self.routing_table();
// Ensure someone isn't trying register this node itself
if self.unlocked_inner.matches_own_node_id(node_ids) {
if routing_table.matches_own_node_id(node_ids) {
bail!("can't register own node");
}
@ -708,7 +711,7 @@ impl RoutingTableInner {
continue;
}
// Find the first in crypto sort order
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket_index = routing_table.calculate_bucket_index(node_id);
let bucket = self.get_bucket(bucket_index);
if let Some(entry) = bucket.entry(&node_id.value) {
// Best entry is the first one in sorted order that exists from the node id list
@ -730,7 +733,7 @@ impl RoutingTableInner {
}
// Make a noderef to return
let nr = NodeRef::new(outer_self.clone(), best_entry.clone());
let nr = NodeRef::new(self.registry(), best_entry.clone());
// Update the entry with the update func
best_entry.with_mut_inner(|e| update_func(self, e));
@ -741,11 +744,11 @@ impl RoutingTableInner {
// If no entry exists yet, add the first entry to a bucket, possibly evicting a bucket member
let first_node_id = node_ids[0];
let bucket_entry = self.unlocked_inner.calculate_bucket_index(&first_node_id);
let bucket_entry = routing_table.calculate_bucket_index(&first_node_id);
let bucket = self.get_bucket_mut(bucket_entry);
let new_entry = bucket.add_new_entry(first_node_id.value);
self.all_entries.insert(new_entry.clone());
self.unlocked_inner.kick_queue.lock().insert(bucket_entry);
routing_table.kick_queue.lock().insert(bucket_entry);
// Update the other bucket entries with the remaining node ids
if let Err(e) = self.update_bucket_entry_node_ids(new_entry.clone(), node_ids) {
@ -753,7 +756,7 @@ impl RoutingTableInner {
}
// Make node ref to return
let nr = NodeRef::new(outer_self.clone(), new_entry.clone());
let nr = NodeRef::new(self.registry(), new_entry.clone());
// Update the entry with the update func
new_entry.with_mut_inner(|e| update_func(self, e));
@ -766,15 +769,9 @@ impl RoutingTableInner {
/// Resolve an existing routing table entry using any crypto kind and return a reference to it
#[instrument(level = "trace", skip_all, err)]
pub fn lookup_any_node_ref(
&self,
outer_self: RoutingTable,
node_id_key: PublicKey,
) -> EyreResult<Option<NodeRef>> {
pub fn lookup_any_node_ref(&self, node_id_key: PublicKey) -> EyreResult<Option<NodeRef>> {
for ck in VALID_CRYPTO_KINDS {
if let Some(nr) =
self.lookup_node_ref(outer_self.clone(), TypedKey::new(ck, node_id_key))?
{
if let Some(nr) = self.lookup_node_ref(TypedKey::new(ck, node_id_key))? {
return Ok(Some(nr));
}
}
@ -783,35 +780,30 @@ impl RoutingTableInner {
/// Resolve an existing routing table entry and return a reference to it
#[instrument(level = "trace", skip_all, err)]
pub fn lookup_node_ref(
&self,
outer_self: RoutingTable,
node_id: TypedKey,
) -> EyreResult<Option<NodeRef>> {
if self.unlocked_inner.matches_own_node_id(&[node_id]) {
pub fn lookup_node_ref(&self, node_id: TypedKey) -> EyreResult<Option<NodeRef>> {
if self.routing_table().matches_own_node_id(&[node_id]) {
bail!("can't look up own node id in routing table");
}
if !VALID_CRYPTO_KINDS.contains(&node_id.kind) {
bail!("can't look up node id with invalid crypto kind");
}
let bucket_index = self.unlocked_inner.calculate_bucket_index(&node_id);
let bucket_index = self.routing_table().calculate_bucket_index(&node_id);
let bucket = self.get_bucket(bucket_index);
Ok(bucket
.entry(&node_id.value)
.map(|e| NodeRef::new(outer_self, e)))
.map(|e| NodeRef::new(self.registry(), e)))
}
/// Resolve an existing routing table entry and return a filtered reference to it
#[instrument(level = "trace", skip_all, err)]
pub fn lookup_and_filter_noderef(
&self,
outer_self: RoutingTable,
node_id: TypedKey,
routing_domain_set: RoutingDomainSet,
dial_info_filter: DialInfoFilter,
) -> EyreResult<Option<FilteredNodeRef>> {
let nr = self.lookup_node_ref(outer_self, node_id)?;
let nr = self.lookup_node_ref(node_id)?;
Ok(nr.map(|nr| {
nr.custom_filtered(
NodeRefFilter::new()
@ -826,7 +818,7 @@ impl RoutingTableInner {
where
F: FnOnce(Arc<BucketEntry>) -> R,
{
if self.unlocked_inner.matches_own_node_id(&[node_id]) {
if self.routing_table().matches_own_node_id(&[node_id]) {
log_rtab!(error "can't look up own node id in routing table");
return None;
}
@ -834,7 +826,7 @@ impl RoutingTableInner {
log_rtab!(error "can't look up node id with invalid crypto kind");
return None;
}
let bucket_entry = self.unlocked_inner.calculate_bucket_index(&node_id);
let bucket_entry = self.routing_table().calculate_bucket_index(&node_id);
let bucket = self.get_bucket(bucket_entry);
bucket.entry(&node_id.value).map(f)
}
@ -845,7 +837,6 @@ impl RoutingTableInner {
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_peer_info(
&mut self,
outer_self: RoutingTable,
peer_info: Arc<PeerInfo>,
allow_invalid: bool,
) -> EyreResult<FilteredNodeRef> {
@ -853,7 +844,7 @@ impl RoutingTableInner {
// if our own node is in the list, then ignore it as we don't add ourselves to our own routing table
if self
.unlocked_inner
.routing_table()
.matches_own_node_id(peer_info.node_ids())
{
bail!("can't register own node id in routing table");
@ -891,10 +882,10 @@ impl RoutingTableInner {
if let Some(relay_peer_info) = peer_info.signed_node_info().relay_peer_info(routing_domain)
{
if !self
.unlocked_inner
.routing_table()
.matches_own_node_id(relay_peer_info.node_ids())
{
self.register_node_with_peer_info(outer_self.clone(), relay_peer_info, false)?;
self.register_node_with_peer_info(relay_peer_info, false)?;
}
}
@ -902,7 +893,7 @@ impl RoutingTableInner {
Arc::unwrap_or_clone(peer_info).destructure();
let mut updated = false;
let mut old_peer_info = None;
let nr = self.create_node_ref(outer_self, &node_ids, |_rti, e| {
let nr = self.create_node_ref(&node_ids, |_rti, e| {
old_peer_info = e.make_peer_info(routing_domain);
updated = e.update_signed_node_info(routing_domain, &signed_node_info);
})?;
@ -922,12 +913,11 @@ impl RoutingTableInner {
#[instrument(level = "trace", skip_all, err)]
pub fn register_node_with_id(
&mut self,
outer_self: RoutingTable,
routing_domain: RoutingDomain,
node_id: TypedKey,
timestamp: Timestamp,
) -> EyreResult<FilteredNodeRef> {
let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| {
let nr = self.create_node_ref(&TypedKeyGroup::from(node_id), |_rti, e| {
//e.make_not_dead(timestamp);
e.touch_last_seen(timestamp);
})?;
@ -1057,7 +1047,7 @@ impl RoutingTableInner {
#[instrument(level = "trace", skip_all)]
pub fn find_fast_non_local_nodes_filtered(
&self,
outer_self: RoutingTable,
registry: VeilidComponentRegistry,
routing_domain: RoutingDomain,
node_count: usize,
mut filters: VecDeque<RoutingTableEntryFilter>,
@ -1089,7 +1079,7 @@ impl RoutingTableInner {
node_count,
filters,
|_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
NodeRef::new(outer_self.clone(), v.unwrap().clone())
NodeRef::new(registry.clone(), v.unwrap().clone())
},
)
}
@ -1283,10 +1273,12 @@ impl RoutingTableInner {
T: for<'r> FnMut(&'r RoutingTableInner, Option<Arc<BucketEntry>>) -> O,
{
let cur_ts = Timestamp::now();
let routing_table = self.routing_table();
// Get the crypto kind
let crypto_kind = node_id.kind;
let Some(vcrypto) = self.unlocked_inner.crypto().get(crypto_kind) else {
let crypto = self.crypto();
let Some(vcrypto) = crypto.get(crypto_kind) else {
apibail_generic!("invalid crypto kind");
};
@ -1338,12 +1330,12 @@ impl RoutingTableInner {
let a_key = if let Some(a_entry) = a_entry {
a_entry.with_inner(|e| e.node_ids().get(crypto_kind).unwrap())
} else {
self.unlocked_inner.node_id(crypto_kind)
routing_table.node_id(crypto_kind)
};
let b_key = if let Some(b_entry) = b_entry {
b_entry.with_inner(|e| e.node_ids().get(crypto_kind).unwrap())
} else {
self.unlocked_inner.node_id(crypto_kind)
routing_table.node_id(crypto_kind)
};
// distance is the next metric, closer nodes first
@ -1379,7 +1371,8 @@ impl RoutingTableInner {
.collect();
// Sort closest
let sort = make_closest_noderef_sort(self.unlocked_inner.crypto(), node_id);
let crypto = self.crypto();
let sort = make_closest_noderef_sort(&crypto, node_id);
closest_nodes_locked.sort_by(sort);
// Unlock noderefs
@ -1388,10 +1381,10 @@ impl RoutingTableInner {
}
#[instrument(level = "trace", skip_all)]
pub fn make_closest_noderef_sort(
crypto: Crypto,
pub fn make_closest_noderef_sort<'a>(
crypto: &'a Crypto,
node_id: TypedKey,
) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering {
) -> impl Fn(&LockedNodeRef, &LockedNodeRef) -> core::cmp::Ordering + 'a {
let kind = node_id.kind;
// Get cryptoversion to check distance with
let vcrypto = crypto.get(kind).unwrap();
@ -1417,10 +1410,10 @@ pub fn make_closest_noderef_sort(
}
}
pub fn make_closest_node_id_sort(
crypto: Crypto,
pub fn make_closest_node_id_sort<'a>(
crypto: &'a Crypto,
node_id: TypedKey,
) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering {
) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering + 'a {
let kind = node_id.kind;
// Get cryptoversion to check distance with
let vcrypto = crypto.get(kind).unwrap();

View File

@ -10,15 +10,15 @@ enum RoutingDomainChangeLocalNetwork {
Common(RoutingDomainChangeCommon),
}
pub struct RoutingDomainEditorLocalNetwork {
routing_table: RoutingTable,
pub struct RoutingDomainEditorLocalNetwork<'a> {
routing_table: &'a RoutingTable,
changes: Vec<RoutingDomainChangeLocalNetwork>,
}
impl RoutingDomainEditorLocalNetwork {
pub(in crate::routing_table) fn new(routing_table: RoutingTable) -> Self {
impl<'a> RoutingDomainEditorLocalNetwork<'a> {
pub(in crate::routing_table) fn new(routing_table: &'a RoutingTable) -> Self {
Self {
routing_table: routing_table.clone(),
routing_table,
changes: Vec::new(),
}
}
@ -30,7 +30,7 @@ impl RoutingDomainEditorLocalNetwork {
}
}
impl RoutingDomainEditorCommonTrait for RoutingDomainEditorLocalNetwork {
impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorLocalNetwork<'a> {
#[instrument(level = "debug", skip(self))]
fn clear_dial_info_details(
&mut self,

View File

@ -5,13 +5,13 @@ enum RoutingDomainChangePublicInternet {
Common(RoutingDomainChangeCommon),
}
pub struct RoutingDomainEditorPublicInternet {
routing_table: RoutingTable,
pub struct RoutingDomainEditorPublicInternet<'a> {
routing_table: &'a RoutingTable,
changes: Vec<RoutingDomainChangePublicInternet>,
}
impl RoutingDomainEditorPublicInternet {
pub(in crate::routing_table) fn new(routing_table: RoutingTable) -> Self {
impl<'a> RoutingDomainEditorPublicInternet<'a> {
pub(in crate::routing_table) fn new(routing_table: &'a RoutingTable) -> Self {
Self {
routing_table,
changes: Vec::new(),
@ -41,7 +41,7 @@ impl RoutingDomainEditorPublicInternet {
}
}
impl RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet {
impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet<'a> {
#[instrument(level = "debug", skip(self))]
fn clear_dial_info_details(
&mut self,
@ -263,8 +263,9 @@ impl RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet {
if changed {
// Clear the routespecstore cache if our PublicInternet dial info has changed
let rss = self.routing_table.route_spec_store();
rss.reset_cache();
if let Some(rss) = self.routing_table.route_spec_store() {
rss.reset_cache();
}
}
}

View File

@ -81,7 +81,7 @@ impl RoutingTable {
}
// If this is our own node id, then we skip it for bootstrap, in case we are a bootstrap node
if self.unlocked_inner.matches_own_node_id(&node_ids) {
if self.matches_own_node_id(&node_ids) {
return Ok(None);
}
@ -255,7 +255,7 @@ impl RoutingTable {
//#[instrument(level = "trace", skip(self), err)]
pub fn bootstrap_with_peer(
self,
&self,
crypto_kinds: Vec<CryptoKind>,
pi: Arc<PeerInfo>,
unord: &FuturesUnordered<SendPinBoxFuture<()>>,
@ -325,7 +325,7 @@ impl RoutingTable {
#[instrument(level = "trace", skip(self), err)]
pub async fn bootstrap_with_peer_list(
self,
&self,
peers: Vec<Arc<PeerInfo>>,
stop_token: StopToken,
) -> EyreResult<()> {
@ -364,10 +364,15 @@ impl RoutingTable {
}
#[instrument(level = "trace", skip(self), err)]
pub async fn bootstrap_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
pub async fn bootstrap_task_routine(
&self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let bootstrap = self
.unlocked_inner
.with_config(|c| c.network.routing_table.bootstrap.clone());
.config()
.with(|c| c.network.routing_table.bootstrap.clone());
// Don't bother if bootstraps aren't configured
if bootstrap.is_empty() {

View File

@ -10,14 +10,18 @@ impl RoutingTable {
/// Ask our closest peers to give us more peers close to ourselves. This will
/// assist with the DHT and other algorithms that utilize the distance metric.
#[instrument(level = "trace", skip(self), err)]
pub async fn closest_peers_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
pub async fn closest_peers_refresh_task_routine(
&self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
let mut unord = FuturesUnordered::new();
for crypto_kind in VALID_CRYPTO_KINDS {
// Get our node id for this cryptokind
let self_node_id = self.node_id(crypto_kind);
let routing_table = self.clone();
let mut filters = VecDeque::new();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
@ -47,13 +51,13 @@ impl RoutingTable {
) as RoutingTableEntryFilter;
filters.push_front(filter);
let noderefs = routing_table
let noderefs = self
.find_preferred_closest_nodes(
CLOSEST_PEERS_REQUEST_COUNT,
self_node_id,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), entry.unwrap().clone())
NodeRef::new(self.registry(), entry.unwrap().clone())
},
)
.unwrap();

View File

@ -11,15 +11,15 @@ impl RoutingTable {
// Attempts to keep the size of the routing table down to the bucket depth
#[instrument(level = "trace", skip(self), err)]
pub async fn kick_buckets_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
let kick_queue: Vec<BucketIndex> =
core::mem::take(&mut *self.unlocked_inner.kick_queue.lock())
.into_iter()
.collect();
let crypto = self.crypto();
let kick_queue: Vec<BucketIndex> = core::mem::take(&mut *self.kick_queue.lock())
.into_iter()
.collect();
let mut inner = self.inner.write();
// Get our exempt nodes for each crypto kind
@ -30,7 +30,7 @@ impl RoutingTable {
let Some(buckets) = inner.buckets.get(&kind) else {
continue;
};
let sort = make_closest_node_id_sort(self.crypto(), our_node_id);
let sort = make_closest_node_id_sort(&crypto, our_node_id);
let mut closest_peers = BTreeSet::<CryptoKey>::new();
let mut closest_unreliable_count = 0usize;

View File

@ -12,175 +12,98 @@ use super::*;
impl RoutingTable {
pub fn setup_tasks(&self) {
// Set rolling transfers tick task
{
let this = self.clone();
self.unlocked_inner
.rolling_transfers_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().rolling_transfers_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
rolling_transfers_task,
rolling_transfers_task_routine
);
// Set update state stats tick task
{
let this = self.clone();
self.unlocked_inner
.update_state_stats_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().update_state_stats_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
update_state_stats_task,
update_state_stats_task_routine
);
// Set rolling answers tick task
{
let this = self.clone();
self.unlocked_inner
.rolling_answers_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().rolling_answers_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
rolling_answers_task,
rolling_answers_task_routine
);
// Set kick buckets tick task
{
let this = self.clone();
self.unlocked_inner
.kick_buckets_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().kick_buckets_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(self, Self, kick_buckets_task, kick_buckets_task_routine);
// Set bootstrap tick task
{
let this = self.clone();
self.unlocked_inner
.bootstrap_task
.set_routine(move |s, _l, _t| Box::pin(this.clone().bootstrap_task_routine(s)));
}
impl_setup_task!(self, Self, bootstrap_task, bootstrap_task_routine);
// Set peer minimum refresh tick task
{
let this = self.clone();
self.unlocked_inner
.peer_minimum_refresh_task
.set_routine(move |s, _l, _t| {
Box::pin(this.clone().peer_minimum_refresh_task_routine(s))
});
}
impl_setup_task!(
self,
Self,
peer_minimum_refresh_task,
peer_minimum_refresh_task_routine
);
// Set closest peers refresh tick task
{
let this = self.clone();
self.unlocked_inner
.closest_peers_refresh_task
.set_routine(move |s, _l, _t| {
Box::pin(this.clone().closest_peers_refresh_task_routine(s))
});
}
impl_setup_task!(
self,
Self,
closest_peers_refresh_task,
closest_peers_refresh_task_routine
);
// Set ping validator PublicInternet tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_public_internet_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().ping_validator_public_internet_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
ping_validator_public_internet_task,
ping_validator_public_internet_task_routine
);
// Set ping validator LocalNetwork tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_local_network_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().ping_validator_local_network_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
ping_validator_local_network_task,
ping_validator_local_network_task_routine
);
// Set ping validator PublicInternet Relay tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_public_internet_relay_task
.set_routine(move |s, l, t| {
Box::pin(
this.clone()
.ping_validator_public_internet_relay_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
),
)
});
}
impl_setup_task!(
self,
Self,
ping_validator_public_internet_relay_task,
ping_validator_public_internet_relay_task_routine
);
// Set ping validator Active Watch tick task
{
let this = self.clone();
self.unlocked_inner
.ping_validator_active_watch_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().ping_validator_active_watch_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
ping_validator_active_watch_task,
ping_validator_active_watch_task_routine
);
// Set relay management tick task
{
let this = self.clone();
self.unlocked_inner
.relay_management_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().relay_management_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
relay_management_task,
relay_management_task_routine
);
// Set private route management tick task
{
let this = self.clone();
self.unlocked_inner
.private_route_management_task
.set_routine(move |s, l, t| {
Box::pin(this.clone().private_route_management_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
});
}
impl_setup_task!(
self,
Self,
private_route_management_task,
private_route_management_task_routine
);
}
/// Ticks about once per second
@ -197,18 +120,18 @@ impl RoutingTable {
};
// Do rolling transfers every ROLLING_TRANSFERS_INTERVAL_SECS secs
self.unlocked_inner.rolling_transfers_task.tick().await?;
self.rolling_transfers_task.tick().await?;
// Do state stats update every UPDATE_STATE_STATS_INTERVAL_SECS secs
self.unlocked_inner.update_state_stats_task.tick().await?;
self.update_state_stats_task.tick().await?;
// Do rolling answers every ROLLING_ANSWER_INTERVAL_SECS secs
self.unlocked_inner.rolling_answers_task.tick().await?;
self.rolling_answers_task.tick().await?;
// Kick buckets task
let kick_bucket_queue_count = self.unlocked_inner.kick_queue.lock().len();
let kick_bucket_queue_count = self.kick_queue.lock().len();
if kick_bucket_queue_count > 0 {
self.unlocked_inner.kick_buckets_task.tick().await?;
self.kick_buckets_task.tick().await?;
}
// Refresh entry counts
@ -222,7 +145,9 @@ impl RoutingTable {
return Ok(());
}
let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
let min_peer_count = self
.config()
.with(|c| c.network.dht.min_peer_count as usize);
// Figure out which tables need bootstrap or peer minimum refresh
let mut needs_bootstrap = false;
@ -237,40 +162,27 @@ impl RoutingTable {
}
}
if needs_bootstrap {
self.unlocked_inner.bootstrap_task.tick().await?;
self.bootstrap_task.tick().await?;
}
if needs_peer_minimum_refresh {
self.unlocked_inner.peer_minimum_refresh_task.tick().await?;
self.peer_minimum_refresh_task.tick().await?;
}
// Ping validate some nodes to groom the table
self.unlocked_inner
.ping_validator_public_internet_task
.tick()
.await?;
self.unlocked_inner
.ping_validator_local_network_task
.tick()
.await?;
self.unlocked_inner
.ping_validator_public_internet_relay_task
.tick()
.await?;
self.unlocked_inner
.ping_validator_active_watch_task
self.ping_validator_public_internet_task.tick().await?;
self.ping_validator_local_network_task.tick().await?;
self.ping_validator_public_internet_relay_task
.tick()
.await?;
self.ping_validator_active_watch_task.tick().await?;
// Run the relay management task
self.unlocked_inner.relay_management_task.tick().await?;
self.relay_management_task.tick().await?;
// Get more nodes if we need to
if !needs_bootstrap && !needs_peer_minimum_refresh {
// Run closest peers refresh task
self.unlocked_inner
.closest_peers_refresh_task
.tick()
.await?;
self.closest_peers_refresh_task.tick().await?;
}
// Only perform these operations if we already have a published peer info
@ -279,10 +191,7 @@ impl RoutingTable {
.is_some()
{
// Run the private route management task
self.unlocked_inner
.private_route_management_task
.tick()
.await?;
self.private_route_management_task.tick().await?;
}
Ok(())
@ -295,82 +204,57 @@ impl RoutingTable {
pub async fn cancel_tasks(&self) {
// Cancel all tasks being ticked
log_rtab!(debug "stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
if let Err(e) = self.rolling_transfers_task.stop().await {
error!("rolling_transfers_task not stopped: {}", e);
}
log_rtab!(debug "stopping update state stats task");
if let Err(e) = self.unlocked_inner.update_state_stats_task.stop().await {
if let Err(e) = self.update_state_stats_task.stop().await {
error!("update_state_stats_task not stopped: {}", e);
}
log_rtab!(debug "stopping rolling answers task");
if let Err(e) = self.unlocked_inner.rolling_answers_task.stop().await {
if let Err(e) = self.rolling_answers_task.stop().await {
error!("rolling_answers_task not stopped: {}", e);
}
log_rtab!(debug "stopping kick buckets task");
if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await {
if let Err(e) = self.kick_buckets_task.stop().await {
error!("kick_buckets_task not stopped: {}", e);
}
log_rtab!(debug "stopping bootstrap task");
if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await {
if let Err(e) = self.bootstrap_task.stop().await {
error!("bootstrap_task not stopped: {}", e);
}
log_rtab!(debug "stopping peer minimum refresh task");
if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await {
if let Err(e) = self.peer_minimum_refresh_task.stop().await {
error!("peer_minimum_refresh_task not stopped: {}", e);
}
log_rtab!(debug "stopping ping_validator tasks");
if let Err(e) = self
.unlocked_inner
.ping_validator_public_internet_task
.stop()
.await
{
if let Err(e) = self.ping_validator_public_internet_task.stop().await {
error!("ping_validator_public_internet_task not stopped: {}", e);
}
if let Err(e) = self
.unlocked_inner
.ping_validator_local_network_task
.stop()
.await
{
if let Err(e) = self.ping_validator_local_network_task.stop().await {
error!("ping_validator_local_network_task not stopped: {}", e);
}
if let Err(e) = self
.unlocked_inner
.ping_validator_public_internet_relay_task
.stop()
.await
{
if let Err(e) = self.ping_validator_public_internet_relay_task.stop().await {
error!(
"ping_validator_public_internet_relay_task not stopped: {}",
e
);
}
if let Err(e) = self
.unlocked_inner
.ping_validator_active_watch_task
.stop()
.await
{
if let Err(e) = self.ping_validator_active_watch_task.stop().await {
error!("ping_validator_active_watch_task not stopped: {}", e);
}
log_rtab!(debug "stopping relay management task");
if let Err(e) = self.unlocked_inner.relay_management_task.stop().await {
if let Err(e) = self.relay_management_task.stop().await {
warn!("relay_management_task not stopped: {}", e);
}
log_rtab!(debug "stopping private route management task");
if let Err(e) = self
.unlocked_inner
.private_route_management_task
.stop()
.await
{
if let Err(e) = self.private_route_management_task.stop().await {
warn!("private_route_management_task not stopped: {}", e);
}
log_rtab!(debug "stopping closest peers refresh task");
if let Err(e) = self.unlocked_inner.closest_peers_refresh_task.stop().await {
if let Err(e) = self.closest_peers_refresh_task.stop().await {
warn!("closest_peers_refresh_task not stopped: {}", e);
}
}

View File

@ -12,11 +12,16 @@ impl RoutingTable {
// nodes for their PublicInternet peers, which is a very fast way to get
// a new node online.
#[instrument(level = "trace", skip(self), err)]
pub async fn peer_minimum_refresh_task_routine(self, stop_token: StopToken) -> EyreResult<()> {
pub async fn peer_minimum_refresh_task_routine(
&self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
) -> EyreResult<()> {
// Get counts by crypto kind
let entry_count = self.inner.read().cached_entry_counts();
let (min_peer_count, min_peer_refresh_time_ms) = self.with_config(|c| {
let (min_peer_count, min_peer_refresh_time_ms) = self.config().with(|c| {
(
c.network.dht.min_peer_count as usize,
c.network.dht.min_peer_refresh_time_ms,
@ -39,7 +44,6 @@ impl RoutingTable {
continue;
}
let routing_table = self.clone();
let mut filters = VecDeque::new();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
@ -64,23 +68,18 @@ impl RoutingTable {
) as RoutingTableEntryFilter;
filters.push_front(filter);
let noderefs = routing_table.find_preferred_fastest_nodes(
let noderefs = self.find_preferred_fastest_nodes(
min_peer_count,
filters,
|_rti, entry: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), entry.unwrap().clone())
NodeRef::new(self.registry(), entry.unwrap().clone())
},
);
for nr in noderefs {
let routing_table = self.clone();
ord.push_back(
async move {
routing_table
.reverse_find_node(crypto_kind, nr, false, vec![])
.await
}
.instrument(Span::current()),
async move { self.reverse_find_node(crypto_kind, nr, false, vec![]).await }
.instrument(Span::current()),
);
}
}

View File

@ -18,7 +18,7 @@ impl RoutingTable {
// Task routine for PublicInternet status pings
#[instrument(level = "trace", skip(self), err)]
pub async fn ping_validator_public_internet_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
@ -37,7 +37,7 @@ impl RoutingTable {
// Task routine for LocalNetwork status pings
#[instrument(level = "trace", skip(self), err)]
pub async fn ping_validator_local_network_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
@ -56,7 +56,7 @@ impl RoutingTable {
// Task routine for PublicInternet relay keepalive pings
#[instrument(level = "trace", skip(self), err)]
pub async fn ping_validator_public_internet_relay_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
@ -75,7 +75,7 @@ impl RoutingTable {
// Task routine for active watch keepalive pings
#[instrument(level = "trace", skip(self), err)]
pub async fn ping_validator_active_watch_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
@ -180,11 +180,11 @@ impl RoutingTable {
}
for relay_nr_filtered in relay_noderefs {
let rpc = rpc.clone();
futurequeue.push_back(
async move {
log_rtab!("--> PublicInternet Relay ping to {:?}", relay_nr_filtered);
let _ = rpc
let _ = self
.rpc_processor()
.rpc_call_status(Destination::direct(relay_nr_filtered))
.await?;
Ok(())
@ -202,8 +202,6 @@ impl RoutingTable {
cur_ts: Timestamp,
futurequeue: &mut VecDeque<PingValidatorFuture>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
let watches_need_keepalive = {
let mut inner = self.inner.write();
let need = inner
@ -224,15 +222,16 @@ impl RoutingTable {
}
// Get all the active watches from the storage manager
let storage_manager = self.unlocked_inner.network_manager.storage_manager();
let watch_destinations = storage_manager.get_active_watch_nodes().await;
let watch_destinations = self.storage_manager().get_active_watch_nodes().await;
for watch_destination in watch_destinations {
let rpc = rpc.clone();
futurequeue.push_back(
async move {
log_rtab!("--> Watch Keepalive ping to {:?}", watch_destination);
let _ = rpc.rpc_call_status(watch_destination).await?;
let _ = self
.rpc_processor()
.rpc_call_status(watch_destination)
.await?;
Ok(())
}
.boxed(),
@ -249,8 +248,6 @@ impl RoutingTable {
cur_ts: Timestamp,
futurequeue: &mut VecDeque<PingValidatorFuture>,
) -> EyreResult<()> {
let rpc = self.rpc_processor();
// Get all nodes needing pings in the PublicInternet routing domain
let node_refs = self.get_nodes_needing_ping(RoutingDomain::PublicInternet, cur_ts);
@ -258,12 +255,14 @@ impl RoutingTable {
for nr in node_refs {
let nr = nr.sequencing_clone(Sequencing::PreferOrdered);
let rpc = rpc.clone();
futurequeue.push_back(
async move {
#[cfg(feature = "verbose-tracing")]
log_rtab!(debug "--> PublicInternet Validator ping to {:?}", nr);
let _ = rpc.rpc_call_status(Destination::direct(nr)).await?;
let _ = self
.rpc_processor()
.rpc_call_status(Destination::direct(nr))
.await?;
Ok(())
}
.boxed(),

View File

@ -8,12 +8,13 @@ const BACKGROUND_SAFETY_ROUTE_COUNT: usize = 2;
impl RoutingTable {
fn get_background_safety_route_count(&self) -> usize {
let c = self.config.get();
if c.capabilities.disable.contains(&CAP_ROUTE) {
0
} else {
BACKGROUND_SAFETY_ROUTE_COUNT
}
self.config().with(|c| {
if c.capabilities.disable.contains(&CAP_ROUTE) {
0
} else {
BACKGROUND_SAFETY_ROUTE_COUNT
}
})
}
/// Fastest routes sort
fn route_sort_latency_fn(a: &(RouteId, u64), b: &(RouteId, u64)) -> cmp::Ordering {
@ -44,10 +45,13 @@ impl RoutingTable {
/// If a route doesn't 'need_testing', then we neither test nor drop it
#[instrument(level = "trace", skip(self))]
fn get_allocated_routes_to_test(&self, cur_ts: Timestamp) -> Vec<RouteId> {
let default_route_hop_count =
self.with_config(|c| c.network.rpc.default_route_hop_count as usize);
let default_route_hop_count = self
.config()
.with(|c| c.network.rpc.default_route_hop_count as usize);
let rss = self.route_spec_store();
let Some(rss) = self.route_spec_store() else {
return vec![];
};
let mut must_test_routes = Vec::<RouteId>::new();
let mut unpublished_routes = Vec::<(RouteId, u64)>::new();
let mut expired_routes = Vec::<RouteId>::new();
@ -115,7 +119,10 @@ impl RoutingTable {
log_rtab!("Testing routes: {:?}", routes_needing_testing);
// Test all the routes that need testing at the same time
let rss = self.route_spec_store();
let Some(rss) = self.route_spec_store() else {
return Ok(());
};
#[derive(Default, Debug)]
struct TestRouteContext {
dead_routes: Vec<RouteId>,
@ -125,10 +132,11 @@ impl RoutingTable {
{
let mut unord = FuturesUnordered::new();
for r in routes_needing_testing {
let rss = rss.clone();
let ctx = ctx.clone();
unord.push(
async move {
let rss = self.route_spec_store().unwrap();
let success = match rss.test_route(r).await {
// Test had result
Ok(Some(v)) => v,
@ -169,7 +177,7 @@ impl RoutingTable {
/// Keep private routes assigned and accessible
#[instrument(level = "trace", skip(self, stop_token), err)]
pub async fn private_route_management_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
@ -183,10 +191,13 @@ impl RoutingTable {
}
// Ensure we have a minimum of N allocated local, unpublished routes with the default number of hops and all our supported crypto kinds
let default_route_hop_count =
self.with_config(|c| c.network.rpc.default_route_hop_count as usize);
let default_route_hop_count = self
.config()
.with(|c| c.network.rpc.default_route_hop_count as usize);
let mut local_unpublished_route_count = 0usize;
let rss = self.route_spec_store();
let Some(rss) = self.route_spec_store() else {
return Ok(());
};
rss.list_allocated_routes(|_k, v| {
if !v.is_published()
&& v.hop_count() == default_route_hop_count

View File

@ -54,7 +54,7 @@ impl RoutingTable {
// Keep relays assigned and accessible
#[instrument(level = "trace", skip_all, err)]
pub async fn relay_management_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,

View File

@ -4,7 +4,7 @@ impl RoutingTable {
// Compute transfer statistics to determine how 'fast' a node is
#[instrument(level = "trace", skip(self), err)]
pub async fn rolling_transfers_task_routine(
self,
&self,
_stop_token: StopToken,
last_ts: Timestamp,
cur_ts: Timestamp,
@ -27,8 +27,9 @@ impl RoutingTable {
}
// Roll all route transfers
let rss = self.route_spec_store();
rss.roll_transfers(last_ts, cur_ts);
if let Some(rss) = self.route_spec_store() {
rss.roll_transfers(last_ts, cur_ts);
}
Ok(())
}
@ -36,7 +37,7 @@ impl RoutingTable {
// Update state statistics in PeerStats
#[instrument(level = "trace", skip(self), err)]
pub async fn update_state_stats_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
@ -57,7 +58,7 @@ impl RoutingTable {
// Update rolling answers in PeerStats
#[instrument(level = "trace", skip(self), err)]
pub async fn rolling_answers_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,

View File

@ -1,30 +1,18 @@
use super::*;
use crate::storage_manager::StorageManager;
pub mod test_serialize_routing_table;
pub(crate) fn mock_routing_table() -> routing_table::RoutingTable {
pub(crate) async fn mock_routing_table<'a>() -> VeilidComponentGuard<'a, RoutingTable> {
let veilid_config =
VeilidConfig::new_from_config(VeilidConfigInner::default(), Arc::new(|_| {}));
let registry = VeilidComponentRegistry::new(veilid_config);
registry.enable_mock();
registry.register(ProtectedStore::new);
registry.register(TableStore::new);
registry.register(Crypto::new);
let storage_manager = storage_manager::StorageManager::new(
event_bus.clone(),
veilid_config.clone(),
crypto.clone(),
table_store.clone(),
#[cfg(feature = "unstable-blockstore")]
block_store.clone(),
);
let network_manager = network_manager::NetworkManager::new(
event_bus.clone(),
veilid_config.clone(),
storage_manager,
table_store.clone(),
#[cfg(feature = "unstable-blockstore")]
block_store.clone(),
crypto.clone(),
);
RoutingTable::new(network_manager)
registry.register(StorageManager::new);
registry.register(RoutingTable::new);
registry.init().await.unwrap();
registry.lookup::<RoutingTable>().unwrap()
}

View File

@ -1,16 +1,14 @@
use super::*;
pub async fn test_routingtable_buckets_round_trip() {
let original = mock_routing_table();
let copy = mock_routing_table();
original.init().await.unwrap();
copy.init().await.unwrap();
let original = mock_routing_table().await;
let copy = mock_routing_table().await;
// Add lots of routes to `original` here to exercise all various types.
let (serialized_bucket_map, all_entry_bytes) = original.serialized_buckets();
copy.populate_routing_table(
RoutingTable::populate_routing_table_inner(
&mut copy.inner.write(),
serialized_bucket_map,
all_entry_bytes,
@ -51,8 +49,8 @@ pub async fn test_routingtable_buckets_round_trip() {
}
// Even if these are mocks, we should still practice good hygiene.
original.terminate().await;
copy.terminate().await;
original.registry().terminate().await;
copy.registry().terminate().await;
}
pub async fn test_round_trip_peerinfo() {

View File

@ -43,7 +43,7 @@ impl PeerInfo {
}
}
pub fn validate(&self, crypto: Crypto) -> VeilidAPIResult<()> {
pub fn validate(&self, crypto: &Crypto) -> VeilidAPIResult<()> {
let validated_node_ids = self.signed_node_info.validate(&self.node_ids, crypto)?;
if validated_node_ids.is_empty() {
// Shouldn't get here because signed node info validation also checks this
@ -65,11 +65,11 @@ impl PeerInfo {
(self.routing_domain, self.node_ids, self.signed_node_info)
}
pub fn validate_vec(peer_info_vec: &mut Vec<Arc<PeerInfo>>, crypto: Crypto) {
pub fn validate_vec(peer_info_vec: &mut Vec<Arc<PeerInfo>>, crypto: &Crypto) {
let mut n = 0usize;
while n < peer_info_vec.len() {
let pi = peer_info_vec.get(n).unwrap();
if pi.validate(crypto.clone()).is_err() {
if pi.validate(crypto).is_err() {
peer_info_vec.remove(n);
} else {
n += 1;

View File

@ -36,7 +36,7 @@ impl SignedDirectNodeInfo {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
crypto: &Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
let node_info_bytes = Self::make_signature_bytes(&self.node_info, self.timestamp)?;
@ -54,7 +54,7 @@ impl SignedDirectNodeInfo {
}
pub fn make_signatures(
crypto: Crypto,
crypto: &Crypto,
typed_key_pairs: Vec<TypedKeyPair>,
node_info: NodeInfo,
) -> VeilidAPIResult<Self> {

View File

@ -27,7 +27,7 @@ impl SignedNodeInfo {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
crypto: &Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
match self {
SignedNodeInfo::Direct(d) => d.validate(node_ids, crypto),

View File

@ -49,7 +49,7 @@ impl SignedRelayedNodeInfo {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
crypto: &Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
// Ensure the relay info for the node has a superset of the crypto kinds of the node it is relaying
if common_crypto_kinds(
@ -81,7 +81,7 @@ impl SignedRelayedNodeInfo {
}
pub fn make_signatures(
crypto: Crypto,
crypto: &Crypto,
typed_key_pairs: Vec<TypedKeyPair>,
node_info: NodeInfo,
relay_ids: TypedKeyGroup,

View File

@ -91,14 +91,14 @@ pub fn capability_fanout_node_info_filter(caps: Vec<Capability>) -> FanoutNodeIn
/// If the algorithm times out, a Timeout result is returned, however operations will still have been performed and a
/// timeout is not necessarily indicative of an algorithmic 'failure', just that no definitive stopping condition was found
/// in the given time
pub(crate) struct FanoutCall<R, F, C, D>
pub(crate) struct FanoutCall<'a, R, F, C, D>
where
R: Unpin,
F: Future<Output = FanoutCallResult>,
C: Fn(NodeRef) -> F,
D: Fn(&[NodeRef]) -> Option<R>,
{
routing_table: RoutingTable,
routing_table: &'a RoutingTable,
node_id: TypedKey,
context: Mutex<FanoutContext<R>>,
node_count: usize,
@ -109,7 +109,7 @@ where
check_done: D,
}
impl<R, F, C, D> FanoutCall<R, F, C, D>
impl<'a, R, F, C, D> FanoutCall<'a, R, F, C, D>
where
R: Unpin,
F: Future<Output = FanoutCallResult>,
@ -118,7 +118,7 @@ where
{
#[allow(clippy::too_many_arguments)]
pub fn new(
routing_table: RoutingTable,
routing_table: &'a RoutingTable,
node_id: TypedKey,
node_count: usize,
fanout: usize,
@ -126,13 +126,13 @@ where
node_info_filter: FanoutNodeInfoFilter,
call_routine: C,
check_done: D,
) -> Arc<Self> {
) -> Self {
let context = Mutex::new(FanoutContext {
fanout_queue: FanoutQueue::new(node_id.kind),
result: None,
});
Arc::new(Self {
Self {
routing_table,
node_id,
context,
@ -142,11 +142,11 @@ where
node_info_filter,
call_routine,
check_done,
})
}
}
#[instrument(level = "trace", target = "fanout", skip_all)]
fn evaluate_done(self: Arc<Self>, ctx: &mut FanoutContext<R>) -> bool {
fn evaluate_done(&self, ctx: &mut FanoutContext<R>) -> bool {
// If we have a result, then we're done
if ctx.result.is_some() {
return true;
@ -158,7 +158,7 @@ where
}
#[instrument(level = "trace", target = "fanout", skip_all)]
fn add_to_fanout_queue(self: Arc<Self>, new_nodes: &[NodeRef]) {
fn add_to_fanout_queue(&self, new_nodes: &[NodeRef]) {
event!(target: "fanout", Level::DEBUG,
"FanoutCall::add_to_fanout_queue:\n new_nodes={{\n{}}}\n",
new_nodes
@ -169,24 +169,23 @@ where
);
let ctx = &mut *self.context.lock();
let this = self.clone();
ctx.fanout_queue.add(new_nodes, |current_nodes| {
let mut current_nodes_vec = this
let mut current_nodes_vec = self
.routing_table
.sort_and_clean_closest_noderefs(this.node_id, current_nodes);
.sort_and_clean_closest_noderefs(self.node_id, current_nodes);
current_nodes_vec.truncate(self.node_count);
current_nodes_vec
});
}
#[instrument(level = "trace", target = "fanout", skip_all)]
async fn fanout_processor(self: Arc<Self>) -> bool {
async fn fanout_processor(&self) -> bool {
// Loop until we have a result or are done
loop {
// Get the closest node we haven't processed yet if we're not done yet
let next_node = {
let mut ctx = self.context.lock();
if self.clone().evaluate_done(&mut ctx) {
if self.evaluate_done(&mut ctx) {
break true;
}
ctx.fanout_queue.next()
@ -221,7 +220,7 @@ where
let new_nodes = self
.routing_table
.register_nodes_with_peer_info_list(filtered_v);
self.clone().add_to_fanout_queue(&new_nodes);
self.add_to_fanout_queue(&new_nodes);
}
#[allow(unused_variables)]
Ok(x) => {
@ -239,10 +238,10 @@ where
}
#[instrument(level = "trace", target = "fanout", skip_all)]
fn init_closest_nodes(self: Arc<Self>) -> Result<(), RPCError> {
fn init_closest_nodes(&self) -> Result<(), RPCError> {
// Get the 'node_count' closest nodes to the key out of our routing table
let closest_nodes = {
let routing_table = self.routing_table.clone();
let routing_table = self.routing_table;
let node_info_filter = self.node_info_filter.clone();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
@ -253,7 +252,7 @@ where
let entry = opt_entry.unwrap();
// Filter entries
entry.with(rti, |_rti, e| {
entry.with(routing_table, rti, |_rt, _rti, e| {
let Some(signed_node_info) =
e.signed_node_info(RoutingDomain::PublicInternet)
else {
@ -277,20 +276,20 @@ where
let filters = VecDeque::from([filter]);
let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), v.unwrap().clone())
NodeRef::new(routing_table.registry(), v.unwrap().clone())
};
routing_table
.find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform)
.map_err(RPCError::invalid_format)?
};
self.clone().add_to_fanout_queue(&closest_nodes);
self.add_to_fanout_queue(&closest_nodes);
Ok(())
}
#[instrument(level = "trace", target = "fanout", skip_all)]
pub async fn run(
self: Arc<Self>,
&self,
init_fanout_queue: Vec<NodeRef>,
) -> TimeoutOr<Result<Option<R>, RPCError>> {
// Get timeout in milliseconds
@ -302,17 +301,17 @@ where
};
// Initialize closest nodes list
if let Err(e) = self.clone().init_closest_nodes() {
if let Err(e) = self.init_closest_nodes() {
return TimeoutOr::value(Err(e));
}
// Ensure we include the most recent nodes
self.clone().add_to_fanout_queue(&init_fanout_queue);
self.add_to_fanout_queue(&init_fanout_queue);
// Do a quick check to see if we're already done
{
let mut ctx = self.context.lock();
if self.clone().evaluate_done(&mut ctx) {
if self.evaluate_done(&mut ctx) {
return TimeoutOr::value(ctx.result.take().transpose());
}
}
@ -322,7 +321,7 @@ where
{
// Spin up 'fanout' tasks to process the fanout
for _ in 0..self.fanout {
let h = self.clone().fanout_processor();
let h = self.fanout_processor();
unord.push(h);
}
}

View File

@ -88,29 +88,27 @@ enum RPCKind {
/////////////////////////////////////////////////////////////////////
#[derive(Debug)]
struct RPCProcessorInner {
send_channel: Option<flume::Sender<(Span, MessageEncoded)>>,
stop_source: Option<StopSource>,
worker_join_handles: Vec<MustJoinHandle<()>>,
}
struct RPCProcessorUnlockedInner {
network_manager: NetworkManager,
#[derive(Debug)]
pub(crate) struct RPCProcessor {
registry: VeilidComponentRegistry,
inner: Arc<Mutex<RPCProcessorInner>>,
timeout_us: TimestampDuration,
queue_size: u32,
concurrency: u32,
max_route_hop_count: usize,
update_callback: UpdateCallback,
waiting_rpc_table: OperationWaiter<Message, Option<QuestionContext>>,
waiting_app_call_table: OperationWaiter<Vec<u8>, ()>,
startup_lock: StartupLock,
}
#[derive(Clone)]
pub(crate) struct RPCProcessor {
inner: Arc<Mutex<RPCProcessorInner>>,
unlocked_inner: Arc<RPCProcessorUnlockedInner>,
}
impl_veilid_component!(RPCProcessor);
impl RPCProcessor {
fn new_inner() -> RPCProcessorInner {
@ -120,13 +118,11 @@ impl RPCProcessor {
worker_join_handles: Vec::new(),
}
}
fn new_unlocked_inner(
network_manager: NetworkManager,
update_callback: UpdateCallback,
) -> RPCProcessorUnlockedInner {
pub fn new(registry: VeilidComponentRegistry) -> Self {
// make local copy of node id for easy access
let (concurrency, queue_size, max_route_hop_count, timeout_us) = {
let config = network_manager.config();
let config = registry.config();
let c = config.get();
// set up channel
@ -146,50 +142,20 @@ impl RPCProcessor {
(concurrency, queue_size, max_route_hop_count, timeout_us)
};
RPCProcessorUnlockedInner {
network_manager,
Self {
registry,
inner: Arc::new(Mutex::new(Self::new_inner())),
timeout_us,
queue_size,
concurrency,
max_route_hop_count,
update_callback,
waiting_rpc_table: OperationWaiter::new(),
waiting_app_call_table: OperationWaiter::new(),
startup_lock: StartupLock::new(),
}
}
pub fn new(network_manager: NetworkManager, update_callback: UpdateCallback) -> Self {
Self {
inner: Arc::new(Mutex::new(Self::new_inner())),
unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager, update_callback)),
}
}
pub fn network_manager(&self) -> NetworkManager {
self.unlocked_inner.network_manager.clone()
}
pub fn crypto(&self) -> Crypto {
self.unlocked_inner.network_manager.crypto()
}
pub fn event_bus(&self) -> EventBus {
self.unlocked_inner.network_manager.event_bus()
}
pub fn routing_table(&self) -> RoutingTable {
self.unlocked_inner.network_manager.routing_table()
}
pub fn storage_manager(&self) -> StorageManager {
self.unlocked_inner.network_manager.storage_manager()
}
pub fn with_config<R, F: FnOnce(&VeilidConfigInner) -> R>(&self, func: F) -> R {
let config = self.unlocked_inner.network_manager.config();
let c = config.get();
func(&c)
}
xxx continue here
//////////////////////////////////////////////////////////////////////

View File

@ -28,13 +28,13 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_get_value(
&self,
rpc_processor: RPCProcessor,
rpc_processor: &RPCProcessor,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
last_get_result: GetResult,
) -> VeilidAPIResult<flume::Receiver<VeilidAPIResult<OutboundGetValueResult>>> {
let routing_table = rpc_processor.routing_table();
let routing_table = self.routing_table();
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'GetValue'
@ -84,12 +84,13 @@ impl StorageManager {
// Routine to call to generate fanout
let call_routine = {
let context = context.clone();
let rpc_processor = rpc_processor.clone();
let registry = self.registry();
move |next_node: NodeRef| {
let context = context.clone();
let rpc_processor = rpc_processor.clone();
let registry = registry.clone();
let last_descriptor = last_get_result.opt_descriptor.clone();
async move {
let rpc_processor = registry.lookup::<RPCProcessor>().unwrap();
let gva = network_result_try!(
rpc_processor
.clone()
@ -300,13 +301,14 @@ impl StorageManager {
subkey: ValueSubkey,
last_seq: ValueSeqNum,
) {
let this = self.clone();
inner.process_deferred_results(
let registry = self.registry();
Self::process_deferred_results_inner(inner,
res_rx,
Box::new(
move |result: VeilidAPIResult<get_value::OutboundGetValueResult>| -> SendPinBoxFuture<bool> {
let this = this.clone();
let registry=registry.clone();
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
let result = match result {
Ok(v) => v,
Err(e) => {
@ -361,24 +363,27 @@ impl StorageManager {
};
// Keep the list of nodes that returned a value for later reference
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
inner.process_fanout_results(
Self::process_fanout_results_inner(
&mut *inner,
key,
core::iter::once((subkey, &result.fanout_result)),
false,
self.config()
.with(|c| c.network.dht.set_value_count as usize),
);
// If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq {
inner
.handle_set_local_value(
key,
subkey,
get_result_value.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
Self::handle_set_local_value(
&mut *inner,
key,
subkey,
get_result_value.clone(),
WatchUpdateMode::UpdateAll,
)
.await?;
}
Ok(Some(get_result_value.value_data().clone()))
}
@ -391,12 +396,13 @@ impl StorageManager {
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<NetworkResult<GetResult>> {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
// See if this is a remote or local value
let (_is_local, last_get_result) = {
// See if the subkey we are getting has a last known local value
let mut last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
let mut last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?;
// If this is local, it must have a descriptor already
if last_get_result.opt_descriptor.is_some() {
if !want_descriptor {
@ -405,9 +411,9 @@ impl StorageManager {
(true, last_get_result)
} else {
// See if the subkey we are getting has a last known remote value
let last_get_result = inner
.handle_get_remote_value(key, subkey, want_descriptor)
.await?;
let last_get_result =
Self::handle_get_remote_value_inner(&mut *inner, key, subkey, want_descriptor)
.await?;
(false, last_get_result)
}
};

View File

@ -52,14 +52,14 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_inspect_value(
&self,
rpc_processor: RPCProcessor,
rpc_processor: &RPCProcessor,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
safety_selection: SafetySelection,
local_inspect_result: InspectResult,
use_set_scope: bool,
) -> VeilidAPIResult<OutboundInspectValueResult> {
let routing_table = rpc_processor.routing_table();
let routing_table = self.routing_table();
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'InspectValue'

View File

@ -241,8 +241,9 @@ impl StorageManager {
let mut inner = self.inner.lock().await;
// Schedule tick
let registry = self.registry();
let tick_future = interval("storage manager tick", 1000, move || {
let registry = self.registry();
let registry = registry.clone();
async move {
let this = registry.lookup::<StorageManager>().unwrap();
if let Err(e) = this.tick().await {
@ -306,7 +307,7 @@ impl StorageManager {
log_stor!(debug "finished storage manager shutdown");
}
async fn save_metadata(inner: &StorageManagerInner) -> EyreResult<()> {
async fn save_metadata(inner: &mut StorageManagerInner) -> EyreResult<()> {
if let Some(metadata_db) = &inner.metadata_db {
let tx = metadata_db.transact();
tx.store_json(0, OFFLINE_SUBKEY_WRITES, &inner.offline_subkey_writes)?;
@ -417,8 +418,7 @@ impl StorageManager {
// Now that the record is made we should always succeed to open the existing record
// The initial writer is the owner of the record
inner
.open_existing_record(key, Some(owner), safety_selection)
Self::open_existing_record_inner(&mut *inner, key, Some(owner), safety_selection)
.await
.map(|r| r.unwrap())
}
@ -431,12 +431,11 @@ impl StorageManager {
writer: Option<KeyPair>,
safety_selection: SafetySelection,
) -> VeilidAPIResult<DHTRecordDescriptor> {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
// See if we have a local record already or not
if let Some(res) = inner
.open_existing_record(key, writer, safety_selection)
.await?
if let Some(res) =
Self::open_existing_record_inner(&mut *inner, key, writer, safety_selection).await?
{
return Ok(res);
}
@ -444,7 +443,7 @@ impl StorageManager {
// No record yet, try to get it from the network
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else {
let Some(rpc_processor) = self.get_ready_rpc_processor() else {
apibail_try_again!("offline, try again later");
};
@ -456,7 +455,7 @@ impl StorageManager {
let subkey: ValueSubkey = 0;
let res_rx = self
.outbound_get_value(
rpc_processor,
&rpc_processor,
key,
subkey,
safety_selection,
@ -513,10 +512,10 @@ impl StorageManager {
#[instrument(level = "trace", target = "stor", skip_all)]
pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> {
let (opt_opened_record, opt_rpc_processor) = {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
(
inner.close_record(key)?,
Self::get_ready_rpc_processor(&inner),
Self::close_record_inner(&mut *inner, key)?,
self.get_ready_rpc_processor(),
)
};
@ -571,7 +570,7 @@ impl StorageManager {
self.close_record(key).await?;
// Get record from the local store
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
let Some(local_record_store) = inner.local_record_store.as_mut() else {
apibail_not_initialized!();
};
@ -588,7 +587,7 @@ impl StorageManager {
subkey: ValueSubkey,
force_refresh: bool,
) -> VeilidAPIResult<Option<ValueData>> {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
let safety_selection = {
let Some(opened_record) = inner.opened_records.get(&key) else {
apibail_generic!("record not open");
@ -597,7 +596,8 @@ impl StorageManager {
};
// See if the requested subkey is our local record store
let last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
let last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?;
// Return the existing value if we have one unless we are forcing a refresh
if !force_refresh {
@ -609,7 +609,7 @@ impl StorageManager {
// Refresh if we can
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else {
let Some(rpc_processor) = self.get_ready_rpc_processor() else {
// Return the existing value if we have one if we aren't online
if let Some(last_get_result_value) = last_get_result.opt_value {
return Ok(Some(last_get_result_value.value_data().clone()));
@ -628,7 +628,7 @@ impl StorageManager {
.map(|v| v.value_data().seq());
let res_rx = self
.outbound_get_value(
rpc_processor,
&rpc_processor,
key,
subkey,
safety_selection,
@ -651,7 +651,7 @@ impl StorageManager {
if let Some(out) = &out {
// If there's more to process, do it in the background
if partial {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
self.process_deferred_outbound_get_value_result_inner(
&mut inner,
res_rx,
@ -1514,6 +1514,7 @@ impl StorageManager {
key: TypedKey,
subkey_results_iter: I,
is_set: bool,
consensus_count: usize,
) {
// Get local record store
let local_record_store = inner.local_record_store.as_mut().unwrap();
@ -1545,7 +1546,7 @@ impl StorageManager {
.collect::<Vec<_>>();
nodes_ts.sort_by(|a, b| b.1.cmp(&a.1));
for dead_node_key in nodes_ts.iter().skip(self.set_consensus_count) {
for dead_node_key in nodes_ts.iter().skip(consensus_count) {
d.nodes.remove(&dead_node_key.0);
}
});

View File

@ -13,7 +13,9 @@ impl StorageManager {
let mut inner = self.inner.lock().await;
let routing_table = self.routing_table();
let rss = routing_table.route_spec_store();
let Some(rss) = routing_table.route_spec_store() else {
return Ok(());
};
let update_callback = self.update_callback();

View File

@ -4,7 +4,7 @@ impl StorageManager {
// Check if server-side watches have expired
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn check_watched_records_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,

View File

@ -10,64 +10,48 @@ impl StorageManager {
pub(super) fn setup_tasks(&self) {
// Set flush records tick task
log_stor!(debug "starting flush record stores task");
{
let registry = self.registry();
self.flush_record_stores_task.set_routine(move |s, l, t| {
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.flush_record_stores_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
impl_setup_task!(
self,
Self,
flush_record_stores_task,
flush_record_stores_task_routine
);
// Set offline subkey writes tick task
log_stor!(debug "starting offline subkey writes task");
{
let registry = self.registry();
self.offline_subkey_writes_task.set_routine(move |s, l, t| {
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.offline_subkey_writes_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
impl_setup_task!(
self,
Self,
offline_subkey_writes_task,
offline_subkey_writes_task_routine
);
// Set send value changes tick task
log_stor!(debug "starting send value changes task");
{
let registry = self.registry();
self.send_value_changes_task.set_routine(move |s, l, t| {
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.send_value_changes_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
impl_setup_task!(
self,
Self,
send_value_changes_task,
send_value_changes_task_routine
);
// Set check active watches tick task
log_stor!(debug "starting check active watches task");
{
let registry = self.registry();
self.check_active_watches_task.set_routine(move |s, l, t| {
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.check_active_watches_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
impl_setup_task!(
self,
Self,
check_active_watches_task,
check_active_watches_task_routine
);
// Set check watched records tick task
log_stor!(debug "starting checked watched records task");
{
let registry = self.registry();
self.check_watched_records_task.set_routine(move |s, l, t| {
Box::pin(async move {
let this = registry.lookup::<StorageManager>().unwrap();
this.check_watched_records_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
impl_setup_task!(
self,
Self,
check_watched_records_task,
check_watched_records_task_routine
);
}
#[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)]

View File

@ -186,7 +186,13 @@ impl StorageManager {
// Process all results
#[instrument(level = "trace", target = "stor", skip_all)]
fn process_single_result_inner(inner: &mut StorageManagerInner, result: WorkItemResult) {
async fn process_single_result(&self, result: WorkItemResult) {
let consensus_count = self
.config()
.with(|c| c.network.dht.set_value_count as usize);
let mut inner = self.inner.lock().await;
// Debug print the result
log_stor!(debug "Offline write result: {:?}", result);
@ -218,10 +224,11 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner(
inner,
&mut *inner,
result.key,
result.fanout_results.iter().map(|x| (x.0, &x.1)),
true,
consensus_count,
);
}
@ -240,8 +247,7 @@ impl StorageManager {
.process_work_item(stop_token.clone(), work_item)
.await?;
{
let mut inner = self.inner.lock().await;
Self::process_single_result_inner(&mut inner, result);
self.process_single_result(result).await;
}
}
@ -258,7 +264,7 @@ impl StorageManager {
) -> EyreResult<()> {
// Operate on a copy of the offline subkey writes map
let work_items = {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
// Move the current set of writes to 'in flight'
for osw in &mut inner.offline_subkey_writes {
osw.1.subkeys_in_flight = mem::take(&mut osw.1.subkeys);
@ -277,7 +283,7 @@ impl StorageManager {
// Ensure nothing is left in-flight when returning even due to an error
{
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
for osw in &mut inner.offline_subkey_writes {
osw.1.subkeys = osw
.1

View File

@ -6,7 +6,7 @@ impl StorageManager {
// Send value change notifications across the network
#[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn send_value_changes_task_routine(
self,
&self,
stop_token: StopToken,
_last_ts: Timestamp,
_cur_ts: Timestamp,
@ -31,10 +31,9 @@ impl StorageManager {
// Add a future for each value change
for vc in value_changes {
let this = self.clone();
unord.push(
async move {
if let Err(e) = this.send_value_change(vc).await {
if let Err(e) = self.send_value_change(vc).await {
log_stor!(debug "Failed to send value change: {}", e);
}
}

View File

@ -104,7 +104,8 @@ pub async fn test_get_dht_record_key(api: VeilidAPI) {
.with_safety(SafetySelection::Unsafe(Sequencing::EnsureOrdered))
.unwrap();
let cs = api.crypto().unwrap().get(CRYPTO_KIND_VLD0).unwrap();
let crypto = api.crypto();
let cs = crypto.get(CRYPTO_KIND_VLD0).unwrap();
let owner_keypair = cs.generate_keypair();
let schema = DHTSchema::dflt(1).unwrap();
@ -117,7 +118,6 @@ pub async fn test_get_dht_record_key(api: VeilidAPI) {
// recreate the record key from the metadata alone
let key = rc
.get_dht_record_key(schema.clone(), &owner_keypair.key, Some(CRYPTO_KIND_VLD0))
.await
.unwrap();
// keys should be the same

View File

@ -616,7 +616,7 @@ impl VeilidAPI {
0,
"debug_keypair",
"kind",
get_crypto_system_version(crypto.clone()),
get_crypto_system_version(&crypto),
)
.unwrap_or_else(|_| crypto.best());

View File

@ -227,7 +227,7 @@ impl RoutingContext {
/// Builds the record key for a given schema and owner public key
#[instrument(target = "veilid_api", level = "debug", ret, err)]
pub async fn get_dht_record_key(
pub fn get_dht_record_key(
&self,
schema: DHTSchema,
owner_key: &PublicKey,
@ -240,9 +240,7 @@ impl RoutingContext {
let kind = kind.unwrap_or(best_crypto_kind());
Crypto::validate_crypto_kind(kind)?;
let storage_manager = self.api.storage_manager()?;
storage_manager
.get_record_key(kind, schema, owner_key)
.await
storage_manager.get_record_key(kind, schema, owner_key)
}
/// Creates a new DHT record