more refactor checkpoint

This commit is contained in:
John Smith 2022-09-04 15:40:35 -04:00
parent 79cda4a712
commit 75ade4200a
12 changed files with 65 additions and 67 deletions

View File

@ -537,15 +537,13 @@ impl NetworkManager {
} }
pub async fn tick(&self) -> EyreResult<()> { pub async fn tick(&self) -> EyreResult<()> {
let (routing_table, net, receipt_manager, protocol_config) = { let (routing_table, net, receipt_manager) = {
let inner = self.inner.lock(); let inner = self.inner.lock();
let components = inner.components.as_ref().unwrap(); let components = inner.components.as_ref().unwrap();
let protocol_config = inner.protocol_config.as_ref().unwrap();
( (
inner.routing_table.as_ref().unwrap().clone(), inner.routing_table.as_ref().unwrap().clone(),
components.net.clone(), components.net.clone(),
components.receipt_manager.clone(), components.receipt_manager.clone(),
protocol_config.clone(),
) )
}; };

View File

@ -608,7 +608,7 @@ impl Network {
self.unlocked_inner self.unlocked_inner
.interfaces .interfaces
.with_interfaces(|interfaces| { .with_interfaces(|interfaces| {
for (name, intf) in interfaces { for (_name, intf) in interfaces {
// Skip networks that we should never encounter // Skip networks that we should never encounter
if intf.is_loopback() || !intf.is_running() { if intf.is_loopback() || !intf.is_running() {
return; return;

View File

@ -411,12 +411,6 @@ impl NetworkManager {
// Get all nodes needing pings in the LocalNetwork routing domain // Get all nodes needing pings in the LocalNetwork routing domain
let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts); let node_refs = routing_table.get_nodes_needing_ping(RoutingDomain::LocalNetwork, cur_ts);
// Get our LocalNetwork dial info
let dids = routing_table.all_filtered_dial_info_details(
RoutingDomain::LocalNetwork.into(),
&DialInfoFilter::all(),
);
// For all nodes needing pings, figure out how many and over what protocols // For all nodes needing pings, figure out how many and over what protocols
for nr in node_refs { for nr in node_refs {
let rpc = rpc.clone(); let rpc = rpc.clone();
@ -437,8 +431,6 @@ impl NetworkManager {
_last_ts: u64, _last_ts: u64,
cur_ts: u64, cur_ts: u64,
) -> EyreResult<()> { ) -> EyreResult<()> {
let rpc = self.rpc_processor();
let routing_table = self.routing_table();
let mut unord = FuturesUnordered::new(); let mut unord = FuturesUnordered::new();
// PublicInternet // PublicInternet

View File

@ -192,8 +192,8 @@ impl BucketEntryInner {
for routing_domain in routing_domain_set { for routing_domain in routing_domain_set {
// Get the correct signed_node_info for the chosen routing domain // Get the correct signed_node_info for the chosen routing domain
let opt_current_sni = match routing_domain { let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, RoutingDomain::PublicInternet => &self.public_internet.signed_node_info,
}; };
if opt_current_sni.is_some() { if opt_current_sni.is_some() {
return true; return true;
@ -204,24 +204,24 @@ impl BucketEntryInner {
pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> { pub fn node_info(&self, routing_domain: RoutingDomain) -> Option<&NodeInfo> {
let opt_current_sni = match routing_domain { let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, RoutingDomain::PublicInternet => &self.public_internet.signed_node_info,
}; };
opt_current_sni.as_ref().map(|s| &s.node_info) opt_current_sni.as_ref().map(|s| &s.node_info)
} }
pub fn signed_node_info(&self, routing_domain: RoutingDomain) -> Option<&SignedNodeInfo> { pub fn signed_node_info(&self, routing_domain: RoutingDomain) -> Option<&SignedNodeInfo> {
let opt_current_sni = match routing_domain { let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, RoutingDomain::PublicInternet => &self.public_internet.signed_node_info,
}; };
opt_current_sni.as_ref().map(|s| s.as_ref()) opt_current_sni.as_ref().map(|s| s.as_ref())
} }
pub fn make_peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option<PeerInfo> { pub fn make_peer_info(&self, key: DHTKey, routing_domain: RoutingDomain) -> Option<PeerInfo> {
let opt_current_sni = match routing_domain { let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, RoutingDomain::PublicInternet => &self.public_internet.signed_node_info,
}; };
opt_current_sni.as_ref().map(|s| PeerInfo { opt_current_sni.as_ref().map(|s| PeerInfo {
node_id: NodeId::new(key), node_id: NodeId::new(key),
@ -235,10 +235,10 @@ impl BucketEntryInner {
) -> Option<RoutingDomain> { ) -> Option<RoutingDomain> {
for routing_domain in routing_domain_set { for routing_domain in routing_domain_set {
let opt_current_sni = match routing_domain { let opt_current_sni = match routing_domain {
RoutingDomain::LocalNetwork => &mut self.local_network.signed_node_info, RoutingDomain::LocalNetwork => &self.local_network.signed_node_info,
RoutingDomain::PublicInternet => &mut self.public_internet.signed_node_info, RoutingDomain::PublicInternet => &self.public_internet.signed_node_info,
}; };
if let Some(sni) = opt_current_sni { if opt_current_sni.is_some() {
return Some(routing_domain); return Some(routing_domain);
} }
} }
@ -265,10 +265,10 @@ impl BucketEntryInner {
} }
// Gets the best 'last connection' that matches a set of routing domain, protocol types and address types // Gets the best 'last connection' that matches a set of routing domain, protocol types and address types
pub fn last_connection( pub(super) fn last_connection(
&self, &self,
routing_table_inner: &RoutingTableInner, routing_table_inner: &RoutingTableInner,
node_ref_filter: &Option<NodeRefFilter>, node_ref_filter: Option<NodeRefFilter>,
) -> Option<(ConnectionDescriptor, u64)> { ) -> Option<(ConnectionDescriptor, u64)> {
// Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same // Iterate peer scopes and protocol types and address type in order to ensure we pick the preferred protocols if all else is the same
let nrf = node_ref_filter.unwrap_or_default(); let nrf = node_ref_filter.unwrap_or_default();
@ -327,11 +327,13 @@ impl BucketEntryInner {
RoutingDomain::LocalNetwork => self RoutingDomain::LocalNetwork => self
.local_network .local_network
.node_status .node_status
.map(|ln| NodeStatus::LocalNetwork(ln)), .as_ref()
.map(|ln| NodeStatus::LocalNetwork(ln.clone())),
RoutingDomain::PublicInternet => self RoutingDomain::PublicInternet => self
.public_internet .public_internet
.node_status .node_status
.map(|pi| NodeStatus::PublicInternet(pi)), .as_ref()
.map(|pi| NodeStatus::PublicInternet(pi.clone())),
} }
} }
@ -426,7 +428,7 @@ impl BucketEntryInner {
} }
// Check if this node needs a ping right now to validate it is still reachable // Check if this node needs a ping right now to validate it is still reachable
pub(super) fn needs_ping(&self, node_id: &DHTKey, cur_ts: u64, needs_keepalive: bool) -> bool { pub(super) fn needs_ping(&self, cur_ts: u64, needs_keepalive: bool) -> bool {
// See which ping pattern we are to use // See which ping pattern we are to use
let state = self.state(cur_ts); let state = self.state(cur_ts);
@ -605,7 +607,7 @@ impl BucketEntry {
} }
} }
pub fn with<F, R>(&self, f: F) -> R pub(super) fn with<F, R>(&self, f: F) -> R
where where
F: FnOnce(&BucketEntryInner) -> R, F: FnOnce(&BucketEntryInner) -> R,
{ {
@ -613,7 +615,7 @@ impl BucketEntry {
f(&*inner) f(&*inner)
} }
pub fn with_mut<F, R>(&self, f: F) -> R pub(super) fn with_mut<F, R>(&self, f: F) -> R
where where
F: FnOnce(&mut BucketEntryInner) -> R, F: FnOnce(&mut BucketEntryInner) -> R,
{ {

View File

@ -132,7 +132,7 @@ impl RoutingTable {
// does it have some dial info we need? // does it have some dial info we need?
let filter = |n: &NodeInfo| { let filter = |n: &NodeInfo| {
let mut keep = false; let mut keep = false;
for did in n.dial_info_detail_list { for did in &n.dial_info_detail_list {
if matches!(did.dial_info.address_type(), AddressType::IPV4) { if matches!(did.dial_info.address_type(), AddressType::IPV4) {
for (n, protocol_type) in protocol_types.iter().enumerate() { for (n, protocol_type) in protocol_types.iter().enumerate() {
if nodes_proto_v4[n] < max_per_type if nodes_proto_v4[n] < max_per_type
@ -250,7 +250,7 @@ impl RoutingTable {
&self, &self,
node_count: usize, node_count: usize,
mut filter: F, mut filter: F,
mut transform: T, transform: T,
) -> Vec<O> ) -> Vec<O>
where where
F: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> bool, F: FnMut(DHTKey, Option<Arc<BucketEntry>>) -> bool,
@ -331,7 +331,7 @@ impl RoutingTable {
pub fn find_closest_nodes<F, T, O>( pub fn find_closest_nodes<F, T, O>(
&self, &self,
node_id: DHTKey, node_id: DHTKey,
mut filter: F, filter: F,
mut transform: T, mut transform: T,
) -> Vec<O> ) -> Vec<O>
where where

View File

@ -140,7 +140,7 @@ impl RoutingTable {
self.inner.read().node_id_secret self.inner.read().node_id_secret
} }
pub fn routing_domain_for_address_inner( fn routing_domain_for_address_inner(
inner: &RoutingTableInner, inner: &RoutingTableInner,
address: Address, address: Address,
) -> Option<RoutingDomain> { ) -> Option<RoutingDomain> {
@ -189,7 +189,7 @@ impl RoutingTable {
} }
pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option<NodeRef>) { pub fn set_relay_node(&self, domain: RoutingDomain, opt_relay_node: Option<NodeRef>) {
let inner = self.inner.write(); let mut inner = self.inner.write();
Self::with_routing_domain_mut(&mut *inner, domain, |rd| rd.set_relay_node(opt_relay_node)); Self::with_routing_domain_mut(&mut *inner, domain, |rd| rd.set_relay_node(opt_relay_node));
} }
@ -275,13 +275,13 @@ impl RoutingTable {
return false; return false;
} }
// Ensure all of the dial info works in this routing domain // Ensure all of the dial info works in this routing domain
for did in node_info.dial_info_detail_list { for did in &node_info.dial_info_detail_list {
if !self.ensure_dial_info_is_valid(routing_domain, &did.dial_info) { if !self.ensure_dial_info_is_valid(routing_domain, &did.dial_info) {
return false; return false;
} }
} }
// Ensure the relay is also valid in this routing domain if it is provided // Ensure the relay is also valid in this routing domain if it is provided
if let Some(relay_peer_info) = node_info.relay_peer_info { if let Some(relay_peer_info) = node_info.relay_peer_info.as_ref() {
let relay_ni = &relay_peer_info.signed_node_info.node_info; let relay_ni = &relay_peer_info.signed_node_info.node_info;
if !self.node_info_is_valid_in_routing_domain(routing_domain, relay_ni) { if !self.node_info_is_valid_in_routing_domain(routing_domain, relay_ni) {
return false; return false;
@ -620,7 +620,7 @@ impl RoutingTable {
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| { Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
if v.with(|e| { if v.with(|e| {
e.has_node_info(routing_domain.into()) e.has_node_info(routing_domain.into())
&& e.needs_ping(&k, cur_ts, opt_relay_id == Some(k)) && e.needs_ping(cur_ts, opt_relay_id == Some(k))
}) { }) {
node_refs.push(NodeRef::new( node_refs.push(NodeRef::new(
self.clone(), self.clone(),
@ -834,7 +834,7 @@ impl RoutingTable {
.collect() .collect()
} }
pub fn touch_recent_peer( fn touch_recent_peer(
inner: &mut RoutingTableInner, inner: &mut RoutingTableInner,
node_id: DHTKey, node_id: DHTKey,
last_connection: ConnectionDescriptor, last_connection: ConnectionDescriptor,

View File

@ -150,13 +150,15 @@ impl NodeRef {
pub fn routing_domain_set(&self) -> RoutingDomainSet { pub fn routing_domain_set(&self) -> RoutingDomainSet {
self.filter self.filter
.as_ref()
.map(|f| f.routing_domain_set) .map(|f| f.routing_domain_set)
.unwrap_or(RoutingDomainSet::all()) .unwrap_or(RoutingDomainSet::all())
} }
pub fn dial_info_filter(&self) -> DialInfoFilter { pub fn dial_info_filter(&self) -> DialInfoFilter {
self.filter self.filter
.map(|f| f.dial_info_filter) .as_ref()
.map(|f| f.dial_info_filter.clone())
.unwrap_or(DialInfoFilter::all()) .unwrap_or(DialInfoFilter::all())
} }
@ -164,6 +166,7 @@ impl NodeRef {
self.operate(|_rti, e| { self.operate(|_rti, e| {
e.best_routing_domain( e.best_routing_domain(
self.filter self.filter
.as_ref()
.map(|f| f.routing_domain_set) .map(|f| f.routing_domain_set)
.unwrap_or(RoutingDomainSet::all()), .unwrap_or(RoutingDomainSet::all()),
) )
@ -235,8 +238,10 @@ impl NodeRef {
dif dif
} }
pub fn relay(&self, routing_domain: RoutingDomain) -> Option<NodeRef> { pub fn relay(&self, routing_domain: RoutingDomain) -> Option<NodeRef> {
let target_rpi = let target_rpi = self.operate(|_rti, e| {
self.operate(|_rt, e| e.node_info(routing_domain).map(|n| n.relay_peer_info))?; e.node_info(routing_domain)
.map(|n| n.relay_peer_info.as_ref().map(|pi| pi.as_ref().clone()))
})?;
target_rpi.and_then(|t| { target_rpi.and_then(|t| {
// If relay is ourselves, then return None, because we can't relay through ourselves // If relay is ourselves, then return None, because we can't relay through ourselves
// and to contact this node we should have had an existing inbound connection // and to contact this node we should have had an existing inbound connection
@ -294,7 +299,7 @@ impl NodeRef {
pub async fn last_connection(&self) -> Option<ConnectionDescriptor> { pub async fn last_connection(&self) -> Option<ConnectionDescriptor> {
// Get the last connection and the last time we saw anything with this connection // Get the last connection and the last time we saw anything with this connection
let (last_connection, last_seen) = let (last_connection, last_seen) =
self.operate(|rti, e| e.last_connection(rti, &self.filter))?; self.operate(|rti, e| e.last_connection(rti, self.filter.clone()))?;
// Should we check the connection table? // Should we check the connection table?
if last_connection.protocol_type().is_connection_oriented() { if last_connection.protocol_type().is_connection_oriented() {

View File

@ -57,7 +57,7 @@ pub struct LocalInternetRoutingDomainDetail {
} }
impl LocalInternetRoutingDomainDetail { impl LocalInternetRoutingDomainDetail {
pub fn set_local_networks(&mut self, local_networks: Vec<(IpAddr, IpAddr)>) -> bool { pub fn set_local_networks(&mut self, mut local_networks: Vec<(IpAddr, IpAddr)>) -> bool {
local_networks.sort(); local_networks.sort();
if local_networks == self.local_networks { if local_networks == self.local_networks {
return false; return false;
@ -70,7 +70,7 @@ impl LocalInternetRoutingDomainDetail {
impl RoutingDomainDetail for LocalInternetRoutingDomainDetail { impl RoutingDomainDetail for LocalInternetRoutingDomainDetail {
fn can_contain_address(&self, address: Address) -> bool { fn can_contain_address(&self, address: Address) -> bool {
let ip = address.to_ip_addr(); let ip = address.to_ip_addr();
for localnet in self.local_networks { for localnet in &self.local_networks {
if ipaddr_in_network(ip, localnet.0, localnet.1) { if ipaddr_in_network(ip, localnet.0, localnet.1) {
return true; return true;
} }

View File

@ -25,7 +25,7 @@ impl RPCOperationKind {
let out = match which_reader { let out = match which_reader {
veilid_capnp::operation::kind::Which::Question(r) => { veilid_capnp::operation::kind::Which::Question(r) => {
let q_reader = r.map_err(RPCError::protocol)?; let q_reader = r.map_err(RPCError::protocol)?;
let out = RPCQuestion::decode(&q_reader, sender_node_id)?; let out = RPCQuestion::decode(&q_reader)?;
RPCOperationKind::Question(out) RPCOperationKind::Question(out)
} }
veilid_capnp::operation::kind::Which::Statement(r) => { veilid_capnp::operation::kind::Which::Statement(r) => {
@ -137,12 +137,12 @@ impl RPCOperation {
pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> { pub fn encode(&self, builder: &mut veilid_capnp::operation::Builder) -> Result<(), RPCError> {
builder.set_op_id(self.op_id); builder.set_op_id(self.op_id);
let mut k_builder = builder.reborrow().init_kind(); if let Some(sender_info) = &self.sender_node_info {
self.kind.encode(&mut k_builder)?; let mut si_builder = builder.reborrow().init_sender_node_info();
if let Some(sender_info) = self.sender_node_info {
let si_builder = builder.reborrow().init_sender_node_info();
encode_signed_node_info(&sender_info, &mut si_builder)?; encode_signed_node_info(&sender_info, &mut si_builder)?;
} }
let mut k_builder = builder.reborrow().init_kind();
self.kind.encode(&mut k_builder)?;
Ok(()) Ok(())
} }
} }

View File

@ -21,10 +21,7 @@ impl RPCQuestion {
pub fn desc(&self) -> &'static str { pub fn desc(&self) -> &'static str {
self.detail.desc() self.detail.desc()
} }
pub fn decode( pub fn decode(reader: &veilid_capnp::question::Reader) -> Result<RPCQuestion, RPCError> {
reader: &veilid_capnp::question::Reader,
sender_node_id: &DHTKey,
) -> Result<RPCQuestion, RPCError> {
let rt_reader = reader.get_respond_to(); let rt_reader = reader.get_respond_to();
let respond_to = RespondTo::decode(&rt_reader)?; let respond_to = RespondTo::decode(&rt_reader)?;
let d_reader = reader.get_detail(); let d_reader = reader.get_detail();

View File

@ -103,16 +103,16 @@ impl Destination {
pub fn safety_route_spec(&self) -> Option<Arc<SafetyRouteSpec>> { pub fn safety_route_spec(&self) -> Option<Arc<SafetyRouteSpec>> {
match self { match self {
Destination::Direct { Destination::Direct {
target, target: _,
safety_route_spec, safety_route_spec,
} => safety_route_spec.clone(), } => safety_route_spec.clone(),
Destination::Relay { Destination::Relay {
relay, relay: _,
target, target: _,
safety_route_spec, safety_route_spec,
} => safety_route_spec.clone(), } => safety_route_spec.clone(),
Destination::PrivateRoute { Destination::PrivateRoute {
private_route, private_route: _,
safety_route_spec, safety_route_spec,
} => safety_route_spec.clone(), } => safety_route_spec.clone(),
} }
@ -154,6 +154,7 @@ impl fmt::Display for Destination {
safety_route_spec, safety_route_spec,
} => { } => {
let sr = safety_route_spec let sr = safety_route_spec
.as_ref()
.map(|_sr| "+SR".to_owned()) .map(|_sr| "+SR".to_owned())
.unwrap_or_default(); .unwrap_or_default();
@ -165,6 +166,7 @@ impl fmt::Display for Destination {
safety_route_spec, safety_route_spec,
} => { } => {
let sr = safety_route_spec let sr = safety_route_spec
.as_ref()
.map(|_sr| "+SR".to_owned()) .map(|_sr| "+SR".to_owned())
.unwrap_or_default(); .unwrap_or_default();
@ -175,6 +177,7 @@ impl fmt::Display for Destination {
safety_route_spec, safety_route_spec,
} => { } => {
let sr = safety_route_spec let sr = safety_route_spec
.as_ref()
.map(|_sr| "+SR".to_owned()) .map(|_sr| "+SR".to_owned())
.unwrap_or_default(); .unwrap_or_default();

View File

@ -323,7 +323,7 @@ impl RPCProcessor {
.await .await
.into_timeout_or(); .into_timeout_or();
Ok(res.map(|res| { Ok(res.map(|res| {
let (span_id, rpcreader) = res.take_value().unwrap(); let (_span_id, rpcreader) = res.take_value().unwrap();
let end_ts = intf::get_timestamp(); let end_ts = intf::get_timestamp();
// fixme: causes crashes? "Missing otel data span extensions"?? // fixme: causes crashes? "Missing otel data span extensions"??
@ -385,13 +385,13 @@ impl RPCProcessor {
// To where are we sending the request // To where are we sending the request
match dest { match dest {
Destination::Direct { Destination::Direct {
target: node_ref, target: ref node_ref,
safety_route_spec, ref safety_route_spec,
} }
| Destination::Relay { | Destination::Relay {
relay: node_ref, relay: ref node_ref,
target: _, target: _,
safety_route_spec, ref safety_route_spec,
} => { } => {
// Send to a node without a private route // Send to a node without a private route
// -------------------------------------- // --------------------------------------
@ -399,7 +399,7 @@ impl RPCProcessor {
// Get the actual destination node id accounting for relays // Get the actual destination node id accounting for relays
let (node_ref, node_id) = if let Destination::Relay { let (node_ref, node_id) = if let Destination::Relay {
relay: _, relay: _,
target: dht_key, target: ref dht_key,
safety_route_spec: _, safety_route_spec: _,
} = dest } = dest
{ {
@ -410,7 +410,7 @@ impl RPCProcessor {
}; };
// Handle the existence of safety route // Handle the existence of safety route
match safety_route_spec { match safety_route_spec.as_ref() {
None => { None => {
// If no safety route is being used, and we're not sending to a private // If no safety route is being used, and we're not sending to a private
// route, we can use a direct envelope instead of routing // route, we can use a direct envelope instead of routing
@ -434,7 +434,8 @@ impl RPCProcessor {
.dial_info .dial_info
.node_id .node_id
.key; .key;
out_message = self.wrap_with_route(Some(sr), private_route, message_vec)?; out_message =
self.wrap_with_route(Some(sr.clone()), private_route, message_vec)?;
out_hop_count = 1 + sr.hops.len(); out_hop_count = 1 + sr.hops.len();
} }
}; };
@ -892,7 +893,7 @@ impl RPCProcessor {
stop_token: StopToken, stop_token: StopToken,
receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>, receiver: flume::Receiver<(Option<Id>, RPCMessageEncoded)>,
) { ) {
while let Ok(Ok((span_id, msg))) = while let Ok(Ok((_span_id, msg))) =
receiver.recv_async().timeout_at(stop_token.clone()).await receiver.recv_async().timeout_at(stop_token.clone()).await
{ {
let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker"); let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker");