This commit is contained in:
John Smith 2022-12-08 12:48:01 -05:00
parent 3b1fb5aba1
commit 2b9044fdfa
16 changed files with 92 additions and 322 deletions

View File

@ -303,11 +303,6 @@ struct OperationRoute @0x96741859ce6ac7dd {
operation @1 :RoutedOperation; # The operation to be routed
}
struct OperationNodeInfoUpdate @0xc9647b32a48b66ce {
signedNodeInfo @0 :SignedNodeInfo; # Our signed node info
}
struct OperationAppCallQ @0xade67b9f09784507 {
message @0 :Data; # Opaque request to application
}
@ -466,12 +461,12 @@ struct Question @0xd8510bc33492ef70 {
findNodeQ @3 :OperationFindNodeQ;
# Routable operations
getValueQ @4 :OperationGetValueQ;
setValueQ @5 :OperationSetValueQ;
watchValueQ @6 :OperationWatchValueQ;
supplyBlockQ @7 :OperationSupplyBlockQ;
findBlockQ @8 :OperationFindBlockQ;
appCallQ @9 :OperationAppCallQ;
appCallQ @4 :OperationAppCallQ;
getValueQ @5 :OperationGetValueQ;
setValueQ @6 :OperationSetValueQ;
watchValueQ @7 :OperationWatchValueQ;
supplyBlockQ @8 :OperationSupplyBlockQ;
findBlockQ @9 :OperationFindBlockQ;
# Tunnel operations
startTunnelQ @10 :OperationStartTunnelQ;
@ -486,13 +481,12 @@ struct Statement @0x990e20828f404ae1 {
# Direct operations
validateDialInfo @0 :OperationValidateDialInfo;
route @1 :OperationRoute;
nodeInfoUpdate @2 :OperationNodeInfoUpdate;
# Routable operations
valueChanged @3 :OperationValueChanged;
signal @4 :OperationSignal;
returnReceipt @5 :OperationReturnReceipt;
appMessage @6 :OperationAppMessage;
signal @2 :OperationSignal;
returnReceipt @3 :OperationReturnReceipt;
appMessage @4 :OperationAppMessage;
valueChanged @5 :OperationValueChanged;
}
}
@ -504,12 +498,12 @@ struct Answer @0xacacb8b6988c1058 {
findNodeA @1 :OperationFindNodeA;
# Routable operations
getValueA @2 :OperationGetValueA;
setValueA @3 :OperationSetValueA;
watchValueA @4 :OperationWatchValueA;
supplyBlockA @5 :OperationSupplyBlockA;
findBlockA @6 :OperationFindBlockA;
appCallA @7 :OperationAppCallA;
appCallA @2 :OperationAppCallA;
getValueA @3 :OperationGetValueA;
setValueA @4 :OperationSetValueA;
watchValueA @5 :OperationWatchValueA;
supplyBlockA @6 :OperationSupplyBlockA;
findBlockA @7 :OperationFindBlockA;
# Tunnel operations
startTunnelA @8 :OperationStartTunnelA;

View File

@ -23,7 +23,7 @@ pub use network_connection::*;
use connection_handle::*;
use connection_limits::*;
use crypto::*;
use futures_util::stream::{FuturesUnordered, StreamExt};
use futures_util::stream::FuturesUnordered;
use hashlink::LruCache;
use intf::*;
#[cfg(not(target_arch = "wasm32"))]
@ -155,7 +155,6 @@ struct NetworkManagerUnlockedInner {
// Background processes
rolling_transfers_task: TickTask<EyreReport>,
public_address_check_task: TickTask<EyreReport>,
node_info_update_single_future: MustJoinSingleFuture<()>,
}
#[derive(Clone)]
@ -191,7 +190,6 @@ impl NetworkManager {
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
node_info_update_single_future: MustJoinSingleFuture::new(),
}
}
@ -1734,62 +1732,4 @@ impl NetworkManager {
}
}
// Inform routing table entries that our dial info has changed
pub async fn send_node_info_updates(&self, routing_domain: RoutingDomain, all: bool) {
let this = self.clone();
// Run in background only once
let _ = self
.clone()
.unlocked_inner
.node_info_update_single_future
.single_spawn(
async move {
// Only update if we actually have valid signed node info for this routing domain
if !this.routing_table().has_valid_own_node_info(routing_domain) {
trace!(
"not sending node info update because our network class is not yet valid"
);
return;
}
// Get the list of refs to all nodes to update
let cur_ts = get_timestamp();
let node_refs =
this.routing_table()
.get_nodes_needing_updates(routing_domain, cur_ts, all);
// Send the updates
log_net!(debug "Sending node info updates to {} nodes", node_refs.len());
let mut unord = FuturesUnordered::new();
for nr in node_refs {
let rpc = this.rpc_processor();
unord.push(
async move {
// Update the node
if let Err(e) = rpc
.rpc_call_node_info_update(nr.clone(), routing_domain)
.await
{
// Not fatal, but we should be able to see if this is happening
trace!("failed to send node info update to {:?}: {}", nr, e);
return;
}
// Mark the node as having seen our node info
nr.set_seen_our_node_info(routing_domain);
}
.instrument(Span::current()),
);
}
// Wait for futures to complete
while unord.next().await.is_some() {}
log_rtab!(debug "Finished sending node updates");
}
.instrument(Span::current()),
)
.await;
}
}

