diff --git a/veilid-core/proto/veilid.capnp b/veilid-core/proto/veilid.capnp index 3a247908..a0d13643 100644 --- a/veilid-core/proto/veilid.capnp +++ b/veilid-core/proto/veilid.capnp @@ -303,11 +303,6 @@ struct OperationRoute @0x96741859ce6ac7dd { operation @1 :RoutedOperation; # The operation to be routed } -struct OperationNodeInfoUpdate @0xc9647b32a48b66ce { - signedNodeInfo @0 :SignedNodeInfo; # Our signed node info -} - - struct OperationAppCallQ @0xade67b9f09784507 { message @0 :Data; # Opaque request to application } @@ -466,12 +461,12 @@ struct Question @0xd8510bc33492ef70 { findNodeQ @3 :OperationFindNodeQ; # Routable operations - getValueQ @4 :OperationGetValueQ; - setValueQ @5 :OperationSetValueQ; - watchValueQ @6 :OperationWatchValueQ; - supplyBlockQ @7 :OperationSupplyBlockQ; - findBlockQ @8 :OperationFindBlockQ; - appCallQ @9 :OperationAppCallQ; + appCallQ @4 :OperationAppCallQ; + getValueQ @5 :OperationGetValueQ; + setValueQ @6 :OperationSetValueQ; + watchValueQ @7 :OperationWatchValueQ; + supplyBlockQ @8 :OperationSupplyBlockQ; + findBlockQ @9 :OperationFindBlockQ; # Tunnel operations startTunnelQ @10 :OperationStartTunnelQ; @@ -486,13 +481,12 @@ struct Statement @0x990e20828f404ae1 { # Direct operations validateDialInfo @0 :OperationValidateDialInfo; route @1 :OperationRoute; - nodeInfoUpdate @2 :OperationNodeInfoUpdate; # Routable operations - valueChanged @3 :OperationValueChanged; - signal @4 :OperationSignal; - returnReceipt @5 :OperationReturnReceipt; - appMessage @6 :OperationAppMessage; + signal @2 :OperationSignal; + returnReceipt @3 :OperationReturnReceipt; + appMessage @4 :OperationAppMessage; + valueChanged @5 :OperationValueChanged; } } @@ -504,12 +498,12 @@ struct Answer @0xacacb8b6988c1058 { findNodeA @1 :OperationFindNodeA; # Routable operations - getValueA @2 :OperationGetValueA; - setValueA @3 :OperationSetValueA; - watchValueA @4 :OperationWatchValueA; - supplyBlockA @5 :OperationSupplyBlockA; - findBlockA @6 :OperationFindBlockA; - appCallA @7 :OperationAppCallA; + appCallA @2 :OperationAppCallA; + getValueA @3 :OperationGetValueA; + setValueA @4 :OperationSetValueA; + watchValueA @5 :OperationWatchValueA; + supplyBlockA @6 :OperationSupplyBlockA; + findBlockA @7 :OperationFindBlockA; # Tunnel operations startTunnelA @8 :OperationStartTunnelA; diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index d204a982..56b95702 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -23,7 +23,7 @@ pub use network_connection::*; use connection_handle::*; use connection_limits::*; use crypto::*; -use futures_util::stream::{FuturesUnordered, StreamExt}; +use futures_util::stream::FuturesUnordered; use hashlink::LruCache; use intf::*; #[cfg(not(target_arch = "wasm32"))] @@ -155,7 +155,6 @@ struct NetworkManagerUnlockedInner { // Background processes rolling_transfers_task: TickTask, public_address_check_task: TickTask, - node_info_update_single_future: MustJoinSingleFuture<()>, } #[derive(Clone)] @@ -191,7 +190,6 @@ impl NetworkManager { update_callback: RwLock::new(None), rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), - node_info_update_single_future: MustJoinSingleFuture::new(), } } @@ -1734,62 +1732,4 @@ impl NetworkManager { } } - // Inform routing table entries that our dial info has changed - pub async fn send_node_info_updates(&self, routing_domain: RoutingDomain, all: bool) { - let this = self.clone(); - - // Run in background only once - let _ = self - .clone() - .unlocked_inner - .node_info_update_single_future - .single_spawn( - async move { - // Only update if we actually have valid signed node info for this routing domain - if !this.routing_table().has_valid_own_node_info(routing_domain) { - trace!( - "not sending node info update because our network class is not yet valid" - ); - return; - } - - // Get the list of refs to all nodes to update - let cur_ts = get_timestamp(); - let node_refs = - this.routing_table() - .get_nodes_needing_updates(routing_domain, cur_ts, all); - - // Send the updates - log_net!(debug "Sending node info updates to {} nodes", node_refs.len()); - let mut unord = FuturesUnordered::new(); - for nr in node_refs { - let rpc = this.rpc_processor(); - unord.push( - async move { - // Update the node - if let Err(e) = rpc - .rpc_call_node_info_update(nr.clone(), routing_domain) - .await - { - // Not fatal, but we should be able to see if this is happening - trace!("failed to send node info update to {:?}: {}", nr, e); - return; - } - - // Mark the node as having seen our node info - nr.set_seen_our_node_info(routing_domain); - } - .instrument(Span::current()), - ); - } - - // Wait for futures to complete - while unord.next().await.is_some() {} - - log_rtab!(debug "Finished sending node updates"); - } - .instrument(Span::current()), - ) - .await; - } } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 23b05614..0ea3563b 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -831,12 +831,10 @@ impl Network { debug!("clearing dial info"); let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); - editor.disable_node_info_updates(); editor.clear_dial_info_details(); editor.commit().await; let mut editor = routing_table.edit_routing_domain(RoutingDomain::LocalNetwork); - editor.disable_node_info_updates(); editor.clear_dial_info_details(); editor.commit().await; diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs index 108afd2d..e5ec11f8 100644 --- a/veilid-core/src/network_manager/tasks/mod.rs +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -68,15 +68,5 @@ impl NetworkManager { if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await { warn!("rolling_transfers_task not stopped: {}", e); } - debug!("stopping node info update singlefuture"); - if self - .unlocked_inner - .node_info_update_single_future - .join() - .await - .is_err() - { - error!("node_info_update_single_future not stopped"); - } } } diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 3287ea0b..b71aee14 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -318,7 +318,6 @@ impl Network { // Drop all dial info let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet); - editor.disable_node_info_updates(); editor.clear_dial_info_details(); editor.commit().await; diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index 56af85b0..3224ac3a 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -275,6 +275,30 @@ impl BucketEntryInner { false } + pub fn exists_in_routing_domain( + &self, + rti: &RoutingTableInner, + routing_domain: RoutingDomain, + ) -> bool { + // Check node info + if self.has_node_info(routing_domain.into()) { + return true; + } + + // Check connections + let connection_manager = rti.network_manager().connection_manager(); + let last_connections = self.last_connections( + rti, + Some(NodeRefFilter::new().with_routing_domain(routing_domain)), + ); + for lc in last_connections { + if connection_manager.get_connection(lc.0).is_some() { + return true; + } + } + false + } + pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> { let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, @@ -304,8 +328,10 @@ impl BucketEntryInner { pub fn best_routing_domain( &self, + rti: &RoutingTableInner, routing_domain_set: RoutingDomainSet, ) -> Option { + // Check node info for routing_domain in routing_domain_set { let opt_current_sni = match routing_domain { RoutingDomain::LocalNetwork => &self.local_network.signed_node_info, @@ -315,7 +341,27 @@ impl BucketEntryInner { return Some(routing_domain); } } - None + // Check connections + let mut best_routing_domain: Option = None; + let connection_manager = rti.network_manager().connection_manager(); + let last_connections = self.last_connections( + rti, + Some(NodeRefFilter::new().with_routing_domain_set(routing_domain_set)), + ); + for lc in last_connections { + if connection_manager.get_connection(lc.0).is_some() { + if let Some(rd) = rti.routing_domain_for_address(lc.0.remote_address().address()) { + if let Some(brd) = best_routing_domain { + if rd < brd { + best_routing_domain = Some(rd); + } + } else { + best_routing_domain = Some(rd); + } + } + } + } + best_routing_domain } fn descriptor_to_key(&self, last_connection: ConnectionDescriptor) -> LastConnectionKey { diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index b1718cff..987841b5 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -457,17 +457,6 @@ impl RoutingTable { .get_entry_count(routing_domain_set, min_state) } - pub fn get_nodes_needing_updates( - &self, - routing_domain: RoutingDomain, - cur_ts: u64, - all: bool, - ) -> Vec { - self.inner - .read() - .get_nodes_needing_updates(self.clone(), routing_domain, cur_ts, all) - } - pub fn get_nodes_needing_ping( &self, routing_domain: RoutingDomain, diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index ca4f0001..b99ec68f 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -87,8 +87,9 @@ pub trait NodeRefBase: Sized { } fn best_routing_domain(&self) -> Option { - self.operate(|_rti, e| { + self.operate(|rti, e| { e.best_routing_domain( + rti, self.common() .filter .as_ref() diff --git a/veilid-core/src/routing_table/routing_domain_editor.rs b/veilid-core/src/routing_table/routing_domain_editor.rs index dce7d3cf..28785c12 100644 --- a/veilid-core/src/routing_table/routing_domain_editor.rs +++ b/veilid-core/src/routing_table/routing_domain_editor.rs @@ -23,7 +23,6 @@ pub struct RoutingDomainEditor { routing_table: RoutingTable, routing_domain: RoutingDomain, changes: Vec, - send_node_info_updates: bool, } impl RoutingDomainEditor { @@ -32,13 +31,8 @@ impl RoutingDomainEditor { routing_table, routing_domain, changes: Vec::new(), - send_node_info_updates: true, } } - #[instrument(level = "debug", skip(self))] - pub fn disable_node_info_updates(&mut self) { - self.send_node_info_updates = false; - } #[instrument(level = "debug", skip(self))] pub fn clear_dial_info_details(&mut self) { @@ -199,7 +193,7 @@ impl RoutingDomainEditor { } }); if changed { - // Allow signed node info updates at same timestamp from dead nodes if our network has changed + // Allow signed node info updates at same timestamp for otherwise dead nodes if our network has changed inner.reset_all_updated_since_last_network_change(); } } @@ -210,12 +204,5 @@ impl RoutingDomainEditor { rss.reset(); } } - // Send our updated node info to all the nodes in the routing table - if changed && self.send_node_info_updates { - let network_manager = self.routing_table.unlocked_inner.network_manager.clone(); - network_manager - .send_node_info_updates(self.routing_domain, true) - .await; - } } } diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index 8741451c..8338be29 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -428,7 +428,7 @@ impl RoutingTableInner { let mut count = 0usize; let cur_ts = get_timestamp(); self.with_entries(cur_ts, min_state, |rti, _, e| { - if e.with(rti, |_rti, e| e.best_routing_domain(routing_domain_set)) + if e.with(rti, |rti, e| e.best_routing_domain(rti, routing_domain_set)) .is_some() { count += 1; @@ -487,29 +487,6 @@ impl RoutingTableInner { None } - pub fn get_nodes_needing_updates( - &self, - outer_self: RoutingTable, - routing_domain: RoutingDomain, - cur_ts: u64, - all: bool, - ) -> Vec { - let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); - self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { - // Only update nodes that haven't seen our node info yet - if all || !v.with(rti, |_rti, e| e.has_seen_our_node_info(routing_domain)) { - node_refs.push(NodeRef::new( - outer_self.clone(), - k, - v, - Some(NodeRefFilter::new().with_routing_domain(routing_domain)), - )); - } - Option::<()>::None - }); - node_refs - } - pub fn get_nodes_needing_ping( &self, outer_self: RoutingTable, @@ -525,9 +502,22 @@ impl RoutingTableInner { // Collect all entries that are 'needs_ping' and have some node info making them reachable somehow let mut node_refs = Vec::::with_capacity(self.bucket_entry_count); self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| { - if v.with(rti, |_rti, e| { - e.has_node_info(routing_domain.into()) - && e.needs_ping(cur_ts, opt_relay_id == Some(k)) + if v.with(rti, |rti, e| { + // If this isn't in the routing domain we are checking, don't include it + if !e.exists_in_routing_domain(rti, routing_domain) { + return false; + } + // If we need a ping via the normal timing mechanism, then do it + if e.needs_ping(cur_ts, opt_relay_id == Some(k)) { + return true; + } + // If we need a ping because this node hasn't seen our latest node info, then do it + if let Some(own_node_info_ts) = own_node_info_ts { + if !e.has_seen_our_node_info_ts(routing_domain, own_node_info_ts) { + return true; + } + } + false }) { node_refs.push(NodeRef::new( outer_self.clone(), diff --git a/veilid-core/src/rpc_processor/coders/operations/mod.rs b/veilid-core/src/rpc_processor/coders/operations/mod.rs index 3c91d344..7caf0fb2 100644 --- a/veilid-core/src/rpc_processor/coders/operations/mod.rs +++ b/veilid-core/src/rpc_processor/coders/operations/mod.rs @@ -7,7 +7,6 @@ mod operation_complete_tunnel; mod operation_find_block; mod operation_find_node; mod operation_get_value; -mod operation_node_info_update; mod operation_return_receipt; mod operation_route; mod operation_set_value; @@ -31,7 +30,6 @@ pub use operation_complete_tunnel::*; pub use operation_find_block::*; pub use operation_find_node::*; pub use operation_get_value::*; -pub use operation_node_info_update::*; pub use operation_return_receipt::*; pub use operation_route::*; pub use operation_set_value::*; diff --git a/veilid-core/src/rpc_processor/coders/operations/operation.rs b/veilid-core/src/rpc_processor/coders/operations/operation.rs index 34dfbf8c..46651748 100644 --- a/veilid-core/src/rpc_processor/coders/operations/operation.rs +++ b/veilid-core/src/rpc_processor/coders/operations/operation.rs @@ -16,10 +16,7 @@ impl RPCOperationKind { } } - pub fn decode( - kind_reader: &veilid_capnp::operation::kind::Reader, - opt_sender_node_id: Option<&DHTKey>, - ) -> Result { + pub fn decode(kind_reader: &veilid_capnp::operation::kind::Reader) -> Result { let which_reader = kind_reader.which().map_err(RPCError::protocol)?; let out = match which_reader { veilid_capnp::operation::kind::Which::Question(r) => { @@ -29,7 +26,7 @@ impl RPCOperationKind { } veilid_capnp::operation::kind::Which::Statement(r) => { let q_reader = r.map_err(RPCError::protocol)?; - let out = RPCStatement::decode(&q_reader, opt_sender_node_id)?; + let out = RPCStatement::decode(&q_reader)?; RPCOperationKind::Statement(out) } veilid_capnp::operation::kind::Which::Answer(r) => { @@ -141,7 +138,7 @@ impl RPCOperation { let target_node_info_ts = operation_reader.get_target_node_info_ts(); let kind_reader = operation_reader.get_kind(); - let kind = RPCOperationKind::decode(&kind_reader, opt_sender_node_id)?; + let kind = RPCOperationKind::decode(&kind_reader)?; Ok(RPCOperation { op_id, diff --git a/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs b/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs deleted file mode 100644 index 386805a3..00000000 --- a/veilid-core/src/rpc_processor/coders/operations/operation_node_info_update.rs +++ /dev/null @@ -1,32 +0,0 @@ -use super::*; - -#[derive(Debug, Clone)] -pub struct RPCOperationNodeInfoUpdate { - pub signed_node_info: SignedNodeInfo, -} - -impl RPCOperationNodeInfoUpdate { - pub fn decode( - reader: &veilid_capnp::operation_node_info_update::Reader, - opt_sender_node_id: Option<&DHTKey>, - ) -> Result { - if opt_sender_node_id.is_none() { - return Err(RPCError::protocol( - "can't decode node info update without sender node id", - )); - } - let sender_node_id = opt_sender_node_id.unwrap(); - let sni_reader = reader.get_signed_node_info().map_err(RPCError::protocol)?; - let signed_node_info = decode_signed_node_info(&sni_reader, sender_node_id)?; - - Ok(RPCOperationNodeInfoUpdate { signed_node_info }) - } - pub fn encode( - &self, - builder: &mut veilid_capnp::operation_node_info_update::Builder, - ) -> Result<(), RPCError> { - let mut sni_builder = builder.reborrow().init_signed_node_info(); - encode_signed_node_info(&self.signed_node_info, &mut sni_builder)?; - Ok(()) - } -} diff --git a/veilid-core/src/rpc_processor/coders/operations/statement.rs b/veilid-core/src/rpc_processor/coders/operations/statement.rs index 3a019dce..1c2ddf59 100644 --- a/veilid-core/src/rpc_processor/coders/operations/statement.rs +++ b/veilid-core/src/rpc_processor/coders/operations/statement.rs @@ -18,12 +18,9 @@ impl RPCStatement { pub fn desc(&self) -> &'static str { self.detail.desc() } - pub fn decode( - reader: &veilid_capnp::statement::Reader, - opt_sender_node_id: Option<&DHTKey>, - ) -> Result { + pub fn decode(reader: &veilid_capnp::statement::Reader) -> Result { let d_reader = reader.get_detail(); - let detail = RPCStatementDetail::decode(&d_reader, opt_sender_node_id)?; + let detail = RPCStatementDetail::decode(&d_reader)?; Ok(RPCStatement { detail }) } pub fn encode(&self, builder: &mut veilid_capnp::statement::Builder) -> Result<(), RPCError> { @@ -36,7 +33,6 @@ impl RPCStatement { pub enum RPCStatementDetail { ValidateDialInfo(RPCOperationValidateDialInfo), Route(RPCOperationRoute), - NodeInfoUpdate(RPCOperationNodeInfoUpdate), ValueChanged(RPCOperationValueChanged), Signal(RPCOperationSignal), ReturnReceipt(RPCOperationReturnReceipt), @@ -48,7 +44,6 @@ impl RPCStatementDetail { match self { RPCStatementDetail::ValidateDialInfo(_) => "ValidateDialInfo", RPCStatementDetail::Route(_) => "Route", - RPCStatementDetail::NodeInfoUpdate(_) => "NodeInfoUpdate", RPCStatementDetail::ValueChanged(_) => "ValueChanged", RPCStatementDetail::Signal(_) => "Signal", RPCStatementDetail::ReturnReceipt(_) => "ReturnReceipt", @@ -57,7 +52,6 @@ impl RPCStatementDetail { } pub fn decode( reader: &veilid_capnp::statement::detail::Reader, - opt_sender_node_id: Option<&DHTKey>, ) -> Result { let which_reader = reader.which().map_err(RPCError::protocol)?; let out = match which_reader { @@ -71,11 +65,6 @@ impl RPCStatementDetail { let out = RPCOperationRoute::decode(&op_reader)?; RPCStatementDetail::Route(out) } - veilid_capnp::statement::detail::NodeInfoUpdate(r) => { - let op_reader = r.map_err(RPCError::protocol)?; - let out = RPCOperationNodeInfoUpdate::decode(&op_reader, opt_sender_node_id)?; - RPCStatementDetail::NodeInfoUpdate(out) - } veilid_capnp::statement::detail::ValueChanged(r) => { let op_reader = r.map_err(RPCError::protocol)?; let out = RPCOperationValueChanged::decode(&op_reader)?; @@ -108,9 +97,6 @@ impl RPCStatementDetail { d.encode(&mut builder.reborrow().init_validate_dial_info()) } RPCStatementDetail::Route(d) => d.encode(&mut builder.reborrow().init_route()), - RPCStatementDetail::NodeInfoUpdate(d) => { - d.encode(&mut builder.reborrow().init_node_info_update()) - } RPCStatementDetail::ValueChanged(d) => { d.encode(&mut builder.reborrow().init_value_changed()) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 49247749..77d0dda2 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -9,7 +9,6 @@ mod rpc_error; mod rpc_find_block; mod rpc_find_node; mod rpc_get_value; -mod rpc_node_info_update; mod rpc_return_receipt; mod rpc_route; mod rpc_set_value; @@ -113,16 +112,6 @@ impl RPCMessageData { } } -// impl ReaderSegments for RPCMessageData { -// fn get_segment(&self, idx: u32) -> Option<&[u8]> { -// if idx > 0 { -// None -// } else { -// Some(self.contents.as_slice()) -// } -// } -// } - #[derive(Debug)] struct RPCMessageEncoded { header: RPCMessageHeader, @@ -145,25 +134,8 @@ where .map_err(RPCError::protocol) .map_err(logthru_rpc!())?; Ok(buffer) - // let wordvec = builder - // .into_reader() - // .canonicalize() - // .map_err(RPCError::protocol) - // .map_err(logthru_rpc!())?; - // Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) } -// fn reader_to_vec<'a, T>(reader: &capnp::message::Reader) -> Result, RPCError> -// where -// T: capnp::message::ReaderSegments + 'a, -// { -// let wordvec = reader -// .canonicalize() -// .map_err(RPCError::protocol) -// .map_err(logthru_rpc!())?; -// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec()) -// } - #[derive(Debug)] struct WaitableReply { handle: OperationWaitHandle, @@ -209,7 +181,7 @@ struct RenderedOperation { /// Node information exchanged during every RPC message #[derive(Default, Debug, Clone)] -struct SenderSignedNodeInfo { +pub struct SenderSignedNodeInfo { /// The current signed node info of the sender if required signed_node_info: Option, /// The last timestamp of the target's node info to assist remote node with sending its latest node info @@ -558,8 +530,8 @@ impl RPCProcessor { safety_route: compiled_route.safety_route, operation, }; - let ssni_route = - self.get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop))?; + let ssni_route = self + .get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop.clone()))?; let operation = RPCOperation::new_statement( RPCStatement::new(RPCStatementDetail::Route(route_operation)), ssni_route, @@ -1334,7 +1306,6 @@ impl RPCProcessor { self.process_validate_dial_info(msg).await } RPCStatementDetail::Route(_) => self.process_route(msg).await, - RPCStatementDetail::NodeInfoUpdate(_) => self.process_node_info_update(msg).await, RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await, RPCStatementDetail::Signal(_) => self.process_signal(msg).await, RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await, diff --git a/veilid-core/src/rpc_processor/rpc_node_info_update.rs b/veilid-core/src/rpc_processor/rpc_node_info_update.rs deleted file mode 100644 index 427ec30f..00000000 --- a/veilid-core/src/rpc_processor/rpc_node_info_update.rs +++ /dev/null @@ -1,84 +0,0 @@ -use super::*; - -impl RPCProcessor { - // Sends a our node info to another node - #[instrument(level = "trace", skip(self), ret, err)] - pub async fn rpc_call_node_info_update( - self, - target: NodeRef, - routing_domain: RoutingDomain, - ) -> Result, RPCError> { - // Get the signed node info for the desired routing domain to send update with - let signed_node_info = self - .routing_table() - .get_own_peer_info(routing_domain) - .signed_node_info; - let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info }; - let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update)); - - // Send the node_info_update request to the specific routing domain requested - network_result_try!( - self.statement( - Destination::direct( - target.filtered_clone(NodeRefFilter::new().with_routing_domain(routing_domain)) - ), - statement, - ) - .await? - ); - - Ok(NetworkResult::value(())) - } - - #[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)] - pub(crate) async fn process_node_info_update( - &self, - msg: RPCMessage, - ) -> Result, RPCError> { - let detail = match msg.header.detail { - RPCMessageHeaderDetail::Direct(detail) => detail, - RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { - return Ok(NetworkResult::invalid_message( - "node_info_update must be direct", - )); - } - }; - let sender_node_id = detail.envelope.get_sender_id(); - let routing_domain = detail.routing_domain; - - // Get the statement - let node_info_update = match msg.operation.into_kind() { - RPCOperationKind::Statement(s) => match s.into_detail() { - RPCStatementDetail::NodeInfoUpdate(s) => s, - _ => panic!("not a node info update"), - }, - _ => panic!("not a statement"), - }; - - // Update our routing table with signed node info - if !self.filter_node_info(routing_domain, &node_info_update.signed_node_info) { - return Ok(NetworkResult::invalid_message(format!( - "node info doesn't belong in {:?} routing domain: {}", - routing_domain, sender_node_id - ))); - } - - if self - .routing_table() - .register_node_with_signed_node_info( - routing_domain, - sender_node_id, - node_info_update.signed_node_info, - false, - ) - .is_none() - { - return Ok(NetworkResult::invalid_message(format!( - "could not register node info update {}", - sender_node_id - ))); - } - - Ok(NetworkResult::value(())) - } -}