mirror of https://gitlab.com/veilid/veilid.git synced 2025-03-22 13:56:35 -04:00

[skip ci] continue refactor

Christien Rioux 2025-02-01 20:47:28 -05:00
parent 7edbf28e36
commit 62eaedcaf8
40 changed files with 209 additions and 269 deletions

@ -136,39 +136,39 @@ impl VeilidCoreContext {
/////////////////////////////////////////////////////////////////////////////
pub trait RegisteredComponents {
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore>;
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto>;
fn table_store(&self) -> VeilidComponentGuard<'_, TableStore>;
fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager>;
fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable>;
fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager>;
fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor>;
fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager>;
fn protected_store<'a>(&self) -> VeilidComponentGuard<'a, ProtectedStore>;
fn crypto<'a>(&self) -> VeilidComponentGuard<'a, Crypto>;
fn table_store<'a>(&self) -> VeilidComponentGuard<'a, TableStore>;
fn storage_manager<'a>(&self) -> VeilidComponentGuard<'a, StorageManager>;
fn routing_table<'a>(&self) -> VeilidComponentGuard<'a, RoutingTable>;
fn network_manager<'a>(&self) -> VeilidComponentGuard<'a, NetworkManager>;
fn rpc_processor<'a>(&self) -> VeilidComponentGuard<'a, RPCProcessor>;
fn attachment_manager<'a>(&self) -> VeilidComponentGuard<'a, AttachmentManager>;
}
impl<T: VeilidComponentRegistryAccessor> RegisteredComponents for T {
fn protected_store(&self) -> VeilidComponentGuard<'_, ProtectedStore> {
fn protected_store<'a>(&self) -> VeilidComponentGuard<'a, ProtectedStore> {
self.registry().lookup::<ProtectedStore>().unwrap()
}
fn crypto(&self) -> VeilidComponentGuard<'_, Crypto> {
fn crypto<'a>(&self) -> VeilidComponentGuard<'a, Crypto> {
self.registry().lookup::<Crypto>().unwrap()
}
fn table_store(&self) -> VeilidComponentGuard<'_, TableStore> {
fn table_store<'a>(&self) -> VeilidComponentGuard<'a, TableStore> {
self.registry().lookup::<TableStore>().unwrap()
}
fn storage_manager(&self) -> VeilidComponentGuard<'_, StorageManager> {
fn storage_manager<'a>(&self) -> VeilidComponentGuard<'a, StorageManager> {
self.registry().lookup::<StorageManager>().unwrap()
}
fn routing_table(&self) -> VeilidComponentGuard<'_, RoutingTable> {
fn routing_table<'a>(&self) -> VeilidComponentGuard<'a, RoutingTable> {
self.registry().lookup::<RoutingTable>().unwrap()
}
fn network_manager(&self) -> VeilidComponentGuard<'_, NetworkManager> {
fn network_manager<'a>(&self) -> VeilidComponentGuard<'a, NetworkManager> {
self.registry().lookup::<NetworkManager>().unwrap()
}
fn rpc_processor(&self) -> VeilidComponentGuard<'_, RPCProcessor> {
fn rpc_processor<'a>(&self) -> VeilidComponentGuard<'a, RPCProcessor> {
self.registry().lookup::<RPCProcessor>().unwrap()
}
fn attachment_manager(&self) -> VeilidComponentGuard<'_, AttachmentManager> {
fn attachment_manager<'a>(&self) -> VeilidComponentGuard<'a, AttachmentManager> {
self.registry().lookup::<AttachmentManager>().unwrap()
}
}

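The hunk above changes the guard accessors from the anonymous `'_` lifetime to an explicit `'a` parameter, which reads as decoupling the guard's lifetime from the borrow of the accessor itself. A minimal sketch of that signature pattern, using invented stand-ins rather than the real Veilid types:

```rust
use std::marker::PhantomData;
use std::sync::Arc;

// Invented stand-ins; the real VeilidComponentGuard and registry carry more state.
struct ProtectedStore;

struct ComponentGuard<'a, T> {
    component: Arc<T>,
    _marker: PhantomData<&'a ()>,
}

trait RegistryAccessor {
    fn lookup<T>(&self) -> Arc<T>;
}

trait RegisteredComponents: RegistryAccessor {
    // The guard lifetime is a free parameter rather than the borrow of `&self`.
    fn protected_store<'a>(&self) -> ComponentGuard<'a, ProtectedStore>;
}

impl<T: RegistryAccessor> RegisteredComponents for T {
    fn protected_store<'a>(&self) -> ComponentGuard<'a, ProtectedStore> {
        ComponentGuard {
            component: self.lookup::<ProtectedStore>(),
            _marker: PhantomData,
        }
    }
}
```
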
@ -80,7 +80,7 @@ impl AddressFilter {
inner.dial_info_failures.clear();
}
fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
fn purge_old_timestamps_inner(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
// v4
{
let mut dead_keys = Vec::<Ipv4Addr>::new();
@ -304,14 +304,14 @@ impl AddressFilter {
#[instrument(parent = None, level = "trace", skip_all, err)]
pub async fn address_filter_task_routine(
self,
&self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
//
let mut inner = self.inner.lock();
self.purge_old_timestamps(&mut inner, cur_ts);
self.purge_old_timestamps_inner(&mut inner, cur_ts);
self.purge_old_punishments(&mut inner, cur_ts);
Ok(())
@ -326,7 +326,7 @@ impl AddressFilter {
}
let ts = Timestamp::now();
self.purge_old_timestamps(inner, ts);
self.purge_old_timestamps_inner(inner, ts);
match ipblock {
IpAddr::V4(v4) => {
@ -377,13 +377,13 @@ impl AddressFilter {
Ok(())
}
pub fn remove_connection(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
pub fn remove_connection(&self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
let mut inner = self.inner.lock();
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
let ts = Timestamp::now();
self.purge_old_timestamps(&mut inner, ts);
self.purge_old_timestamps_inner(&mut inner, ts);
match ipblock {
IpAddr::V4(v4) => {

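These AddressFilter hunks change `remove_connection` from `&mut self` to `&self` and rename the purge helper to `purge_old_timestamps_inner`; the struct already guards its state behind an internal lock, so shared references suffice. A simplified sketch of that interior-mutability pattern, with invented fields:

```rust
use std::net::IpAddr;
use std::sync::Mutex;

// Invented, much-reduced stand-in for AddressFilterInner.
struct AddressFilterInner {
    connections: Vec<IpAddr>,
}

struct AddressFilter {
    inner: Mutex<AddressFilterInner>,
}

impl AddressFilter {
    // `&self` is enough: the Mutex provides the mutability internally,
    // which is what lets NetworkManager drop its outer RwLock<AddressFilter>.
    fn remove_connection(&self, addr: IpAddr) -> Result<(), ()> {
        let mut inner = self.inner.lock().unwrap();
        let pos = inner
            .connections
            .iter()
            .position(|a| *a == addr)
            .ok_or(())?;
        inner.connections.remove(pos);
        Ok(())
    }
}
```
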
@ -452,12 +452,11 @@ impl ConnectionManager {
let network_manager = self.network_manager();
let prot_conn = network_result_try!(loop {
let address_filter = network_manager.address_filter();
let result_net_res = ProtocolNetworkConnection::connect(
preferred_local_address,
&dial_info,
self.arc.connection_initial_timeout_ms,
&*address_filter,
network_manager.address_filter(),
)
.await;
match result_net_res {

@ -241,7 +241,8 @@ impl ConnectionTable {
let ip_addr = flow.remote_address().ip_addr();
if let Err(e) = self
.network_manager()
.with_address_filter_mut(|af| af.add_connection(ip_addr))
.address_filter()
.add_connection(ip_addr)
{
// Return the connection in the error to be disposed of
return Err(ConnectionTableAddError::address_filter(
@ -468,7 +469,8 @@ impl ConnectionTable {
// address_filter
let ip_addr = remote.socket_addr().ip();
self.network_manager()
.with_address_filter_mut(|af| af.remove_connection(ip_addr))
.address_filter()
.remove_connection(ip_addr)
.expect("Inconsistency in connection table");
conn
}

@ -35,7 +35,7 @@ impl NetworkManager {
// Direct bootstrap request
#[instrument(level = "trace", target = "net", err, skip(self))]
pub async fn boot_request(&self, dial_info: DialInfo) -> EyreResult<Vec<Arc<PeerInfo>>> {
let timeout_ms = self.with_config(|c| c.network.rpc.timeout_ms);
let timeout_ms = self.config().with(|c| c.network.rpc.timeout_ms);
// Send boot magic to requested peer address
let data = BOOT_MAGIC.to_vec();

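The `boot_request` hunk above swaps `self.with_config(|c| ...)` for `self.config().with(|c| ...)`. A hedged sketch of what such a `with`-style config handle can look like (simplified types, not the actual VeilidConfig API):

```rust
use std::sync::{Arc, RwLock};

struct NetworkConfig {
    timeout_ms: u32,
}
struct Config {
    network: NetworkConfig,
}

#[derive(Clone)]
struct ConfigHandle(Arc<RwLock<Config>>);

impl ConfigHandle {
    // Borrow the settings only for the duration of the closure.
    fn with<R>(&self, f: impl FnOnce(&Config) -> R) -> R {
        f(&self.0.read().unwrap())
    }
}

fn boot_timeout_ms(config: &ConfigHandle) -> u32 {
    config.with(|c| c.network.timeout_ms)
}
```
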
@ -132,13 +132,12 @@ struct NetworkManagerInner {
socket_address_change_subscription: Option<EventBusSubscription>,
}
#[derive(Debug)]
pub(crate) struct NetworkManager {
registry: VeilidComponentRegistry,
inner: Mutex<NetworkManagerInner>,
// Address filter
address_filter: RwLock<AddressFilter>,
address_filter: AddressFilter,
// Accessors
components: RwLock<Option<NetworkComponents>>,
@ -218,7 +217,7 @@ impl NetworkManager {
let this = Self {
registry,
inner: Mutex::new(inner),
address_filter: RwLock::new(address_filter),
address_filter,
components: RwLock::new(None),
rolling_transfers_task: TickTask::new(
"rolling_transfers_task",
@ -237,24 +236,8 @@ impl NetworkManager {
this
}
pub fn with_address_filter_mut<F, R>(&self, callback: F) -> R
where
F: FnOnce(&mut AddressFilter) -> R,
{
let mut af = self.address_filter.write();
callback(&mut *af)
}
pub fn with_address_filter<F, R>(&self, callback: F) -> R
where
F: FnOnce(&AddressFilter) -> R,
{
let af = self.address_filter.read();
callback(&*af)
}
pub fn address_filter<'a>(&self) -> RwLockReadGuard<'a, AddressFilter> {
self.address_filter.read()
pub fn address_filter(&self) -> &AddressFilter {
&self.address_filter
}
fn net(&self) -> Network {
@ -289,9 +272,6 @@ impl NetworkManager {
#[instrument(level = "debug", skip_all, err)]
async fn init_async(&self) -> EyreResult<()> {
let address_filter = AddressFilter::new(self.registry());
*self.address_filter.write() = Some(address_filter);
Ok(())
}
@ -302,9 +282,7 @@ impl NetworkManager {
async fn pre_terminate_async(&self) {}
#[instrument(level = "debug", skip_all)]
async fn terminate_async(&self) {
*self.address_filter.write() = None;
}
async fn terminate_async(&self) {}
#[instrument(level = "debug", skip_all, err)]
pub async fn internal_startup(&self) -> EyreResult<StartupDisposition> {
@ -314,11 +292,11 @@ impl NetworkManager {
}
// Clean address filter for things that should not be persistent
self.address_filter().restart();
self.address_filter.restart();
// Create network components
let connection_manager = ConnectionManager::new(self.registry());
let net = Network::new(self.registry(), connection_manager.clone());
let net = Network::new(self.registry());
let receipt_manager = ReceiptManager::new();
*self.components.write() = Some(NetworkComponents {
net: net.clone(),
@ -527,7 +505,7 @@ impl NetworkManager {
let crypto = self.crypto();
// Generate receipt and serialized form to return
let vcrypto = self.crypto().best();
let vcrypto = crypto.best();
let nonce = vcrypto.random_nonce();
let node_id = routing_table.node_id(vcrypto.kind());
@ -567,7 +545,7 @@ impl NetworkManager {
let crypto = self.crypto();
// Generate receipt and serialized form to return
let vcrypto = self.crypto().best();
let vcrypto = crypto.best();
let nonce = vcrypto.random_nonce();
let node_id = routing_table.node_id(vcrypto.kind());

@ -307,8 +307,7 @@ impl NetworkConnection {
flow
);
let network_manager = connection_manager.network_manager();
let address_filter = network_manager.address_filter();
let registry = connection_manager.registry();
let mut unord = FuturesUnordered::new();
let mut need_receiver = true;
let mut need_sender = true;
@ -364,14 +363,17 @@ impl NetworkConnection {
// Add another message receiver future if necessary
if need_receiver {
need_receiver = false;
let registry = registry.clone();
let receiver_fut = Self::recv_internal(&protocol_connection, stats.clone())
.then(|res| async {
let registry = registry;
let network_manager = registry.network_manager();
match res {
Ok(v) => {
let peer_address = protocol_connection.flow().remote();
// Check to see if it is punished
if address_filter.is_ip_addr_punished(peer_address.socket_addr().ip()) {
if network_manager.address_filter().is_ip_addr_punished(peer_address.socket_addr().ip()) {
return RecvLoopAction::Finish;
}
@ -383,7 +385,7 @@ impl NetworkConnection {
// Punish invalid framing (tcp framing or websocket framing)
if v.is_invalid_message() {
address_filter.punish_ip_addr(peer_address.socket_addr().ip(), PunishmentReason::InvalidFraming);
network_manager.address_filter().punish_ip_addr(peer_address.socket_addr().ip(), PunishmentReason::InvalidFraming);
return RecvLoopAction::Finish;
}

@ -40,9 +40,10 @@ impl NetworkManager {
destination_node_ref: FilteredNodeRef,
data: Vec<u8>,
) -> SendPinBoxFuture<EyreResult<NetworkResult<SendDataMethod>>> {
let this = self.clone();
let registry = self.registry();
Box::pin(
async move {
let this = registry.network_manager();
// If we need to relay, do it
let (contact_method, target_node_ref, opt_relayed_contact_method) = match possibly_relayed_contact_method.clone() {
@ -652,17 +653,14 @@ impl NetworkManager {
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// Detect if network is stopping so we can break out of this
let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else {
let Some(stop_token) = self.startup_lock.stop_token() else {
return Ok(NetworkResult::service_unavailable("network is stopping"));
};
// Build a return receipt for the signal
let receipt_timeout = TimestampDuration::new_ms(
self.unlocked_inner
.config
.get()
.network
.reverse_connection_receipt_time_ms as u64,
self.config()
.with(|c| c.network.reverse_connection_receipt_time_ms as u64),
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;
@ -763,7 +761,7 @@ impl NetworkManager {
data: Vec<u8>,
) -> EyreResult<NetworkResult<UniqueFlow>> {
// Detect if network is stopping so we can break out of this
let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else {
let Some(stop_token) = self.startup_lock.stop_token() else {
return Ok(NetworkResult::service_unavailable("network is stopping"));
};
@ -776,11 +774,8 @@ impl NetworkManager {
// Build a return receipt for the signal
let receipt_timeout = TimestampDuration::new_ms(
self.unlocked_inner
.config
.get()
.network
.hole_punch_receipt_time_ms as u64,
self.config()
.with(|c| c.network.hole_punch_receipt_time_ms as u64),
);
let (receipt, eventual_value) = self.generate_single_shot_receipt(receipt_timeout, [])?;

@ -117,11 +117,9 @@ impl NetworkManager {
}
pub(super) fn send_network_update(&self) {
let update_cb = self.unlocked_inner.update_callback.read().clone();
if update_cb.is_none() {
return;
}
let update_cb = self.update_callback();
let state = self.get_veilid_state();
(update_cb.unwrap())(VeilidUpdate::Network(state));
update_cb(VeilidUpdate::Network(state));
}
}

@ -14,14 +14,16 @@ impl NetworkManager {
// Set address filter task
{
let this = self.clone();
let registry = self.registry();
self.address_filter_task.set_routine(move |s, l, t| {
xxx continue here
Box::pin(this.address_filter().address_filter_task_routine(
s,
Timestamp::new(l),
Timestamp::new(t),
))
let registry = registry.clone();
Box::pin(async move {
registry
.network_manager()
.address_filter()
.address_filter_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.await
})
});
}
}

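In the task-setup hunk above, the routine closure captures the component registry and re-resolves the NetworkManager inside the async block instead of cloning `self` into the closure. A minimal sketch of that shape with invented types (the real TickTask and registry differ):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

struct NetworkManager;
impl NetworkManager {
    async fn address_filter_task_routine(&self) {
        // ... purge timestamps and punishments ...
    }
}

struct Registry {
    network_manager: Arc<NetworkManager>,
}
impl Registry {
    fn network_manager(&self) -> Arc<NetworkManager> {
        self.network_manager.clone()
    }
}

// The closure owns a registry handle, not a NetworkManager clone; the component
// is looked up again each time the routine fires.
fn make_routine(
    registry: Arc<Registry>,
) -> impl Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> {
    move || -> Pin<Box<dyn Future<Output = ()> + Send>> {
        let registry = registry.clone();
        Box::pin(async move {
            registry.network_manager().address_filter_task_routine().await
        })
    }
}
```
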
@ -4,7 +4,7 @@ impl NetworkManager {
// Compute transfer statistics for the low level network
#[instrument(level = "trace", skip(self), err)]
pub async fn rolling_transfers_task_routine(
self,
&self,
_stop_token: StopToken,
last_ts: Timestamp,
cur_ts: Timestamp,

@ -1,13 +1,11 @@
use super::*;
use super::connection_table::*;
use crate::tests::common::test_veilid_config::*;
use crate::tests::mock_routing_table;
pub async fn test_add_get_remove() {
let config = get_config();
let address_filter = AddressFilter::new(config.clone(), mock_routing_table());
let table = ConnectionTable::new(config, address_filter);
let routing_table = mock_routing_table().await;
let table = ConnectionTable::new(routing_table.registry());
let a1 = Flow::new_no_local(PeerAddress::new(
SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080),

@ -30,7 +30,7 @@ pub async fn test_signed_node_info() {
// Test correct validation
let keypair = vcrypto.generate_keypair();
let sni = SignedDirectNodeInfo::make_signatures(
crypto.clone(),
&crypto,
vec![TypedKeyPair::new(ck, keypair)],
node_info.clone(),
)
@ -42,7 +42,7 @@ pub async fn test_signed_node_info() {
sni.timestamp(),
sni.signatures().to_vec(),
);
let tks_validated = sdni.validate(&tks, crypto.clone()).unwrap();
let tks_validated = sdni.validate(&tks, &crypto).unwrap();
assert_eq!(tks_validated.len(), oldtkslen);
assert_eq!(tks_validated.len(), sni.signatures().len());
@ -54,7 +54,7 @@ pub async fn test_signed_node_info() {
sni.timestamp(),
sni.signatures().to_vec(),
);
sdni.validate(&tks1, crypto.clone()).unwrap_err();
sdni.validate(&tks1, &crypto).unwrap_err();
// Test unsupported cryptosystem validation
let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]);
@ -65,7 +65,7 @@ pub async fn test_signed_node_info() {
tksfake.add(TypedKey::new(ck, keypair.key));
let sdnifake =
SignedDirectNodeInfo::new(node_info.clone(), sni.timestamp(), sigsfake.clone());
let tksfake_validated = sdnifake.validate(&tksfake, crypto.clone()).unwrap();
let tksfake_validated = sdnifake.validate(&tksfake, &crypto).unwrap();
assert_eq!(tksfake_validated.len(), 1);
assert_eq!(sdnifake.signatures().len(), sigsfake.len());
@ -89,7 +89,7 @@ pub async fn test_signed_node_info() {
let oldtks2len = tks2.len();
let sni2 = SignedRelayedNodeInfo::make_signatures(
crypto.clone(),
&crypto,
vec![TypedKeyPair::new(ck, keypair2)],
node_info2.clone(),
tks.clone(),
@ -103,7 +103,7 @@ pub async fn test_signed_node_info() {
sni2.timestamp(),
sni2.signatures().to_vec(),
);
let tks2_validated = srni.validate(&tks2, crypto.clone()).unwrap();
let tks2_validated = srni.validate(&tks2, &crypto).unwrap();
assert_eq!(tks2_validated.len(), oldtks2len);
assert_eq!(tks2_validated.len(), sni2.signatures().len());
@ -119,7 +119,7 @@ pub async fn test_signed_node_info() {
sni2.timestamp(),
sni2.signatures().to_vec(),
);
srni.validate(&tks3, crypto.clone()).unwrap_err();
srni.validate(&tks3, &crypto).unwrap_err();
// Test unsupported cryptosystem validation
let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]);
@ -135,7 +135,7 @@ pub async fn test_signed_node_info() {
sni2.timestamp(),
sigsfake3.clone(),
);
let tksfake3_validated = srnifake.validate(&tksfake3, crypto.clone()).unwrap();
let tksfake3_validated = srnifake.validate(&tksfake3, &crypto).unwrap();
assert_eq!(tksfake3_validated.len(), 1);
assert_eq!(srnifake.signatures().len(), sigsfake3.len());
}

@ -21,7 +21,7 @@ pub(crate) enum SignalInfo {
}
impl SignalInfo {
pub fn validate(&self, crypto: Crypto) -> Result<(), RPCError> {
pub fn validate(&self, crypto: &Crypto) -> Result<(), RPCError> {
match self {
SignalInfo::HolePunch { receipt, peer_info } => {
if receipt.len() < MIN_RECEIPT_SIZE {

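This `SignalInfo::validate` hunk, like the signed-node-info test changes before it, switches validation helpers from taking an owned `Crypto` handle (and a `crypto.clone()` at every call site) to borrowing `&Crypto`. A trivial sketch of the before/after shape with placeholder types:

```rust
// Placeholder types; the real Crypto and RPCError carry far more.
struct Crypto;
struct RPCError;

struct SignalInfo;

impl SignalInfo {
    // Before: fn validate(&self, crypto: Crypto) -> Result<(), RPCError>
    // After: a shared borrow is enough for signature checks.
    fn validate(&self, _crypto: &Crypto) -> Result<(), RPCError> {
        Ok(())
    }
}

fn handle_signal(info: &SignalInfo, crypto: &Crypto) -> Result<(), RPCError> {
    // No crypto.clone() at the call site anymore.
    info.validate(crypto)
}
```
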
@ -95,6 +95,8 @@ pub(crate) struct RoutingTable {
registry: VeilidComponentRegistry,
inner: RwLock<RoutingTableInner>,
/// Route spec store
route_spec_store: RouteSpecStore,
/// The current node's public DHT keys
node_id: TypedKeyGroup,
/// The current node's public DHT secrets
@ -145,9 +147,11 @@ impl RoutingTable {
let config = registry.config();
let c = config.get();
let inner = RwLock::new(RoutingTableInner::new(registry.clone()));
let route_spec_store = RouteSpecStore::new(registry.clone());
let this = Self {
registry,
inner,
route_spec_store,
node_id: c.network.routing_table.node_id.clone(),
node_id_secret: c.network.routing_table.node_id_secret.clone(),
kick_queue: Mutex::new(BTreeSet::default()),
@ -221,20 +225,12 @@ impl RoutingTable {
// Set up routespecstore
log_rtab!(debug "starting route spec store init");
let route_spec_store = match RouteSpecStore::load(self.registry()).await {
Ok(v) => v,
Err(e) => {
log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e);
RouteSpecStore::new(self.registry())
}
if let Err(e) = self.route_spec_store().load().await {
log_rtab!(debug "Error loading route spec store: {:#?}. Resetting.", e);
self.route_spec_store().reset();
};
log_rtab!(debug "finished route spec store init");
{
let mut inner = self.inner.write();
inner.route_spec_store = Some(route_spec_store);
}
log_rtab!(debug "finished routing table init");
Ok(())
}
@ -507,14 +503,8 @@ impl RoutingTable {
self.inner.read().routing_domain_for_address(address)
}
pub fn route_spec_store(&self) -> Option<MappedRwLockReadGuard<'_, RouteSpecStore>> {
let inner = self.inner.read();
if inner.route_spec_store.is_none() {
return None;
}
Some(RwLockReadGuard::map(inner, |x| {
x.route_spec_store.as_ref().unwrap()
}))
pub fn route_spec_store(&self) -> &RouteSpecStore {
&self.route_spec_store
}
pub fn relay_node(&self, domain: RoutingDomain) -> Option<FilteredNodeRef> {
@ -627,7 +617,7 @@ impl RoutingTable {
) -> Vec<FilteredNodeRef> {
self.inner
.read()
.get_nodes_needing_ping(self.registry(), routing_domain, cur_ts)
.get_nodes_needing_ping(routing_domain, cur_ts)
}
fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) {

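The routing-table hunks above move `RouteSpecStore` out of the inner `Option` and into a plain field built in the constructor, so the accessor returns `&RouteSpecStore` rather than an `Option` of a mapped lock guard. A reduced sketch of that ownership change (invented constructors):

```rust
// Invented minimal types to show the shape of the change only.
struct RouteSpecStore;
impl RouteSpecStore {
    fn new() -> Self {
        RouteSpecStore
    }
    fn reset(&self) {
        // The real store clears its content and cache here.
    }
}

struct RoutingTable {
    route_spec_store: RouteSpecStore,
}

impl RoutingTable {
    fn new() -> Self {
        Self {
            route_spec_store: RouteSpecStore::new(),
        }
    }
    // No Option and no lock mapping: the store lives as long as the table.
    fn route_spec_store(&self) -> &RouteSpecStore {
        &self.route_spec_store
    }
}
```
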
@ -9,11 +9,7 @@ pub(crate) struct FilteredNodeRef {
track_id: usize,
}
impl VeilidComponentRegistryAccessor for FilteredNodeRef {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
impl_veilid_component_registry_accessor!(FilteredNodeRef);
impl FilteredNodeRef {
pub fn new(

@ -22,11 +22,7 @@ pub(crate) struct NodeRef {
track_id: usize,
}
impl VeilidComponentRegistryAccessor for NodeRef {
fn registry(&self) -> VeilidComponentRegistry {
self.registry.clone()
}
}
impl_veilid_component_registry_accessor!(NodeRef);
impl NodeRef {
pub fn new(registry: VeilidComponentRegistry, entry: Arc<BucketEntry>) -> Self {

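The two hunks above replace hand-written `VeilidComponentRegistryAccessor` impls on `FilteredNodeRef` and `NodeRef` with the `impl_veilid_component_registry_accessor!` macro. A guess at the kind of expansion such a macro can produce, written with simplified types rather than the real definitions:

```rust
#[derive(Clone)]
struct VeilidComponentRegistry;

trait VeilidComponentRegistryAccessor {
    fn registry(&self) -> VeilidComponentRegistry;
}

// One macro invocation per type replaces each hand-written impl block.
macro_rules! impl_registry_accessor {
    ($t:ty) => {
        impl VeilidComponentRegistryAccessor for $t {
            fn registry(&self) -> VeilidComponentRegistry {
                self.registry.clone()
            }
        }
    };
}

struct NodeRef {
    registry: VeilidComponentRegistry,
}
impl_registry_accessor!(NodeRef);
```
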
@ -64,43 +64,41 @@ impl RouteSpecStore {
}
}
#[instrument(level = "trace", target = "route", skip_all, err)]
pub async fn load(registry: VeilidComponentRegistry) -> EyreResult<RouteSpecStore> {
let (max_route_hop_count, default_route_hop_count) = {
let config = registry.config();
let c = config.get();
(
c.network.rpc.max_route_hop_count as usize,
c.network.rpc.default_route_hop_count as usize,
)
};
// Get frozen blob from table store
let table_store = registry.lookup::<TableStore>().unwrap();
let routing_table = registry.lookup::<RoutingTable>().unwrap();
let content = RouteSpecStoreContent::load(&table_store, &routing_table).await?;
let mut inner = RouteSpecStoreInner {
content,
#[instrument(level = "trace", target = "route", skip_all)]
pub fn reset(&self) {
*self.inner.lock() = RouteSpecStoreInner {
content: RouteSpecStoreContent::new(),
cache: Default::default(),
};
}
// Rebuild the routespecstore cache
let rti = &*routing_table.inner.read();
for (_, rssd) in inner.content.iter_details() {
inner.cache.add_to_cache(rti, rssd);
}
#[instrument(level = "trace", target = "route", skip_all, err)]
pub async fn load(&self) -> EyreResult<()> {
let inner = {
let table_store = self.table_store();
let routing_table = self.routing_table();
// Return the loaded RouteSpecStore
let rss = RouteSpecStore {
registry,
inner: Arc::new(Mutex::new(inner)),
max_route_hop_count,
default_route_hop_count,
// Get frozen blob from table store
let content = RouteSpecStoreContent::load(&table_store, &routing_table).await?;
let mut inner = RouteSpecStoreInner {
content,
cache: Default::default(),
};
// Rebuild the routespecstore cache
let rti = &*routing_table.inner.read();
for (_, rssd) in inner.content.iter_details() {
inner.cache.add_to_cache(rti, rssd);
}
inner
};
Ok(rss)
// Return the loaded RouteSpecStore
*self.inner.lock() = inner;
Ok(())
}
#[instrument(level = "trace", target = "route", skip(self), err)]
@ -111,7 +109,7 @@ impl RouteSpecStore {
};
// Save our content
let table_store = self.registry.lookup::<TableStore>().unwrap();
let table_store = self.table_store();
content.save(&table_store).await?;
Ok(())
@ -140,7 +138,7 @@ impl RouteSpecStore {
/// Purge the route spec store
pub async fn purge(&self) -> VeilidAPIResult<()> {
// Briefly pause routing table ticker while changes are made
let routing_table = self.registry.lookup::<RoutingTable>().unwrap();
let routing_table = self.routing_table();
let _tick_guard = routing_table.pause_tasks().await;
routing_table.cancel_tasks().await;
@ -901,7 +899,8 @@ impl RouteSpecStore {
};
// Remove from hop cache
let rti = &*self.routing_table().inner.read();
let routing_table = self.routing_table();
let rti = &*routing_table.inner.read();
if !inner.cache.remove_from_cache(rti, id, &rssd) {
panic!("hop cache should have contained cache key");
}

@ -460,7 +460,6 @@ impl RoutingTableInner {
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
pub(super) fn get_nodes_needing_ping(
&self,
registry: VeilidComponentRegistry,
routing_domain: RoutingDomain,
cur_ts: Timestamp,
) -> Vec<FilteredNodeRef> {
@ -603,6 +602,8 @@ impl RoutingTableInner {
entry: Arc<BucketEntry>,
node_ids: &[TypedKey],
) -> EyreResult<()> {
let routing_table = self.routing_table();
entry.with_mut_inner(|e| {
let mut existing_node_ids = e.node_ids();
@ -633,21 +634,21 @@ impl RoutingTableInner {
if let Some(old_node_id) = e.add_node_id(*node_id)? {
// Remove any old node id for this crypto kind
if VALID_CRYPTO_KINDS.contains(&ck) {
let bucket_index = self.unlocked_inner.calculate_bucket_index(&old_node_id);
let bucket_index = routing_table.calculate_bucket_index(&old_node_id);
let bucket = self.get_bucket_mut(bucket_index);
bucket.remove_entry(&old_node_id.value);
self.unlocked_inner.kick_queue.lock().insert(bucket_index);
routing_table.kick_queue.lock().insert(bucket_index);
}
}
// Bucket the entry appropriately
if VALID_CRYPTO_KINDS.contains(&ck) {
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket_index = routing_table.calculate_bucket_index(node_id);
let bucket = self.get_bucket_mut(bucket_index);
bucket.add_existing_entry(node_id.value, entry.clone());
// Kick bucket
self.unlocked_inner.kick_queue.lock().insert(bucket_index);
routing_table.kick_queue.lock().insert(bucket_index);
}
}
@ -655,7 +656,7 @@ impl RoutingTableInner {
for node_id in existing_node_ids.iter() {
let ck = node_id.kind;
if VALID_CRYPTO_KINDS.contains(&ck) {
let bucket_index = self.unlocked_inner.calculate_bucket_index(node_id);
let bucket_index = routing_table.calculate_bucket_index(node_id);
let bucket = self.get_bucket_mut(bucket_index);
bucket.remove_entry(&node_id.value);
entry.with_mut_inner(|e| e.remove_node_id(ck));

@ -144,11 +144,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail {
pi
};
if let Err(e) = rti
.unlocked_inner
.event_bus
.post(PeerInfoChangeEvent { peer_info })
{
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { peer_info }) {
log_rtab!(debug "Failed to post event: {}", e);
}

@ -312,6 +312,9 @@ impl RoutingDomainDetailCommon {
// Internal functions
fn make_peer_info(&self, rti: &RoutingTableInner) -> PeerInfo {
let crypto = rti.crypto();
let routing_table = rti.routing_table();
let node_info = NodeInfo::new(
self.network_class().unwrap_or(NetworkClass::Invalid),
self.outbound_protocols,
@ -343,8 +346,8 @@ impl RoutingDomainDetailCommon {
let signed_node_info = match relay_info {
Some((relay_ids, relay_sdni)) => SignedNodeInfo::Relayed(
SignedRelayedNodeInfo::make_signatures(
rti.unlocked_inner.crypto(),
rti.unlocked_inner.node_id_typed_key_pairs(),
&crypto,
routing_table.node_id_typed_key_pairs(),
node_info,
relay_ids,
relay_sdni,
@ -353,8 +356,8 @@ impl RoutingDomainDetailCommon {
),
None => SignedNodeInfo::Direct(
SignedDirectNodeInfo::make_signatures(
rti.unlocked_inner.crypto(),
rti.unlocked_inner.node_id_typed_key_pairs(),
&crypto,
routing_table.node_id_typed_key_pairs(),
node_info,
)
.unwrap(),
@ -363,7 +366,7 @@ impl RoutingDomainDetailCommon {
PeerInfo::new(
self.routing_domain,
rti.unlocked_inner.node_ids(),
routing_table.node_ids(),
signed_node_info,
)
}

@ -263,9 +263,7 @@ impl<'a> RoutingDomainEditorCommonTrait for RoutingDomainEditorPublicInternet<'a
if changed {
// Clear the routespecstore cache if our PublicInternet dial info has changed
if let Some(rss) = self.routing_table.route_spec_store() {
rss.reset_cache();
}
self.routing_table.route_spec_store().reset_cache();
}
}

@ -122,11 +122,7 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
pi
};
if let Err(e) = rti
.unlocked_inner
.event_bus
.post(PeerInfoChangeEvent { peer_info })
{
if let Err(e) = rti.event_bus().post(PeerInfoChangeEvent { peer_info }) {
log_rtab!(debug "Failed to post event: {}", e);
}
@ -167,11 +163,8 @@ impl RoutingDomainDetail for PublicInternetRoutingDomainDetail {
dif_sort: Option<Arc<DialInfoDetailSort>>,
) -> ContactMethod {
let ip6_prefix_size = rti
.unlocked_inner
.config
.get()
.network
.max_connections_per_ip6_prefix_size as usize;
.config()
.with(|c| c.network.max_connections_per_ip6_prefix_size as usize);
// Get the nodeinfos for convenience
let node_a = peer_a.signed_node_info().node_info();

@ -280,19 +280,20 @@ impl RoutingTable {
for crypto_kind in crypto_kinds {
// Bootstrap this crypto kind
let nr = nr.unfiltered();
let routing_table = self.clone();
unord.push(Box::pin(
async move {
let network_manager = nr.network_manager();
let routing_table = nr.routing_table();
// Get what contact method would be used for contacting the bootstrap
let bsdi = match routing_table
.network_manager()
let bsdi = match network_manager
.get_node_contact_method(nr.default_filtered())
{
Ok(NodeContactMethod::Direct(v)) => v,
Ok(v) => {
log_rtab!(debug "invalid contact method for bootstrap, ignoring peer: {:?}", v);
// let _ = routing_table
// .network_manager()
// let _ =
// network_manager
// .get_node_contact_method(nr.clone());
return;
}
@ -312,7 +313,7 @@ impl RoutingTable {
log_rtab!(debug "bootstrap server is not responding for dialinfo: {}", bsdi);
// Try a different dialinfo next time
routing_table.network_manager().address_filter().set_dial_info_failed(bsdi);
network_manager.address_filter().set_dial_info_failed(bsdi);
} else {
// otherwise this bootstrap is valid, lets ask it to find ourselves now
routing_table.reverse_find_node(crypto_kind, nr, true, vec![]).await

@ -289,11 +289,10 @@ impl RoutingTable {
for nr in node_refs {
let nr = nr.sequencing_clone(Sequencing::PreferOrdered);
let rpc = rpc.clone();
// Just do a single ping with the best protocol for all the nodes
futurequeue.push_back(
async move {
let rpc = nr.rpc_processor();
#[cfg(feature = "verbose-tracing")]
log_rtab!(debug "--> LocalNetwork Validator ping to {:?}", nr);
let _ = rpc.rpc_call_status(Destination::direct(nr)).await?;

@ -49,13 +49,10 @@ impl RoutingTable {
.config()
.with(|c| c.network.rpc.default_route_hop_count as usize);
let Some(rss) = self.route_spec_store() else {
return vec![];
};
let mut must_test_routes = Vec::<RouteId>::new();
let mut unpublished_routes = Vec::<(RouteId, u64)>::new();
let mut expired_routes = Vec::<RouteId>::new();
rss.list_allocated_routes(|k, v| {
self.route_spec_store().list_allocated_routes(|k, v| {
let stats = v.get_stats();
// Ignore nodes that don't need testing
if !stats.needs_testing(cur_ts) {
@ -99,7 +96,7 @@ impl RoutingTable {
// Process dead routes
for r in expired_routes {
log_rtab!(debug "Expired route: {}", r);
rss.release_route(r);
self.route_spec_store().release_route(r);
}
// return routes to test
@ -118,11 +115,6 @@ impl RoutingTable {
}
log_rtab!("Testing routes: {:?}", routes_needing_testing);
// Test all the routes that need testing at the same time
let Some(rss) = self.route_spec_store() else {
return Ok(());
};
#[derive(Default, Debug)]
struct TestRouteContext {
dead_routes: Vec<RouteId>,
@ -135,9 +127,7 @@ impl RoutingTable {
let ctx = ctx.clone();
unord.push(
async move {
let rss = self.route_spec_store().unwrap();
let success = match rss.test_route(r).await {
let success = match self.route_spec_store().test_route(r).await {
// Test had result
Ok(Some(v)) => v,
// Test could not be performed at this time
@ -168,7 +158,7 @@ impl RoutingTable {
let ctx = Arc::try_unwrap(ctx).unwrap().into_inner();
for r in ctx.dead_routes {
log_rtab!(debug "Dead route failed to test: {}", r);
rss.release_route(r);
self.route_spec_store().release_route(r);
}
Ok(())
@ -195,10 +185,8 @@ impl RoutingTable {
.config()
.with(|c| c.network.rpc.default_route_hop_count as usize);
let mut local_unpublished_route_count = 0usize;
let Some(rss) = self.route_spec_store() else {
return Ok(());
};
rss.list_allocated_routes(|_k, v| {
self.route_spec_store().list_allocated_routes(|_k, v| {
if !v.is_published()
&& v.hop_count() == default_route_hop_count
&& v.get_route_set_keys().kinds() == VALID_CRYPTO_KINDS
@ -224,7 +212,7 @@ impl RoutingTable {
stability: Stability::Reliable,
sequencing: Sequencing::PreferOrdered,
};
match rss.allocate_route(
match self.route_spec_store().allocate_route(
&VALID_CRYPTO_KINDS,
&safety_spec,
DirectionSet::all(),
@ -249,7 +237,7 @@ impl RoutingTable {
}
// Test remote routes next
let remote_routes_needing_testing = rss.list_remote_routes(|k, v| {
let remote_routes_needing_testing = self.route_spec_store().list_remote_routes(|k, v| {
let stats = v.get_stats();
if stats.needs_testing(cur_ts) {
Some(*k)
@ -263,7 +251,7 @@ impl RoutingTable {
}
// Send update (also may send updates for released routes done by other parts of the program)
rss.send_route_update();
self.route_spec_store().send_route_update();
Ok(())
}

@ -162,11 +162,8 @@ impl RoutingTable {
.node_info()
.clone();
let ip6_prefix_size = self
.unlocked_inner
.config
.get()
.network
.max_connections_per_ip6_prefix_size as usize;
.config()
.with(|c| c.network.max_connections_per_ip6_prefix_size as usize);
move |e: &BucketEntryInner| {
// Ensure this node is not on the local network and is on the public internet
@ -285,6 +282,6 @@ impl RoutingTable {
Option::<()>::None
});
// Return the best inbound relay noderef
best_inbound_relay.map(|e| NodeRef::new(self.clone(), e))
best_inbound_relay.map(|e| NodeRef::new(self.registry(), e))
}
}

@ -27,9 +27,7 @@ impl RoutingTable {
}
// Roll all route transfers
if let Some(rss) = self.route_spec_store() {
rss.roll_transfers(last_ts, cur_ts);
}
self.route_spec_store().roll_transfers(last_ts, cur_ts);
Ok(())
}

@ -13,6 +13,7 @@ pub(crate) async fn mock_routing_table<'a>() -> VeilidComponentGuard<'a, Routing
registry.register(Crypto::new);
registry.register(StorageManager::new);
registry.register(RoutingTable::new);
registry.register(NetworkManager::new);
registry.init().await.unwrap();
registry.lookup::<RoutingTable>().unwrap()
}

@ -68,10 +68,10 @@ pub enum QuestionContext {
#[derive(Clone)]
pub struct RPCValidateContext {
pub crypto: Crypto,
// pub rpc_processor: RPCProcessor,
pub registry: VeilidComponentRegistry,
pub question_context: Option<QuestionContext>,
}
impl_veilid_component_registry_accessor!(RPCValidateContext);
#[derive(Clone)]
pub struct RPCDecodeContext {

@ -100,8 +100,9 @@ impl RPCOperation {
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
// Validate sender peer info
if let Some(sender_peer_info) = &self.sender_peer_info.opt_peer_info {
let crypto = validate_context.crypto();
sender_peer_info
.validate(validate_context.crypto.clone())
.validate(&crypto)
.map_err(RPCError::protocol)?;
}

@ -95,7 +95,8 @@ impl RPCOperationFindNodeA {
}
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());
let crypto = validate_context.crypto();
PeerInfo::validate_vec(&mut self.peers, &crypto);
Ok(())
}

@ -106,6 +106,8 @@ impl RPCOperationGetValueA {
}
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
let crypto = validate_context.crypto();
let question_context = validate_context
.question_context
.as_ref()
@ -157,7 +159,7 @@ impl RPCOperationGetValueA {
}
}
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());
PeerInfo::validate_vec(&mut self.peers, &crypto);
Ok(())
}

@ -10,7 +10,8 @@ impl RPCOperationSignal {
Self { signal_info }
}
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
self.signal_info.validate(validate_context.crypto.clone())
let crypto = validate_context.crypto();
self.signal_info.validate(&crypto)
}
// pub fn signal_info(&self) -> &SignalInfo {
// &self.signal_info

@ -11,7 +11,8 @@ impl RPCQuestion {
Self { respond_to, detail }
}
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
self.respond_to.validate(validate_context.crypto.clone())?;
let crypto = validate_context.crypto();
self.respond_to.validate(&crypto)?;
self.detail.validate(validate_context)
}
pub fn respond_to(&self) -> &RespondTo {

@ -7,7 +7,7 @@ pub(in crate::rpc_processor) enum RespondTo {
}
impl RespondTo {
pub fn validate(&mut self, crypto: Crypto) -> Result<(), RPCError> {
pub fn validate(&mut self, crypto: &Crypto) -> Result<(), RPCError> {
match self {
RespondTo::Sender => Ok(()),
RespondTo::PrivateRoute(pr) => pr.validate(crypto).map_err(RPCError::protocol),

@ -298,9 +298,11 @@ impl RPCProcessor {
}
Target::PrivateRoute(rsid) => {
// Get remote private route
let rss = self.routing_table().route_spec_store();
let Some(private_route) = rss.best_remote_private_route(&rsid) else {
let Some(private_route) = self
.routing_table()
.route_spec_store()
.best_remote_private_route(&rsid)
else {
return Err(RPCError::network("could not get remote private route"));
};

@ -241,7 +241,6 @@ where
fn init_closest_nodes(&self) -> Result<(), RPCError> {
// Get the 'node_count' closest nodes to the key out of our routing table
let closest_nodes = {
let routing_table = self.routing_table;
let node_info_filter = self.node_info_filter.clone();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
@ -252,7 +251,7 @@ where
let entry = opt_entry.unwrap();
// Filter entries
entry.with(routing_table, rti, |_rt, _rti, e| {
entry.with(rti, |_rti, e| {
let Some(signed_node_info) =
e.signed_node_info(RoutingDomain::PublicInternet)
else {
@ -276,10 +275,10 @@ where
let filters = VecDeque::from([filter]);
let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.registry(), v.unwrap().clone())
NodeRef::new(self.routing_table.registry(), v.unwrap().clone())
};
routing_table
self.routing_table
.find_preferred_closest_nodes(self.node_count, self.node_id, filters, transform)
.map_err(RPCError::invalid_format)?
};

@ -674,7 +674,7 @@ impl StorageManager {
data: Vec<u8>,
writer: Option<KeyPair>,
) -> VeilidAPIResult<Option<ValueData>> {
let mut inner = self.lock().await?;
let mut inner = self.inner.lock().await;
// Get cryptosystem
let crypto = self.crypto();
@ -701,7 +701,8 @@ impl StorageManager {
};
// See if the subkey we are modifying has a last known local value
let last_get_result = inner.handle_get_local_value(key, subkey, true).await?;
let last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?;
// Get the descriptor and schema for the key
let Some(descriptor) = last_get_result.opt_descriptor else {
@ -741,20 +742,20 @@ impl StorageManager {
// Write the value locally first
log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
inner
.handle_set_local_value(
key,
subkey,
signed_value_data.clone(),
WatchUpdateMode::NoUpdate,
)
.await?;
Self::handle_set_local_value_inner(
&mut *inner,
key,
subkey,
signed_value_data.clone(),
WatchUpdateMode::NoUpdate,
)
.await?;
// Get rpc processor and drop mutex so we don't block while getting the value from the network
let Some(rpc_processor) = Self::get_ready_rpc_processor(&inner) else {
let Some(rpc_processor) = self.get_ready_rpc_processor() else {
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush
inner.add_offline_subkey_write(key, subkey, safety_selection);
Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection);
return Ok(None);
};
@ -766,7 +767,7 @@ impl StorageManager {
// Use the safety selection we opened the record with
let res_rx = match self
.outbound_set_value(
rpc_processor,
&rpc_processor,
key,
subkey,
safety_selection,
@ -778,8 +779,8 @@ impl StorageManager {
Ok(v) => v,
Err(e) => {
// Failed to write, try again later
let mut inner = self.lock().await?;
inner.add_offline_subkey_write(key, subkey, safety_selection);
let mut inner = self.inner.lock().await;
Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection);
return Err(e);
}
};

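The StorageManager hunks above replace methods on a locked wrapper (`self.lock().await?`, `inner.handle_set_local_value(...)`) with associated `*_inner` helpers that take the already-locked inner state, so one guard from `self.inner.lock().await` spans several steps. A stripped-down sketch of that pattern, assuming a tokio-style async mutex and invented fields:

```rust
use tokio::sync::Mutex;

// Invented minimal inner state.
struct StorageManagerInner {
    offline_subkey_writes: Vec<u32>,
}

struct StorageManager {
    inner: Mutex<StorageManagerInner>,
}

impl StorageManager {
    // Helpers take &mut StorageManagerInner so the caller decides when to lock.
    fn add_offline_subkey_write_inner(inner: &mut StorageManagerInner, subkey: u32) {
        inner.offline_subkey_writes.push(subkey);
    }

    async fn set_value(&self, subkey: u32) {
        let mut inner = self.inner.lock().await;
        // ... validate, sign, and store the value under the same guard ...
        Self::add_offline_subkey_write_inner(&mut inner, subkey);
    }
}
```
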
@ -28,14 +28,15 @@ impl StorageManager {
#[instrument(level = "trace", target = "dht", skip_all, err)]
pub(super) async fn outbound_set_value(
&self,
rpc_processor: &RPCProcessor,
key: TypedKey,
subkey: ValueSubkey,
safety_selection: SafetySelection,
value: Arc<SignedValueData>,
descriptor: Arc<SignedValueDescriptor>,
) -> VeilidAPIResult<flume::Receiver<VeilidAPIResult<OutboundSetValueResult>>> {
let routing_table = rpc_processor.routing_table();
xxx switch this to registry mechanism
let routing_domain = RoutingDomain::PublicInternet;
// Get the DHT parameters for 'SetValue'
@ -53,8 +54,7 @@ impl StorageManager {
// Get the nodes we know are caching this value to seed the fanout
let init_fanout_queue = {
let inner = self.inner.lock().await;
inner
.get_value_nodes(key)?
Self::get_value_nodes_inner(&mut *inner, key)?
.unwrap_or_default()
.into_iter()
.filter(|x| {
@ -301,15 +301,17 @@ impl StorageManager {
last_value_data: ValueData,
safety_selection: SafetySelection,
) {
let this = self.clone();
let registry = self.registry();
let last_value_data = Arc::new(Mutex::new(last_value_data));
inner.process_deferred_results(
Self::process_deferred_results_inner(inner,
res_rx,
Box::new(
move |result: VeilidAPIResult<set_value::OutboundSetValueResult>| -> SendPinBoxFuture<bool> {
let this = this.clone();
let registry = registry.clone();
let last_value_data = last_value_data.clone();
Box::pin(async move {
let this = registry.storage_manager();
let result = match result {
Ok(v) => v,
Err(e) => {