refactor checkpoint

This commit is contained in:
John Smith 2022-04-20 20:49:16 -04:00
parent 0440391189
commit 5b0ade9f49
6 changed files with 168 additions and 132 deletions

2
Cargo.lock generated
View File

@ -1366,6 +1366,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6216d2c19a6fb5f29d1ada1dc7bc4367a8cbf0fa4af5cf12e07b5bbdde6b5b2c"
dependencies = [
"enumset_derive",
"serde 1.0.136",
]
[[package]]
@ -4196,6 +4197,7 @@ dependencies = [
"digest 0.9.0",
"directories",
"ed25519-dalek",
"enumset",
"flume",
"futures-util",
"generic-array 0.14.5",

View File

@ -35,6 +35,7 @@ directories = "^4"
once_cell = "^1"
json = "^0"
flume = { version = "^0", features = ["async"] }
enumset = { version= "^1", features = ["serde"] }
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] }

View File

@ -70,12 +70,18 @@ struct ClientWhitelistEntry {
// Mechanism required to contact another node
enum ContactMethod {
Unreachable, // Node is not reachable by any means
Direct(DialInfo), // Contact the node directly
SignalReverse(NodeRef), // Request via signal the node connect back directly
SignalHolePunch(NodeRef), // Request via signal the node negotiate a hole punch
InboundRelay(NodeRef), // Must use an inbound relay to reach the node
OutboundRelay(NodeRef), // Must use outbound relay to reach the node
Unreachable, // Node is not reachable by any means
Direct(DialInfo), // Contact the node directly
SignalReverse(NodeRef, NodeRef), // Request via signal the node connect back directly
SignalHolePunch(NodeRef, NodeRef), // Request via signal the node negotiate a hole punch
InboundRelay(NodeRef), // Must use an inbound relay to reach the node
OutboundRelay(NodeRef), // Must use outbound relay to reach the node
}
#[derive(Copy, Clone, Debug)]
pub enum SendDataKind {
Direct,
Indirect,
}
// The mutable state of the network manager
@ -488,18 +494,14 @@ impl NetworkManager {
let routing_table = self.routing_table();
// Add the peer info to our routing table
let peer_nr = routing_table
let mut peer_nr = routing_table
.register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?;
// Get the udp direct dialinfo for the hole punch
let hole_punch_dial_info = if let Some(hpdi) = peer_nr
.node_info()
.first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP))
{
hpdi
} else {
return Err("No hole punch capable dialinfo found for node".to_owned());
};
peer_nr.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
let hole_punch_dial_info = peer_nr
.first_filtered_dial_info()
.ok_or_else(|| "No hole punch capable dialinfo found for node".to_owned())?;
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
@ -609,60 +611,65 @@ impl NetworkManager {
}
// Figure out how to reach a node
fn get_contact_method(&self, node_ref: &NodeRef) -> Result<ContactMethod, String> {
fn get_contact_method(&self, node_ref: NodeRef) -> Result<ContactMethod, String> {
// Get our network class and protocol config
let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
let our_protocol_config = self.get_protocol_config().unwrap();
// See if this is a local node reachable directly
let local_node_info = node_ref.local_node_info();
if let Some(local_direct_dial_info) = local_node_info
.first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di))
{
return Ok(ContactMethod::Direct(local_direct_dial_info));
// Scope noderef down to protocols we can do outbound
if !node_ref.filter_protocols(our_protocol_config.outbound) {
return Ok(ContactMethod::Unreachable);
}
// Get the best matching direct dial info if we have it
let target_node_info = node_ref.node_info();
let opt_direct_dial_info = target_node_info
.first_filtered_dial_info(|di| our_protocol_config.outbound.filter_dial_info(di));
let opt_direct_dial_info = node_ref.first_filtered_dial_info();
// See if this is a local node reachable directly
if let Some(direct_dial_info) = opt_direct_dial_info {
if direct_dial_info.is_local() {
return Ok(ContactMethod::Direct(direct_dial_info));
}
}
// Can the target node do inbound?
if target_node_info.network_class.inbound_capable() {
let target_network_class = node_ref.network_class();
if target_network_class.inbound_capable() {
// Do we need to signal before going inbound?
if target_node_info.network_class.inbound_requires_signal() {
if target_network_class.inbound_requires_signal() {
// Get the target's inbound relay, it must have one or it is not reachable
if let Some(target_rpi) = target_node_info.relay_peer_info {
if let Some(inbound_relay_nr) = node_ref.relay() {
// Can we reach the inbound relay?
if target_rpi
.node_info
.first_filtered_dial_info(|di| {
our_protocol_config.outbound.filter_dial_info(di)
})
.is_some()
{
let target_inbound_relay_nr =
self.routing_table().register_node_with_node_info(
target_rpi.node_id.key,
target_rpi.node_info,
)?;
if inbound_relay_nr.first_filtered_dial_info().is_some() {
// Can we receive anything inbound ever?
if our_network_class.inbound_capable() {
// Can we receive a direct reverse connection?
if !our_network_class.inbound_requires_signal() {
return Ok(ContactMethod::SignalReverse(target_inbound_relay_nr));
return Ok(ContactMethod::SignalReverse(
inbound_relay_nr,
node_ref,
));
}
// Can we hole-punch?
else if our_protocol_config.inbound.udp
&& target_node_info.outbound_protocols.udp
else if our_protocol_config.inbound.contains(ProtocolType::UDP)
&& node_ref.outbound_protocols().contains(ProtocolType::UDP)
{
return Ok(ContactMethod::SignalHolePunch(target_inbound_relay_nr));
let udp_inbound_relay_nr = inbound_relay_nr.clone();
let udp_target_nr = node_ref.clone();
let can_reach_inbound_relay = udp_inbound_relay_nr
.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
let can_reach_target = udp_target_nr
.filter_protocols(ProtocolSet::only(ProtocolType::UDP));
if can_reach_inbound_relay && can_reach_target {
return Ok(ContactMethod::SignalHolePunch(
udp_inbound_relay_nr,
udp_target_nr,
));
}
}
// Otherwise we have to inbound relay
}
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr));
return Ok(ContactMethod::InboundRelay(inbound_relay_nr));
}
}
}
@ -675,20 +682,9 @@ impl NetworkManager {
}
} else {
// If the other node is not inbound capable at all, it is using a full relay
if let Some(target_rpi) = target_node_info.relay_peer_info {
if let Some(target_inbound_relay_nr) = node_ref.relay() {
// Can we reach the full relay?
if target_rpi
.node_info
.first_filtered_dial_info(|di| {
our_protocol_config.outbound.filter_dial_info(di)
})
.is_some()
{
let target_inbound_relay_nr =
self.routing_table().register_node_with_node_info(
target_rpi.node_id.key,
target_rpi.node_info,
)?;
if target_inbound_relay_nr.first_filtered_dial_info().is_some() {
return Ok(ContactMethod::InboundRelay(target_inbound_relay_nr));
}
}
@ -777,6 +773,16 @@ impl NetworkManager {
target_nr: NodeRef,
data: Vec<u8>,
) -> Result<(), String> {
// Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(relay_nr
.filter_ref()
.map(|dif| dif.protocol_set == ProtocolSet::only(ProtocolType::UDP))
.unwrap_or_default());
assert!(target_nr
.filter_ref()
.map(|dif| dif.protocol_set == ProtocolSet::only(ProtocolType::UDP))
.unwrap_or_default());
// Build a return receipt for the signal
let receipt_timeout =
ms_to_us(self.config.get().network.reverse_connection_receipt_time_ms);
@ -788,14 +794,9 @@ impl NetworkManager {
let peer_info = self.routing_table().get_own_peer_info();
// Get the udp direct dialinfo for the hole punch
let hole_punch_dial_info = if let Some(hpdi) = target_nr
.node_info()
.first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP))
{
hpdi
} else {
return Err("No hole punch capable dialinfo found for node".to_owned());
};
let hole_punch_dial_info = target_nr
.first_filtered_dial_info()
.ok_or_else(|| "No hole punch capable dialinfo found for node".to_owned())?;
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
@ -887,18 +888,19 @@ impl NetworkManager {
};
// If we don't have last_connection, try to reach out to the peer via its dial info
match this.get_contact_method(&node_ref).map_err(logthru_net!())? {
match this.get_contact_method(node_ref).map_err(logthru_net!())? {
ContactMethod::OutboundRelay(relay_nr) | ContactMethod::InboundRelay(relay_nr) => {
this.send_data(relay_nr, data).await
}
ContactMethod::Direct(dial_info) => {
this.net().send_data_to_dial_info(dial_info, data).await
}
ContactMethod::SignalReverse(relay_nr) => {
this.do_reverse_connect(relay_nr, node_ref, data).await
ContactMethod::SignalReverse(relay_nr, target_node_ref) => {
this.do_reverse_connect(relay_nr, target_node_ref, data)
.await
}
ContactMethod::SignalHolePunch(relay_nr) => {
this.do_hole_punch(relay_nr, node_ref, data).await
ContactMethod::SignalHolePunch(relay_nr, target_node_ref) => {
this.do_hole_punch(relay_nr, target_node_ref, data).await
}
ContactMethod::Unreachable => Err("Can't send to this relay".to_owned()),
}

View File

@ -31,6 +31,30 @@ impl NodeRef {
self.node_id
}
pub fn filter_ref(&self) -> Option<&DialInfoFilter> {
self.filter.as_ref()
}
pub fn take_filter(&mut self) -> Option<DialInfoFilter> {
self.filter.take()
}
pub fn set_filter(&mut self, filter: Option<DialInfoFilter>) {
self.filter = filter
}
// Returns true if some protocols can still pass the filter and false if no protocols remain
pub fn filter_protocols(&mut self, protocol_set: ProtocolSet) -> bool {
if protocol_set != ProtocolSet::all() {
let mut dif = self.filter.unwrap_or_default();
dif.protocol_set &= protocol_set;
self.filter = Some(dif);
}
self.filter
.map(|f| !f.protocol_set.is_empty())
.unwrap_or(true)
}
pub fn operate<T, F>(&self, f: F) -> T
where
F: FnOnce(&mut BucketEntry) -> T,
@ -48,13 +72,31 @@ impl NodeRef {
self.operate(|e| e.set_seen_our_node_info(true));
}
pub fn network_class(&self) -> NetworkClass {
self.operate(|e| e.node_info().network_class)
}
pub fn outbound_protocols(&self) -> ProtocolSet {
self.operate(|e| e.node_info().outbound_protocols)
}
pub fn relay(&self) -> Option<NodeRef> {
let target_rpi = self.operate(|e| e.node_info().relay_peer_info)?;
self.routing_table
.register_node_with_node_info(target_rpi.node_id.key, target_rpi.node_info)
.map_err(logthru_rtab!(error))
.ok()
.map(|nr| {
nr.set_filter(self.filter_ref().cloned());
nr
})
}
pub fn first_filtered_dial_info(&self) -> Option<DialInfo> {
self.operate(|e| {
if matches!(
self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All),
PeerScope::All | PeerScope::Global
PeerScope::All | PeerScope::Local
) {
e.node_info().first_filtered_dial_info(|di| {
e.local_node_info().first_filtered_dial_info(|di| {
if let Some(filter) = self.filter {
di.matches_filter(&filter)
} else {
@ -67,9 +109,9 @@ impl NodeRef {
.or_else(|| {
if matches!(
self.filter.map(|f| f.peer_scope).unwrap_or(PeerScope::All),
PeerScope::All | PeerScope::Local
PeerScope::All | PeerScope::Global
) {
e.local_node_info().first_filtered_dial_info(|di| {
e.node_info().first_filtered_dial_info(|di| {
if let Some(filter) = self.filter {
di.matches_filter(&filter)
} else {

View File

@ -5,10 +5,10 @@ pub fn encode_protocol_set(
protocol_set: &ProtocolSet,
builder: &mut veilid_capnp::protocol_set::Builder,
) -> Result<(), RPCError> {
builder.set_udp(protocol_set.udp);
builder.set_tcp(protocol_set.tcp);
builder.set_ws(protocol_set.ws);
builder.set_wss(protocol_set.wss);
builder.set_udp(protocol_set.contains(ProtocolType::UDP));
builder.set_tcp(protocol_set.contains(ProtocolType::TCP));
builder.set_ws(protocol_set.contains(ProtocolType::WS));
builder.set_wss(protocol_set.contains(ProtocolType::WSS));
Ok(())
}
@ -16,10 +16,18 @@ pub fn encode_protocol_set(
pub fn decode_protocol_set(
reader: &veilid_capnp::protocol_set::Reader,
) -> Result<ProtocolSet, RPCError> {
Ok(ProtocolSet {
udp: reader.reborrow().get_udp(),
tcp: reader.reborrow().get_tcp(),
ws: reader.reborrow().get_ws(),
wss: reader.reborrow().get_wss(),
})
let mut out = ProtocolSet::new();
if reader.reborrow().get_udp() {
out.insert(ProtocolType::UDP);
}
if reader.reborrow().get_tcp() {
out.insert(ProtocolType::TCP);
}
if reader.reborrow().get_ws() {
out.insert(ProtocolType::WS);
}
if reader.reborrow().get_wss() {
out.insert(ProtocolType::WSS);
}
Ok(out)
}

View File

@ -23,6 +23,7 @@ pub use rpc_processor::InfoAnswer;
use core::fmt;
use core_context::{api_shutdown, VeilidCoreContext};
use enumset::*;
use rpc_processor::{RPCError, RPCProcessor};
use serde::*;
use xx::*;
@ -375,7 +376,6 @@ impl NodeInfo {
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
pub struct LocalNodeInfo {
pub outbound_protocols: ProtocolSet,
pub dial_info_list: Vec<DialInfo>,
}
@ -411,7 +411,7 @@ impl LocalNodeInfo {
}
}
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, PartialOrd, Ord, Hash, Serialize, Deserialize, EnumSetType)]
// The derived ordering here is the order of preference, lower is preferred for connections
// Must match DialInfo order
pub enum ProtocolType {
@ -430,27 +430,7 @@ impl ProtocolType {
}
}
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
pub struct ProtocolSet {
pub udp: bool,
pub tcp: bool,
pub ws: bool,
pub wss: bool,
}
impl ProtocolSet {
pub fn contains(&self, protocol_type: ProtocolType) -> bool {
match protocol_type {
ProtocolType::UDP => self.udp,
ProtocolType::TCP => self.tcp,
ProtocolType::WS => self.ws,
ProtocolType::WSS => self.wss,
}
}
pub fn filter_dial_info(&self, di: &DialInfo) -> bool {
self.contains(di.protocol_type())
}
}
pub type ProtocolSet = EnumSet<ProtocolType>;
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize)]
pub enum AddressType {
@ -598,63 +578,68 @@ impl FromStr for SocketAddress {
//////////////////////////////////////////////////////////////////
#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct DialInfoFilter {
pub peer_scope: PeerScope,
pub protocol_type: Option<ProtocolType>,
pub protocol_set: ProtocolSet,
pub address_type: Option<AddressType>,
}
impl Default for DialInfoFilter {
fn default() -> Self {
Self {
peer_scope: PeerScope::All,
protocol_set: ProtocolSet::all(),
address_type: None,
}
}
}
impl DialInfoFilter {
pub fn all() -> Self {
Self {
peer_scope: PeerScope::All,
protocol_type: None,
protocol_set: ProtocolSet::all(),
address_type: None,
}
}
pub fn global() -> Self {
Self {
peer_scope: PeerScope::Global,
protocol_type: None,
protocol_set: ProtocolSet::all(),
address_type: None,
}
}
pub fn local() -> Self {
Self {
peer_scope: PeerScope::Local,
protocol_type: None,
protocol_set: ProtocolSet::all(),
address_type: None,
}
}
pub fn scoped(peer_scope: PeerScope) -> Self {
Self {
peer_scope,
protocol_type: None,
protocol_set: ProtocolSet::all(),
address_type: None,
}
}
pub fn with_protocol_type(mut self, protocol_type: ProtocolType) -> Self {
self.protocol_type = Some(protocol_type);
self.protocol_set = ProtocolSet::only(protocol_type);
self
}
pub fn with_address_type(mut self, address_type: AddressType) -> Self {
self.address_type = Some(address_type);
self
}
pub fn is_empty(&self) -> bool {
self.peer_scope == PeerScope::All
&& self.protocol_type.is_none()
&& self.address_type.is_none()
}
}
impl fmt::Debug for DialInfoFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
let mut out = String::new();
out += &format!("{:?}", self.peer_scope);
if let Some(pt) = self.protocol_type {
out += &format!("+{:?}", pt);
if self.protocol_set != ProtocolSet::all() {
out += &format!("+{:?}", self.protocol_set);
}
if let Some(at) = self.address_type {
out += &format!("+{:?}", at);
@ -919,7 +904,7 @@ impl DialInfo {
} else {
PeerScope::All
},
protocol_type: Some(self.protocol_type()),
protocol_set: ProtocolSet::only(self.protocol_type()),
address_type: Some(self.address_type()),
}
}
@ -930,10 +915,8 @@ impl MatchesDialInfoFilter for DialInfo {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
if let Some(pt) = filter.protocol_type {
if self.protocol_type() != pt {
return false;
}
if !filter.protocol_set.contains(self.protocol_type()) {
return false;
}
if let Some(at) = filter.address_type {
if self.address_type() != at {
@ -1026,10 +1009,8 @@ impl MatchesDialInfoFilter for ConnectionDescriptor {
if !self.matches_peer_scope(filter.peer_scope) {
return false;
}
if let Some(pt) = filter.protocol_type {
if self.protocol_type() != pt {
return false;
}
if filter.protocol_set.contains(self.protocol_type()) {
return false;
}
if let Some(at) = filter.address_type {
if self.address_type() != at {