View File

@ -831,12 +831,10 @@ impl Network {
debug!("clearing dial info");
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
editor.disable_node_info_updates();
editor.clear_dial_info_details();
editor.commit().await;
let mut editor = routing_table.edit_routing_domain(RoutingDomain::LocalNetwork);
editor.disable_node_info_updates();
editor.clear_dial_info_details();
editor.commit().await;

View File

@ -68,15 +68,5 @@ impl NetworkManager {
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
warn!("rolling_transfers_task not stopped: {}", e);
}
debug!("stopping node info update singlefuture");
if self
.unlocked_inner
.node_info_update_single_future
.join()
.await
.is_err()
{
error!("node_info_update_single_future not stopped");
}
}
}

View File

@ -318,7 +318,6 @@ impl Network {
// Drop all dial info
let mut editor = routing_table.edit_routing_domain(RoutingDomain::PublicInternet);
editor.disable_node_info_updates();
editor.clear_dial_info_details();
editor.commit().await;

View File

@ -275,6 +275,30 @@ impl BucketEntryInner {
false
}
pub fn exists_in_routing_domain(
&self,
rti: &RoutingTableInner,
routing_domain: RoutingDomain,
) -> bool {
// Check node info
if self.has_node_info(routing_domain.into()) {
return true;
}
// Check connections
let connection_manager = rti.network_manager().connection_manager();
let last_connections = self.last_connections(
rti,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
);
for lc in last_connections {
if connection_manager.get_connection(lc.0).is_some() {
return true;
}
}
false
}
pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> {
let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
@ -304,8 +328,10 @@ impl BucketEntryInner {
pub fn best_routing_domain(
&self,
rti: &RoutingTableInner,
routing_domain_set: RoutingDomainSet,
) -> Option<RoutingDomain> {
// Check node info
for routing_domain in routing_domain_set {
let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
@ -315,7 +341,27 @@ impl BucketEntryInner {
return Some(routing_domain);
}
}
None
// Check connections
let mut best_routing_domain: Option<RoutingDomain> = None;
let connection_manager = rti.network_manager().connection_manager();
let last_connections = self.last_connections(
rti,
Some(NodeRefFilter::new().with_routing_domain_set(routing_domain_set)),
);
for lc in last_connections {
if connection_manager.get_connection(lc.0).is_some() {
if let Some(rd) = rti.routing_domain_for_address(lc.0.remote_address().address()) {
if let Some(brd) = best_routing_domain {
if rd < brd {
best_routing_domain = Some(rd);
}
} else {
best_routing_domain = Some(rd);
}
}
}
}
best_routing_domain
}
fn descriptor_to_key(&self, last_connection: ConnectionDescriptor) -> LastConnectionKey {

View File

@ -457,17 +457,6 @@ impl RoutingTable {
.get_entry_count(routing_domain_set, min_state)
}
pub fn get_nodes_needing_updates(
&self,
routing_domain: RoutingDomain,
cur_ts: u64,
all: bool,
) -> Vec<NodeRef> {
self.inner
.read()
.get_nodes_needing_updates(self.clone(), routing_domain, cur_ts, all)
}
pub fn get_nodes_needing_ping(
&self,
routing_domain: RoutingDomain,

View File

@ -87,8 +87,9 @@ pub trait NodeRefBase: Sized {
}
fn best_routing_domain(&self) -> Option<RoutingDomain> {
self.operate(|_rti, e| {
self.operate(|rti, e| {
e.best_routing_domain(
rti,
self.common()
.filter
.as_ref()

View File

@ -23,7 +23,6 @@ pub struct RoutingDomainEditor {
routing_table: RoutingTable,
routing_domain: RoutingDomain,
changes: Vec<RoutingDomainChange>,
send_node_info_updates: bool,
}
impl RoutingDomainEditor {
@ -32,13 +31,8 @@ impl RoutingDomainEditor {
routing_table,
routing_domain,
changes: Vec::new(),
send_node_info_updates: true,
}
}
#[instrument(level = "debug", skip(self))]
pub fn disable_node_info_updates(&mut self) {
self.send_node_info_updates = false;
}
#[instrument(level = "debug", skip(self))]
pub fn clear_dial_info_details(&mut self) {
@ -199,7 +193,7 @@ impl RoutingDomainEditor {
}
});
if changed {
// Allow signed node info updates at same timestamp from dead nodes if our network has changed
// Allow signed node info updates at same timestamp for otherwise dead nodes if our network has changed
inner.reset_all_updated_since_last_network_change();
}
}
@ -210,12 +204,5 @@ impl RoutingDomainEditor {
rss.reset();
}
}
// Send our updated node info to all the nodes in the routing table
if changed && self.send_node_info_updates {
let network_manager = self.routing_table.unlocked_inner.network_manager.clone();
network_manager
.send_node_info_updates(self.routing_domain, true)
.await;
}
}
}

View File

@ -428,7 +428,7 @@ impl RoutingTableInner {
let mut count = 0usize;
let cur_ts = get_timestamp();
self.with_entries(cur_ts, min_state, |rti, _, e| {
if e.with(rti, |_rti, e| e.best_routing_domain(routing_domain_set))
if e.with(rti, |rti, e| e.best_routing_domain(rti, routing_domain_set))
.is_some()
{
count += 1;
@ -487,29 +487,6 @@ impl RoutingTableInner {
None
}
pub fn get_nodes_needing_updates(
&self,
outer_self: RoutingTable,
routing_domain: RoutingDomain,
cur_ts: u64,
all: bool,
) -> Vec<NodeRef> {
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count);
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| {
// Only update nodes that haven't seen our node info yet
if all || !v.with(rti, |_rti, e| e.has_seen_our_node_info(routing_domain)) {
node_refs.push(NodeRef::new(
outer_self.clone(),
k,
v,
Some(NodeRefFilter::new().with_routing_domain(routing_domain)),
));
}
Option::<()>::None
});
node_refs
}
pub fn get_nodes_needing_ping(
&self,
outer_self: RoutingTable,
@ -525,9 +502,22 @@ impl RoutingTableInner {
// Collect all entries that are 'needs_ping' and have some node info making them reachable somehow
let mut node_refs = Vec::<NodeRef>::with_capacity(self.bucket_entry_count);
self.with_entries(cur_ts, BucketEntryState::Unreliable, |rti, k, v| {
if v.with(rti, |_rti, e| {
e.has_node_info(routing_domain.into())
&& e.needs_ping(cur_ts, opt_relay_id == Some(k))
if v.with(rti, |rti, e| {
// If this isn't in the routing domain we are checking, don't include it
if !e.exists_in_routing_domain(rti, routing_domain) {
return false;
}
// If we need a ping via the normal timing mechanism, then do it
if e.needs_ping(cur_ts, opt_relay_id == Some(k)) {
return true;
}
// If we need a ping because this node hasn't seen our latest node info, then do it
if let Some(own_node_info_ts) = own_node_info_ts {
if !e.has_seen_our_node_info_ts(routing_domain, own_node_info_ts) {
return true;
}
}
false
}) {
node_refs.push(NodeRef::new(
outer_self.clone(),

View File

@ -7,7 +7,6 @@ mod operation_complete_tunnel;
mod operation_find_block;
mod operation_find_node;
mod operation_get_value;
mod operation_node_info_update;
mod operation_return_receipt;
mod operation_route;
mod operation_set_value;
@ -31,7 +30,6 @@ pub use operation_complete_tunnel::*;
pub use operation_find_block::*;
pub use operation_find_node::*;
pub use operation_get_value::*;
pub use operation_node_info_update::*;
pub use operation_return_receipt::*;
pub use operation_route::*;
pub use operation_set_value::*;

View File

@ -16,10 +16,7 @@ impl RPCOperationKind {
}
}
pub fn decode(
kind_reader: &veilid_capnp::operation::kind::Reader,
opt_sender_node_id: Option<&DHTKey>,
) -> Result<Self, RPCError> {
pub fn decode(kind_reader: &veilid_capnp::operation::kind::Reader) -> Result<Self, RPCError> {
let which_reader = kind_reader.which().map_err(RPCError::protocol)?;
let out = match which_reader {
veilid_capnp::operation::kind::Which::Question(r) => {
@ -29,7 +26,7 @@ impl RPCOperationKind {
}
veilid_capnp::operation::kind::Which::Statement(r) => {
let q_reader = r.map_err(RPCError::protocol)?;
let out = RPCStatement::decode(&q_reader, opt_sender_node_id)?;
let out = RPCStatement::decode(&q_reader)?;
RPCOperationKind::Statement(out)
}
veilid_capnp::operation::kind::Which::Answer(r) => {
@ -141,7 +138,7 @@ impl RPCOperation {
let target_node_info_ts = operation_reader.get_target_node_info_ts();
let kind_reader = operation_reader.get_kind();
let kind = RPCOperationKind::decode(&kind_reader, opt_sender_node_id)?;
let kind = RPCOperationKind::decode(&kind_reader)?;
Ok(RPCOperation {
op_id,

View File

@ -1,32 +0,0 @@
use super::*;
#[derive(Debug, Clone)]
pub struct RPCOperationNodeInfoUpdate {
pub signed_node_info: SignedNodeInfo,
}
impl RPCOperationNodeInfoUpdate {
pub fn decode(
reader: &veilid_capnp::operation_node_info_update::Reader,
opt_sender_node_id: Option<&DHTKey>,
) -> Result<RPCOperationNodeInfoUpdate, RPCError> {
if opt_sender_node_id.is_none() {
return Err(RPCError::protocol(
"can't decode node info update without sender node id",
));
}
let sender_node_id = opt_sender_node_id.unwrap();
let sni_reader = reader.get_signed_node_info().map_err(RPCError::protocol)?;
let signed_node_info = decode_signed_node_info(&sni_reader, sender_node_id)?;
Ok(RPCOperationNodeInfoUpdate { signed_node_info })
}
pub fn encode(
&self,
builder: &mut veilid_capnp::operation_node_info_update::Builder,
) -> Result<(), RPCError> {
let mut sni_builder = builder.reborrow().init_signed_node_info();
encode_signed_node_info(&self.signed_node_info, &mut sni_builder)?;
Ok(())
}
}

View File

@ -18,12 +18,9 @@ impl RPCStatement {
pub fn desc(&self) -> &'static str {
self.detail.desc()
}
pub fn decode(
reader: &veilid_capnp::statement::Reader,
opt_sender_node_id: Option<&DHTKey>,
) -> Result<RPCStatement, RPCError> {
pub fn decode(reader: &veilid_capnp::statement::Reader) -> Result<RPCStatement, RPCError> {
let d_reader = reader.get_detail();
let detail = RPCStatementDetail::decode(&d_reader, opt_sender_node_id)?;
let detail = RPCStatementDetail::decode(&d_reader)?;
Ok(RPCStatement { detail })
}
pub fn encode(&self, builder: &mut veilid_capnp::statement::Builder) -> Result<(), RPCError> {
@ -36,7 +33,6 @@ impl RPCStatement {
pub enum RPCStatementDetail {
ValidateDialInfo(RPCOperationValidateDialInfo),
Route(RPCOperationRoute),
NodeInfoUpdate(RPCOperationNodeInfoUpdate),
ValueChanged(RPCOperationValueChanged),
Signal(RPCOperationSignal),
ReturnReceipt(RPCOperationReturnReceipt),
@ -48,7 +44,6 @@ impl RPCStatementDetail {
match self {
RPCStatementDetail::ValidateDialInfo(_) => "ValidateDialInfo",
RPCStatementDetail::Route(_) => "Route",
RPCStatementDetail::NodeInfoUpdate(_) => "NodeInfoUpdate",
RPCStatementDetail::ValueChanged(_) => "ValueChanged",
RPCStatementDetail::Signal(_) => "Signal",
RPCStatementDetail::ReturnReceipt(_) => "ReturnReceipt",
@ -57,7 +52,6 @@ impl RPCStatementDetail {
}
pub fn decode(
reader: &veilid_capnp::statement::detail::Reader,
opt_sender_node_id: Option<&DHTKey>,
) -> Result<RPCStatementDetail, RPCError> {
let which_reader = reader.which().map_err(RPCError::protocol)?;
let out = match which_reader {
@ -71,11 +65,6 @@ impl RPCStatementDetail {
let out = RPCOperationRoute::decode(&op_reader)?;
RPCStatementDetail::Route(out)
}
veilid_capnp::statement::detail::NodeInfoUpdate(r) => {
let op_reader = r.map_err(RPCError::protocol)?;
let out = RPCOperationNodeInfoUpdate::decode(&op_reader, opt_sender_node_id)?;
RPCStatementDetail::NodeInfoUpdate(out)
}
veilid_capnp::statement::detail::ValueChanged(r) => {
let op_reader = r.map_err(RPCError::protocol)?;
let out = RPCOperationValueChanged::decode(&op_reader)?;
@ -108,9 +97,6 @@ impl RPCStatementDetail {
d.encode(&mut builder.reborrow().init_validate_dial_info())
}
RPCStatementDetail::Route(d) => d.encode(&mut builder.reborrow().init_route()),
RPCStatementDetail::NodeInfoUpdate(d) => {
d.encode(&mut builder.reborrow().init_node_info_update())
}
RPCStatementDetail::ValueChanged(d) => {
d.encode(&mut builder.reborrow().init_value_changed())
}

View File

@ -9,7 +9,6 @@ mod rpc_error;
mod rpc_find_block;
mod rpc_find_node;
mod rpc_get_value;
mod rpc_node_info_update;
mod rpc_return_receipt;
mod rpc_route;
mod rpc_set_value;
@ -113,16 +112,6 @@ impl RPCMessageData {
}
}
// impl ReaderSegments for RPCMessageData {
// fn get_segment(&self, idx: u32) -> Option<&[u8]> {
// if idx > 0 {
// None
// } else {
// Some(self.contents.as_slice())
// }
// }
// }
#[derive(Debug)]
struct RPCMessageEncoded {
header: RPCMessageHeader,
@ -145,25 +134,8 @@ where
.map_err(RPCError::protocol)
.map_err(logthru_rpc!())?;
Ok(buffer)
// let wordvec = builder
// .into_reader()
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
}
// fn reader_to_vec<'a, T>(reader: &capnp::message::Reader<T>) -> Result<Vec<u8>, RPCError>
// where
// T: capnp::message::ReaderSegments + 'a,
// {
// let wordvec = reader
// .canonicalize()
// .map_err(RPCError::protocol)
// .map_err(logthru_rpc!())?;
// Ok(capnp::Word::words_to_bytes(wordvec.as_slice()).to_vec())
// }
#[derive(Debug)]
struct WaitableReply {
handle: OperationWaitHandle<RPCMessage>,
@ -209,7 +181,7 @@ struct RenderedOperation {
/// Node information exchanged during every RPC message
#[derive(Default, Debug, Clone)]
struct SenderSignedNodeInfo {
pub struct SenderSignedNodeInfo {
/// The current signed node info of the sender if required
signed_node_info: Option<SignedNodeInfo>,
/// The last timestamp of the target's node info to assist remote node with sending its latest node info
@ -558,8 +530,8 @@ impl RPCProcessor {
safety_route: compiled_route.safety_route,
operation,
};
let ssni_route =
self.get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop))?;
let ssni_route = self
.get_sender_signed_node_info(&Destination::direct(compiled_route.first_hop.clone()))?;
let operation = RPCOperation::new_statement(
RPCStatement::new(RPCStatementDetail::Route(route_operation)),
ssni_route,
@ -1334,7 +1306,6 @@ impl RPCProcessor {
self.process_validate_dial_info(msg).await
}
RPCStatementDetail::Route(_) => self.process_route(msg).await,
RPCStatementDetail::NodeInfoUpdate(_) => self.process_node_info_update(msg).await,
RPCStatementDetail::ValueChanged(_) => self.process_value_changed(msg).await,
RPCStatementDetail::Signal(_) => self.process_signal(msg).await,
RPCStatementDetail::ReturnReceipt(_) => self.process_return_receipt(msg).await,

View File

@ -1,84 +0,0 @@
use super::*;
impl RPCProcessor {
// Sends a our node info to another node
#[instrument(level = "trace", skip(self), ret, err)]
pub async fn rpc_call_node_info_update(
self,
target: NodeRef,
routing_domain: RoutingDomain,
) -> Result<NetworkResult<()>, RPCError> {
// Get the signed node info for the desired routing domain to send update with
let signed_node_info = self
.routing_table()
.get_own_peer_info(routing_domain)
.signed_node_info;
let node_info_update = RPCOperationNodeInfoUpdate { signed_node_info };
let statement = RPCStatement::new(RPCStatementDetail::NodeInfoUpdate(node_info_update));
// Send the node_info_update request to the specific routing domain requested
network_result_try!(
self.statement(
Destination::direct(
target.filtered_clone(NodeRefFilter::new().with_routing_domain(routing_domain))
),
statement,
)
.await?
);
Ok(NetworkResult::value(()))
}
#[instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err)]
pub(crate) async fn process_node_info_update(
&self,
msg: RPCMessage,
) -> Result<NetworkResult<()>, RPCError> {
let detail = match msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => detail,
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Ok(NetworkResult::invalid_message(
"node_info_update must be direct",
));
}
};
let sender_node_id = detail.envelope.get_sender_id();
let routing_domain = detail.routing_domain;
// Get the statement
let node_info_update = match msg.operation.into_kind() {
RPCOperationKind::Statement(s) => match s.into_detail() {
RPCStatementDetail::NodeInfoUpdate(s) => s,
_ => panic!("not a node info update"),
},
_ => panic!("not a statement"),
};
// Update our routing table with signed node info
if !self.filter_node_info(routing_domain, &node_info_update.signed_node_info) {
return Ok(NetworkResult::invalid_message(format!(
"node info doesn't belong in {:?} routing domain: {}",
routing_domain, sender_node_id
)));
}
if self
.routing_table()
.register_node_with_signed_node_info(
routing_domain,
sender_node_id,
node_info_update.signed_node_info,
false,
)
.is_none()
{
return Ok(NetworkResult::invalid_message(format!(
"could not register node info update {}",
sender_node_id
)));
}
Ok(NetworkResult::value(()))
}
}