diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 1204a4a1..4bb5178a 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -1182,7 +1182,7 @@ impl NetworkManager { // Dial info filter comes from the target node ref let dial_info_filter = target_node_ref.dial_info_filter(); - let reliable = target_node_ref.reliable(); + let sequencing = target_node_ref.sequencing(); let cm = routing_table.get_contact_method( routing_domain, @@ -1191,7 +1191,7 @@ impl NetworkManager { &node_b_id, &node_b, dial_info_filter, - reliable, + sequencing, ); // Translate the raw contact method to a referenced contact method diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index efbdd077..3c1ed52c 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -263,7 +263,7 @@ impl RoutingTable { node_b_id: &DHTKey, node_b: &NodeInfo, dial_info_filter: DialInfoFilter, - reliable: bool, + sequencing: Sequencing, ) -> ContactMethod { self.inner.read().get_contact_method( routing_domain, @@ -272,7 +272,7 @@ impl RoutingTable { node_b_id, node_b, dial_info_filter, - reliable, + sequencing, ) } @@ -836,7 +836,7 @@ impl RoutingTable { .node_info(RoutingDomain::PublicInternet) .map(|n| { let dids = n.all_filtered_dial_info_details( - Some(DialInfoDetail::reliable_sort), // By default, choose reliable protocol for relay + Some(DialInfoDetail::ordered_sequencing_sort), // By default, choose connection-oriented protocol for relay |did| did.matches_filter(&outbound_dif), ); for did in &dids { @@ -889,6 +889,7 @@ impl RoutingTable { if let Some(best_inbound_relay) = best_inbound_relay.as_mut() { // Less is faster let better = best_inbound_relay.1.with(rti, |_rti, best| { + // choose low latency stability for relays BucketEntryInner::cmp_fastest_reliable(cur_ts, e, best) == std::cmp::Ordering::Less }); diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs index 19c14f9e..1e4385d8 100644 --- a/veilid-core/src/routing_table/node_ref.rs +++ b/veilid-core/src/routing_table/node_ref.rs @@ -71,7 +71,7 @@ pub struct NodeRef { node_id: DHTKey, entry: Arc, filter: Option, - reliable: bool, + sequencing: Sequencing, #[cfg(feature = "tracking")] track_id: usize, } @@ -90,7 +90,7 @@ impl NodeRef { node_id, entry, filter, - reliable: false, + sequencing: Sequencing::NoPreference, #[cfg(feature = "tracking")] track_id: entry.track(), } @@ -127,11 +127,11 @@ impl NodeRef { self.filter = filter } - pub fn set_reliable(&mut self) { - self.reliable = true; + pub fn set_sequencing(&mut self, sequencing: Sequencing) { + self.sequencing = sequencing; } - pub fn reliable(&self) -> bool { - self.reliable + pub fn sequencing(&self) -> Sequencing { + self.sequencing } pub fn merge_filter(&mut self, filter: NodeRefFilter) { @@ -278,10 +278,18 @@ impl NodeRef { let routing_domain_set = self.routing_domain_set(); let dial_info_filter = self.dial_info_filter(); - let sort = if self.reliable { - Some(DialInfoDetail::reliable_sort) - } else { - None + let (sort, dial_info_filter) = match self.sequencing { + Sequencing::NoPreference => (None, dial_info_filter), + Sequencing::PreferOrdered => ( + Some(DialInfoDetail::ordered_sequencing_sort), + dial_info_filter, + ), + Sequencing::EnsureOrdered => ( + Some(DialInfoDetail::ordered_sequencing_sort), + dial_info_filter.filtered( + &DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), + ), + ), }; self.operate(|_rt, e| { @@ -301,10 +309,18 @@ impl NodeRef { let routing_domain_set = self.routing_domain_set(); let dial_info_filter = self.dial_info_filter(); - let sort = if self.reliable { - Some(DialInfoDetail::reliable_sort) - } else { - None + let (sort, dial_info_filter) = match self.sequencing { + Sequencing::NoPreference => (None, dial_info_filter), + Sequencing::PreferOrdered => ( + Some(DialInfoDetail::ordered_sequencing_sort), + dial_info_filter, + ), + Sequencing::EnsureOrdered => ( + Some(DialInfoDetail::ordered_sequencing_sort), + dial_info_filter.filtered( + &DialInfoFilter::all().with_protocol_type_set(ProtocolType::all_ordered_set()), + ), + ), }; let mut out = Vec::new(); @@ -418,20 +434,20 @@ impl Clone for NodeRef { node_id: self.node_id, entry: self.entry.clone(), filter: self.filter.clone(), - reliable: self.reliable, + sequencing: self.sequencing, #[cfg(feature = "tracking")] track_id: e.track(), } } } -impl PartialEq for NodeRef { - fn eq(&self, other: &Self) -> bool { - self.node_id == other.node_id - } -} +// impl PartialEq for NodeRef { +// fn eq(&self, other: &Self) -> bool { +// self.node_id == other.node_id +// } +// } -impl Eq for NodeRef {} +// impl Eq for NodeRef {} impl fmt::Display for NodeRef { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -444,6 +460,7 @@ impl fmt::Debug for NodeRef { f.debug_struct("NodeRef") .field("node_id", &self.node_id) .field("filter", &self.filter) + .field("sequencing", &self.sequencing) .finish() } } diff --git a/veilid-core/src/routing_table/route_spec_store.rs b/veilid-core/src/routing_table/route_spec_store.rs index fd1d80ab..9d6e6485 100644 --- a/veilid-core/src/routing_table/route_spec_store.rs +++ b/veilid-core/src/routing_table/route_spec_store.rs @@ -9,8 +9,10 @@ pub struct SafetySpec { pub preferred_route: Option, /// 0 = no safety route, just use node's node id, more hops is safer but slower pub hop_count: usize, - /// prefer more reliable protocols and relays over faster ones - pub reliable: bool, + /// prefer reliability over speed + pub stability: Stability, + /// prefer connection-oriented sequenced protocols + pub sequencing: Sequencing, } /// Compiled route (safety route + private route) @@ -59,8 +61,10 @@ struct RouteSpecDetail { last_used_ts: Option, /// Directions this route is guaranteed to work in directions: DirectionSet, - /// Reliability - reliable: bool, + /// Stability preference (prefer reliable nodes over faster) + stability: Stability, + /// Sequencing preference (connection oriented protocols vs datagram) + sequencing: Sequencing, } /// The core representation of the RouteSpecStore that can be serialized @@ -81,6 +85,7 @@ pub struct RouteSpecStoreCache { hop_cache: HashSet>, } +/// The routing table's storage for private/safety routes #[derive(Debug)] pub struct RouteSpecStore { /// Maximum number of hops in a route @@ -93,6 +98,15 @@ pub struct RouteSpecStore { cache: RouteSpecStoreCache, } +/// The choice of safety route including in compiled routes +#[derive(Debug, Clone)] +pub enum SafetySelection { + /// Don't use a safety route, only specify the sequencing preference + Unsafe(Sequencing), + /// Use a safety route and parameters specified by a SafetySpec + Safe(SafetySpec), +} + fn route_hops_to_hop_cache(hops: &[DHTKey]) -> Vec { let mut cache: Vec = Vec::with_capacity(hops.len() * DHT_KEY_LENGTH); for hop in hops { @@ -293,7 +307,8 @@ impl RouteSpecStore { &mut self, rti: &RoutingTableInner, routing_table: RoutingTable, - reliable: bool, + stability: Stability, + sequencing: Sequencing, hop_count: usize, directions: DirectionSet, ) -> EyreResult> { @@ -327,7 +342,7 @@ impl RouteSpecStore { // Exclude nodes with no publicinternet nodeinfo, or incompatible nodeinfo or node status won't route v.with(rti, |_rti, e| { let node_info_ok = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) { - ni.has_any_dial_info() + ni.has_sequencing_matched_dial_info(sequencing) } else { false }; @@ -383,13 +398,16 @@ impl RouteSpecStore { // always prioritize reliable nodes, but sort by oldest or fastest let cmpout = v1.1.as_ref().unwrap().with(rti, |rti, e1| { - v2.1.as_ref().unwrap().with(rti, |_rti, e2| { - if reliable { - BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2) - } else { - BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2) - } - }) + v2.1.as_ref() + .unwrap() + .with(rti, |_rti, e2| match stability { + Stability::LowLatency => { + BucketEntryInner::cmp_fastest_reliable(cur_ts, e1, e2) + } + Stability::Reliable => { + BucketEntryInner::cmp_oldest_reliable(cur_ts, e1, e2) + } + }) }); cmpout }; @@ -448,7 +466,7 @@ impl RouteSpecStore { ¤t_node.0, ¤t_node.1, DialInfoFilter::all(), - reliable, + sequencing, ); if matches!(cm, ContactMethod::Unreachable) { reachable = false; @@ -474,7 +492,7 @@ impl RouteSpecStore { ¤t_node.0, ¤t_node.1, DialInfoFilter::all(), - reliable, + sequencing, ); if matches!(cm, ContactMethod::Unreachable) { reachable = false; @@ -525,7 +543,8 @@ impl RouteSpecStore { last_checked_ts: None, last_used_ts: None, directions, - reliable, + stability, + sequencing, }; // Add to cache @@ -575,15 +594,18 @@ impl RouteSpecStore { } } + /// Find first matching unpublished route that fits into the selection criteria pub fn first_unpublished_route( &mut self, - reliable: bool, min_hop_count: usize, max_hop_count: usize, + stability: Stability, + sequencing: Sequencing, directions: DirectionSet, ) -> Option { for detail in &self.content.details { - if detail.1.reliable == reliable + if detail.1.stability >= stability + && detail.1.sequencing >= sequencing && detail.1.hops.len() >= min_hop_count && detail.1.hops.len() <= max_hop_count && detail.1.directions.is_subset(directions) @@ -602,46 +624,54 @@ impl RouteSpecStore { /// Returns Ok(None) if no allocation could happen at this time (not an error) pub fn compile_safety_route( &mut self, - rti: &RoutingTableInner, + rti: &mut RoutingTableInner, routing_table: RoutingTable, - safety_spec: Option, + safety_selection: SafetySelection, private_route: PrivateRoute, ) -> Result, RPCError> { let pr_hopcount = private_route.hop_count as usize; - if pr_hopcount > self.max_route_hop_count { + let max_route_hop_count = self.max_route_hop_count; + if pr_hopcount > max_route_hop_count { return Err(RPCError::internal("private route hop count too long")); } // See if we are using a safety route, if not, short circuit this operation - if safety_spec.is_none() { - // Safety route stub with the node's public key as the safety route key since it's the 0th hop - if private_route.first_hop.is_none() { - return Err(RPCError::internal("can't compile zero length route")); - } - let first_hop = private_route.first_hop.as_ref().unwrap(); - let opt_first_hop_noderef = match &first_hop.node { - RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table, id.key), - RouteNode::PeerInfo(pi) => rti.register_node_with_signed_node_info( - routing_table.clone(), - RoutingDomain::PublicInternet, - pi.node_id.key, - pi.signed_node_info, - false, - ), - }; - if opt_first_hop_noderef.is_none() { - // Can't reach this private route any more - log_rtab!(debug "can't reach private route any more"); - return Ok(None); - } + let safety_spec = match safety_selection { + SafetySelection::Unsafe(sequencing) => { + // Safety route stub with the node's public key as the safety route key since it's the 0th hop + if private_route.first_hop.is_none() { + return Err(RPCError::internal("can't compile zero length route")); + } + let first_hop = private_route.first_hop.as_ref().unwrap(); + let opt_first_hop = match &first_hop.node { + RouteNode::NodeId(id) => rti.lookup_node_ref(routing_table.clone(), id.key), + RouteNode::PeerInfo(pi) => rti.register_node_with_signed_node_info( + routing_table.clone(), + RoutingDomain::PublicInternet, + pi.node_id.key, + pi.signed_node_info.clone(), + false, + ), + }; + if opt_first_hop.is_none() { + // Can't reach this private route any more + log_rtab!(debug "can't reach private route any more"); + return Ok(None); + } + let mut first_hop = opt_first_hop.unwrap(); - return Ok(Some(CompiledRoute { - safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route), - secret: routing_table.node_id_secret(), - first_hop: opt_first_hop_noderef.unwrap(), - })); - } - let safety_spec = safety_spec.unwrap(); + // Set sequencing requirement + first_hop.set_sequencing(sequencing); + + // Return the compiled safety route + return Ok(Some(CompiledRoute { + safety_route: SafetyRoute::new_stub(routing_table.node_id(), private_route), + secret: routing_table.node_id_secret(), + first_hop, + })); + } + SafetySelection::Safe(safety_spec) => safety_spec, + }; // See if the preferred route is here let opt_safety_rsd: Option<(&mut RouteSpecDetail, DHTKey)> = @@ -658,9 +688,10 @@ impl RouteSpecStore { } else { // Select a safety route from the pool or make one if we don't have one that matches if let Some(sr_pubkey) = self.first_unpublished_route( - safety_spec.reliable, safety_spec.hop_count, safety_spec.hop_count, + safety_spec.stability, + safety_spec.sequencing, Direction::Outbound.into(), ) { // Found a route to use @@ -671,7 +702,8 @@ impl RouteSpecStore { .allocate_route( rti, routing_table.clone(), - safety_spec.reliable, + safety_spec.stability, + safety_spec.sequencing, safety_spec.hop_count, Direction::Outbound.into(), ) @@ -689,7 +721,7 @@ impl RouteSpecStore { if sr_hopcount == 0 { return Err(RPCError::internal("safety route hop count is zero")); } - if sr_hopcount > self.max_route_hop_count { + if sr_hopcount > max_route_hop_count { return Err(RPCError::internal("safety route hop count too long")); } @@ -747,7 +779,7 @@ impl RouteSpecStore { let node_id = safety_rsd.hops[h]; let pi = rti .with_node_entry(node_id, |entry| { - entry.with(rti, |rti, e| { + entry.with(rti, |_rti, e| { e.make_peer_info(node_id, RoutingDomain::PublicInternet) }) }) @@ -800,14 +832,21 @@ impl RouteSpecStore { hops, }; + let mut first_hop = safety_rsd.hop_node_refs.first().unwrap().clone(); + + // Ensure sequencing requirement is set on first hop + first_hop.set_sequencing(safety_spec.sequencing); + + // Build compiled route let compiled_route = CompiledRoute { safety_route, secret: safety_rsd.secret_key, - first_hop: safety_rsd.hop_node_refs.first().unwrap().clone(), + first_hop, }; // xxx: add cache here + // Return compiled route Ok(Some(compiled_route)) } diff --git a/veilid-core/src/routing_table/routing_domains.rs b/veilid-core/src/routing_table/routing_domains.rs index 6475cd01..0e164b34 100644 --- a/veilid-core/src/routing_table/routing_domains.rs +++ b/veilid-core/src/routing_table/routing_domains.rs @@ -179,7 +179,7 @@ pub trait RoutingDomainDetail { node_b_id: &DHTKey, node_b: &NodeInfo, dial_info_filter: DialInfoFilter, - reliable: bool, + sequencing: Sequencing, ) -> ContactMethod; } @@ -204,7 +204,7 @@ fn first_filtered_dial_info_detail( from_node: &NodeInfo, to_node: &NodeInfo, dial_info_filter: &DialInfoFilter, - reliable: bool, + reliable: bool, xxx continue here ) -> Option { let direct_dial_info_filter = dial_info_filter.clone().filtered( &DialInfoFilter::all() @@ -214,7 +214,7 @@ fn first_filtered_dial_info_detail( // Get first filtered dialinfo let sort = if reliable { - Some(DialInfoDetail::reliable_sort) + Some(DialInfoDetail::ordered_sequencing_sort) } else { None }; @@ -428,7 +428,7 @@ impl RoutingDomainDetail for LocalNetworkRoutingDomainDetail { // Get first filtered dialinfo let sort = if reliable { - Some(DialInfoDetail::reliable_sort) + Some(DialInfoDetail::ordered_sequencing_sort) } else { None }; diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index aded6846..10918799 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -224,7 +224,7 @@ impl RoutingTableInner { node_b_id: &DHTKey, node_b: &NodeInfo, dial_info_filter: DialInfoFilter, - reliable: bool, + sequencing: Sequencing, ) -> ContactMethod { self.with_routing_domain(routing_domain, |rdd| { rdd.get_contact_method( @@ -234,7 +234,7 @@ impl RoutingTableInner { node_b_id, node_b, dial_info_filter, - reliable, + sequencing, ) }) } diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 499fc543..709ab9b1 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -402,6 +402,7 @@ impl RPCProcessor { &self, safety_spec: Option, private_route: PrivateRoute, + reliable: bool, message_data: Vec, ) -> Result, RPCError> { let routing_table = self.routing_table(); @@ -411,7 +412,7 @@ impl RPCProcessor { let compiled_route: CompiledRoute = match self.routing_table().with_route_spec_store_mut(|rss, rti| { // Compile the safety route with the private route - rss.compile_safety_route(rti, routing_table, safety_spec, private_route) + rss.compile_safety_route(rti, routing_table, safety_spec, private_route, reliable) })? { Some(cr) => cr, None => { @@ -455,6 +456,7 @@ impl RPCProcessor { let out_node_id = compiled_route.first_hop.node_id(); let out_hop_count = (1 + sr_hop_count + pr_hop_count) as usize; + let out = RenderedOperation { message: out_message, node_id: out_node_id, @@ -537,12 +539,12 @@ impl RPCProcessor { Destination::PrivateRoute { private_route, safety_spec, - reliable, + reliable, xxxx does this need to be here? what about None safety spec, reliable is in there, does it need to not be? or something? } => { // Send to private route // --------------------- // Reply with 'route' operation - out = self.wrap_with_route(safety_spec, private_route, message)?; + out = self.wrap_with_route(safety_spec, private_route, reliable, message)?; } } diff --git a/veilid-core/src/veilid_api/mod.rs b/veilid-core/src/veilid_api/mod.rs index 92f6fede..8f2ac241 100644 --- a/veilid-core/src/veilid_api/mod.rs +++ b/veilid-core/src/veilid_api/mod.rs @@ -405,6 +405,21 @@ impl DialInfoClass { } } +// Ordering here matters, >= is used to check strength of sequencing requirement +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub enum Sequencing { + NoPreference, + PreferOrdered, + EnsureOrdered, +} + +// Ordering here matters, >= is used to check strength of stability requirement +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub enum Stability { + LowLatency, + Reliable, +} + // Keep member order appropriate for sorting < preference #[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize, Hash)] pub struct DialInfoDetail { @@ -419,14 +434,14 @@ impl MatchesDialInfoFilter for DialInfoDetail { } impl DialInfoDetail { - pub fn reliable_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering { + pub fn ordered_sequencing_sort(a: &DialInfoDetail, b: &DialInfoDetail) -> core::cmp::Ordering { if a.class < b.class { return core::cmp::Ordering::Less; } if a.class > b.class { return core::cmp::Ordering::Greater; } - DialInfo::reliable_sort(&a.dial_info, &b.dial_info) + DialInfo::ordered_sequencing_sort(&a.dial_info, &b.dial_info) } pub const NO_SORT: std::option::Option< for<'r, 's> fn( @@ -592,6 +607,39 @@ impl NodeInfo { .unwrap_or_default() } + pub fn has_sequencing_matched_dial_info(&self, sequencing: Sequencing) -> bool { + // Check our dial info + for did in &self.dial_info_detail_list { + match sequencing { + Sequencing::NoPreference | Sequencing::PreferOrdered => return true, + Sequencing::EnsureOrdered => { + if did.dial_info.protocol_type().is_connection_oriented() { + return true; + } + } + } + } + // Check our relay if we have one + return self + .relay_peer_info + .as_ref() + .map(|rpi| { + let relay_ni = &rpi.signed_node_info.node_info; + for did in relay_ni.dial_info_detail_list { + match sequencing { + Sequencing::NoPreference | Sequencing::PreferOrdered => return true, + Sequencing::EnsureOrdered => { + if did.dial_info.protocol_type().is_connection_oriented() { + return true; + } + } + } + } + false + }) + .unwrap_or_default(); + } + pub fn has_direct_dial_info(&self) -> bool { !self.dial_info_detail_list.is_empty() } @@ -693,31 +741,31 @@ impl ProtocolType { ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS => LowLevelProtocolType::TCP, } } - pub fn sort_order(&self, reliable: bool) -> usize { + pub fn sort_order(&self, sequencing: Sequencing) -> usize { match self { ProtocolType::UDP => { - if reliable { + if sequencing != Sequencing::NoPreference { 3 } else { 0 } } ProtocolType::TCP => { - if reliable { + if sequencing != Sequencing::NoPreference { 0 } else { 1 } } ProtocolType::WS => { - if reliable { + if sequencing != Sequencing::NoPreference { 1 } else { 2 } } ProtocolType::WSS => { - if reliable { + if sequencing != Sequencing::NoPreference { 2 } else { 3 @@ -725,6 +773,9 @@ impl ProtocolType { } } } + pub fn all_ordered_set() -> ProtocolTypeSet { + ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS + } } pub type ProtocolTypeSet = EnumSet; @@ -1499,7 +1550,7 @@ impl DialInfo { } } - pub fn reliable_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering { + pub fn ordered_sequencing_sort(a: &DialInfo, b: &DialInfo) -> core::cmp::Ordering { let ca = a.protocol_type().sort_order(true); let cb = b.protocol_type().sort_order(true); if ca < cb {