This commit is contained in:
John Smith 2022-03-26 21:25:24 -04:00
parent cc715dfc96
commit 53cd521ba8
11 changed files with 232 additions and 101 deletions

12
Cargo.lock generated
View File

@ -269,6 +269,17 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "async-recursion"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-std" name = "async-std"
version = "1.10.0" version = "1.10.0"
@ -4176,6 +4187,7 @@ dependencies = [
"android_logger", "android_logger",
"anyhow", "anyhow",
"async-lock", "async-lock",
"async-recursion",
"async-std", "async-std",
"async-tls", "async-tls",
"async-tungstenite 0.17.1", "async-tungstenite 0.17.1",

@ -1 +1 @@
Subproject commit f73c27e66e43763f0f63ca9e697e77419f157a52 Subproject commit d475bd558872b6aa6c1b642899b7957e11734cdc

View File

@ -34,6 +34,7 @@ directories = "^4"
once_cell = "^1" once_cell = "^1"
json = "^0" json = "^0"
flume = { version = "^0", features = ["async"] } flume = { version = "^0", features = ["async"] }
async-recursion = "^1"
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] } ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] } x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] }

View File

@ -1,5 +1,6 @@
use crate::*; use crate::*;
use network_manager::*; use network_manager::*;
use routing_table::*;
use xx::*; use xx::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@ -102,9 +103,9 @@ impl LeaseManager {
// Server-side // Server-side
// Signal leases // Signal leases
pub fn server_has_valid_signal_lease(&self, _recipient_id: &DHTKey) -> bool { pub fn server_has_valid_signal_lease(&self, _recipient_id: &DHTKey) -> Option<NodeRef> {
error!("unimplemented"); error!("unimplemented");
false None
} }
pub fn server_can_provide_signal_lease(&self) -> bool { pub fn server_can_provide_signal_lease(&self) -> bool {
let inner = self.inner.lock(); let inner = self.inner.lock();
@ -147,9 +148,9 @@ impl LeaseManager {
} }
// Relay leases // Relay leases
pub fn server_has_valid_relay_lease(&self, _recipient_id: &DHTKey) -> bool { pub fn server_has_valid_relay_lease(&self, _recipient_id: &DHTKey) -> Option<NodeRef> {
error!("unimplemented"); error!("unimplemented");
false None
} }
pub fn server_can_provide_relay_lease(&self) -> bool { pub fn server_can_provide_relay_lease(&self) -> bool {
let inner = self.inner.lock(); let inner = self.inner.lock();

View File

@ -534,19 +534,22 @@ impl NetworkManager {
let recipient_id = envelope.get_recipient_id(); let recipient_id = envelope.get_recipient_id();
if recipient_id != routing_table.node_id() { if recipient_id != routing_table.node_id() {
// Ensure a lease exists for this node before we relay it // Ensure a lease exists for this node before we relay it
if !lease_manager.server_has_valid_relay_lease(&recipient_id) let relay_nr = if let Some(lease_nr) =
&& !lease_manager.server_has_valid_relay_lease(&sender_id) lease_manager.server_has_valid_relay_lease(&recipient_id)
{ {
// Inbound lease
lease_nr
} else if let Some(lease_nr) = lease_manager.server_has_valid_relay_lease(&sender_id) {
// Resolve the node to send this to
rpc.resolve_node(recipient_id, Some(lease_nr.clone())).await.map_err(|e| {
format!(
"failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}",
e
)
})?
} else {
return Err("received envelope not intended for this node".to_owned()); return Err("received envelope not intended for this node".to_owned());
} };
// Resolve the node to send this to
let relay_nr = rpc.resolve_node(recipient_id).await.map_err(|e| {
format!(
"failed to resolve recipient node for relay, dropping packet...: {:?}",
e
)
})?;
// Re-send the packet to the leased node // Re-send the packet to the leased node
self.net() self.net()

View File

@ -65,6 +65,13 @@ impl BucketEntry {
self.dial_infos.sort(); self.dial_infos.sort();
} }
pub fn update_single_dial_info(&mut self, dial_info: &DialInfo) {
let dif = dial_info.make_filter(true);
self.dial_infos.retain(|di| !di.matches_filter(&dif));
self.dial_infos.push(dial_info.clone());
self.dial_infos.sort();
}
pub fn first_filtered_dial_info<F>(&self, filter: F) -> Option<DialInfo> pub fn first_filtered_dial_info<F>(&self, filter: F) -> Option<DialInfo>
where where
F: Fn(&DialInfo) -> bool, F: Fn(&DialInfo) -> bool,
@ -189,7 +196,7 @@ impl BucketEntry {
state = BucketEntryState::Unreliable; state = BucketEntryState::Unreliable;
} }
match self.state(cur_ts) { match state {
BucketEntryState::Reliable => { BucketEntryState::Reliable => {
// If we are in a reliable state, we need a ping on an exponential scale // If we are in a reliable state, we need a ping on an exponential scale
match self.peer_stats.ping_stats.last_pinged { match self.peer_stats.ping_stats.last_pinged {

View File

@ -263,7 +263,7 @@ impl RoutingTable {
fn trigger_changed_dial_info(inner: &mut RoutingTableInner) { fn trigger_changed_dial_info(inner: &mut RoutingTableInner) {
// Clear 'seen dial info' bits on routing table entries so we know to ping them // Clear 'seen dial info' bits on routing table entries so we know to ping them
for b in inner.buckets { for b in &mut inner.buckets {
for e in b.entries_mut() { for e in b.entries_mut() {
e.1.set_seen_our_dial_info(false); e.1.set_seen_our_dial_info(false);
} }
@ -451,6 +451,21 @@ impl RoutingTable {
Ok(nr) Ok(nr)
} }
// Add a node if it doesn't exist, or update a single dial info on an already registered node
pub fn update_node_with_single_dial_info(
&self,
node_id: DHTKey,
dial_info: &DialInfo,
) -> Result<NodeRef, String> {
let nr = self.create_node_ref(node_id)?;
nr.operate(move |e| -> Result<(), String> {
e.update_single_dial_info(dial_info);
Ok(())
})?;
Ok(nr)
}
fn operate_on_bucket_entry<T, F>(&self, node_id: DHTKey, f: F) -> T fn operate_on_bucket_entry<T, F>(&self, node_id: DHTKey, f: F) -> T
where where
F: FnOnce(&mut BucketEntry) -> T, F: FnOnce(&mut BucketEntry) -> T,
@ -484,8 +499,15 @@ impl RoutingTable {
); );
// register nodes we'd found // register nodes we'd found
let mut out = Vec::<NodeRef>::with_capacity(res.peers.len()); self.register_find_node_answer(res)
for p in res.peers { }
pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result<Vec<NodeRef>, String> {
let node_id = self.node_id();
// register nodes we'd found
let mut out = Vec::<NodeRef>::with_capacity(fna.peers.len());
for p in fna.peers {
// if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table
if p.node_id.key == node_id { if p.node_id.key == node_id {
continue; continue;

View File

@ -48,11 +48,9 @@ impl NodeRef {
// Returns if this node has seen and acknowledged our node's dial info yet // Returns if this node has seen and acknowledged our node's dial info yet
pub fn has_seen_our_dial_info(&self) -> bool { pub fn has_seen_our_dial_info(&self) -> bool {
let nm = self.routing_table.network_manager();
self.operate(|e| e.has_seen_our_dial_info()) self.operate(|e| e.has_seen_our_dial_info())
} }
pub fn set_seen_our_dial_info(&self) { pub fn set_seen_our_dial_info(&self) {
let nm = self.routing_table.network_manager();
self.operate(|e| e.set_seen_our_dial_info(true)); self.operate(|e| e.set_seen_our_dial_info(true));
} }

View File

@ -201,23 +201,34 @@ impl RPCProcessor {
return format!("(invalid node id: {})", e); return format!("(invalid node id: {})", e);
} }
}; };
let pir = match fnqr.get_peer_info() {
Ok(pir) => pir, let dil_reader = match fnqr.reborrow().get_dial_info_list() {
Ok(dilr) => dilr,
Err(e) => { Err(e) => {
return format!("(invalid peer_info: {})", e); return format!("(invalid dial info list: {})", e);
} }
}; };
let mut dial_infos =
Vec::<DialInfo>::with_capacity(match dil_reader.len().try_into() {
Ok(v) => v,
Err(e) => {
return format!("(too many dial infos: {})", e);
}
});
for di in dil_reader.iter() {
dial_infos.push(match decode_dial_info(&di) {
Ok(v) => v,
Err(e) => {
return format!("(unable to decode dial info: {})", e);
}
});
}
let node_id = decode_public_key(&nidr); let node_id = decode_public_key(&nidr);
let peer_info = match decode_peer_info(&pir) {
Ok(pi) => pi,
Err(e) => {
return e.to_string();
}
};
format!( format!(
"FindNodeQ: node_id={} peer_info={:?}", "FindNodeQ: node_id={} dial_infos={:?}",
node_id.encode(), node_id.encode(),
peer_info dial_infos
) )
} }
veilid_capnp::operation::detail::FindNodeA(_) => { veilid_capnp::operation::detail::FindNodeA(_) => {

View File

@ -45,11 +45,11 @@ impl RespondTo {
builder.set_none(()); builder.set_none(());
} }
Self::Sender(Some(di)) => { Self::Sender(Some(di)) => {
let mut di_builder = builder.init_sender(); let mut di_builder = builder.reborrow().init_sender();
encode_dial_info(di, &mut di_builder)?; encode_dial_info(di, &mut di_builder)?;
} }
Self::Sender(None) => { Self::Sender(None) => {
builder.init_sender(); builder.reborrow().init_sender();
} }
Self::PrivateRoute(pr) => { Self::PrivateRoute(pr) => {
let mut pr_builder = builder.reborrow().init_private_route(); let mut pr_builder = builder.reborrow().init_private_route();
@ -232,7 +232,7 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
// Search the DHT for a single node closest to a key unless we have that node in our routing table already, and return the node reference // Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
pub async fn search_dht_single_key( pub async fn search_dht_single_key(
&self, &self,
node_id: key::DHTKey, node_id: key::DHTKey,
@ -242,15 +242,6 @@ impl RPCProcessor {
) -> Result<NodeRef, RPCError> { ) -> Result<NodeRef, RPCError> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
// First see if we have the node in our routing table already
if let Some(nr) = routing_table.lookup_node_ref(node_id) {
// ensure we have dial_info for the entry already,
// if not, we should do the find_node anyway
if !nr.operate(|e| e.dial_infos().is_empty()) {
return Ok(nr);
}
}
// xxx find node but stop if we find the exact node we want // xxx find node but stop if we find the exact node we want
// xxx return whatever node is closest after the timeout // xxx return whatever node is closest after the timeout
Err(rpc_error_unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error)) Err(rpc_error_unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error))
@ -269,26 +260,66 @@ impl RPCProcessor {
} }
// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
pub async fn resolve_node(&self, node_id: key::DHTKey) -> Result<NodeRef, RPCError> { // Note: This routine can possible be recursive, hence the SystemPinBoxFuture async form
let (count, fanout, timeout) = { pub fn resolve_node(
let c = self.config.get(); &self,
( node_id: key::DHTKey,
c.network.dht.resolve_node_count, lease_holder: Option<NodeRef>,
c.network.dht.resolve_node_fanout, ) -> SystemPinBoxFuture<Result<NodeRef, RPCError>> {
c.network.dht.resolve_node_timeout_ms.map(ms_to_us), let this = self.clone();
) Box::pin(async move {
}; let routing_table = this.routing_table();
let nr = self // First see if we have the node in our routing table already
.search_dht_single_key(node_id, count, fanout, timeout) if let Some(nr) = routing_table.lookup_node_ref(node_id) {
.await?; // ensure we have dial_info for the entry already,
// if not, we should do the find_node anyway
if !nr.operate(|e| e.dial_infos().is_empty()) {
return Ok(nr);
}
}
if nr.node_id() != node_id { // If not, if we are resolving on behalf of a lease holder, ask them for their routing table around the node first
// found a close node, but not exact within our configured resolve_node timeout if let Some(lhnr) = lease_holder {
return Err(RPCError::Timeout).map_err(logthru_rpc!()); let fna = this
} .clone()
.rpc_call_find_node(
Destination::Direct(lhnr.clone()),
node_id,
None,
RespondTo::Sender(None),
)
.await?;
if let Ok(nrefs) = routing_table.register_find_node_answer(fna) {
for nr in nrefs {
if !nr.operate(|e| e.dial_infos().is_empty()) {
return Ok(nr);
}
}
}
}
Ok(nr) // If nobody knows where this node is, ask the DHT for it
let (count, fanout, timeout) = {
let c = this.config.get();
(
c.network.dht.resolve_node_count,
c.network.dht.resolve_node_fanout,
c.network.dht.resolve_node_timeout_ms.map(ms_to_us),
)
};
let nr = this
.search_dht_single_key(node_id, count, fanout, timeout)
.await?;
if nr.node_id() != node_id {
// found a close node, but not exact within our configured resolve_node timeout
return Err(RPCError::Timeout).map_err(logthru_rpc!());
}
Ok(nr)
})
} }
// set up wait for reply // set up wait for reply
@ -510,7 +541,7 @@ impl RPCProcessor {
let node_ref = match out_noderef { let node_ref = match out_noderef {
None => { None => {
// resolve node // resolve node
self.resolve_node(out_node_id) self.resolve_node(out_node_id, None)
.await .await
.map_err(logthru_rpc!(error))? .map_err(logthru_rpc!(error))?
} }
@ -708,7 +739,7 @@ impl RPCProcessor {
let node_ref = match out_noderef { let node_ref = match out_noderef {
None => { None => {
// resolve node // resolve node
self.resolve_node(out_node_id).await? self.resolve_node(out_node_id, None).await?
} }
Some(nr) => { Some(nr) => {
// got the node in the routing table already // got the node in the routing table already
@ -983,22 +1014,32 @@ impl RPCProcessor {
_ => panic!("invalid operation type in process_find_node_q"), _ => panic!("invalid operation type in process_find_node_q"),
}; };
// ensure find_node peerinfo matches the envelope // get the node id we want to look up
let target_node_id = decode_public_key( let target_node_id = decode_public_key(
&fnq_reader &fnq_reader
.get_node_id() .get_node_id()
.map_err(map_error_capnp_error!()) .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?, .map_err(logthru_rpc!())?,
); );
let peer_info = decode_peer_info(
&fnq_reader // get the peerinfo/dialinfos of the requesting node
.get_peer_info() let dil_reader = fnq_reader
.map_err(map_error_capnp_error!()) .reborrow()
.map_err(logthru_rpc!())?, .get_dial_info_list()
)?; .map_err(map_error_capnp_error!())?;
if peer_info.node_id.key != rpcreader.header.envelope.get_sender_id() { let mut dial_infos = Vec::<DialInfo>::with_capacity(
return Err(RPCError::InvalidFormat); dil_reader
.len()
.try_into()
.map_err(map_error_protocol!("too many dial infos"))?,
);
for di in dil_reader.iter() {
dial_infos.push(decode_dial_info(&di)?)
} }
let peer_info = PeerInfo {
node_id: NodeId::new(rpcreader.header.envelope.get_sender_id()),
dial_infos,
};
// filter out attempts to pass non-public addresses in for peers // filter out attempts to pass non-public addresses in for peers
if !self.filter_peer_scope(&peer_info) { if !self.filter_peer_scope(&peer_info) {
@ -1153,14 +1194,14 @@ impl RPCProcessor {
reader, reader,
}; };
let (which, is_q) = { let which = {
let operation = rpcreader let operation = rpcreader
.reader .reader
.get_root::<veilid_capnp::operation::Reader>() .get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!()) .map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?; .map_err(logthru_rpc!())?;
match operation let (which, is_q) = match operation
.get_detail() .get_detail()
.which() .which()
.map_err(map_error_capnp_notinschema!())? .map_err(map_error_capnp_notinschema!())?
@ -1191,30 +1232,54 @@ impl RPCProcessor {
veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false), veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false),
veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true), veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true),
veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false), veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false),
} };
};
// Accounting for questions we receive // Accounting for questions we receive
if is_q { if is_q {
// look up sender node, in case it's different than our peer due to relaying // See if we have some Sender DialInfo to incorporate
if let Some(sender_nr) = self let opt_sender_nr =
.routing_table() if let veilid_capnp::operation::respond_to::Sender(Ok(sender_di_reader)) =
.lookup_node_ref(rpcreader.header.envelope.get_sender_id()) operation
{ .get_respond_to()
if which == 0u32 { .which()
self.routing_table().stats_ping_rcvd( .map_err(map_error_capnp_notinschema!())?
sender_nr, {
rpcreader.header.timestamp, // Sender DialInfo was specified, update our routing table with it
rpcreader.header.body_len, let sender_di = decode_dial_info(&sender_di_reader)?;
); let nr = self
} else { .routing_table()
self.routing_table().stats_question_rcvd( .update_node_with_single_dial_info(
sender_nr, rpcreader.header.envelope.get_sender_id(),
rpcreader.header.timestamp, &sender_di,
rpcreader.header.body_len, )
); .map_err(RPCError::Internal)?;
Some(nr)
} else {
self.routing_table()
.lookup_node_ref(rpcreader.header.envelope.get_sender_id())
};
// look up sender node, in case it's different than our peer due to relaying
if let Some(sender_nr) = opt_sender_nr {
if which == 0u32 {
self.routing_table().stats_ping_rcvd(
sender_nr,
rpcreader.header.timestamp,
rpcreader.header.body_len,
);
} else {
self.routing_table().stats_question_rcvd(
sender_nr,
rpcreader.header.timestamp,
rpcreader.header.body_len,
);
}
} }
} };
which
}; };
match which { match which {
0 => self.process_info_q(rpcreader).await, // InfoQ 0 => self.process_info_q(rpcreader).await, // InfoQ
1 => self.process_answer(rpcreader).await, // InfoA 1 => self.process_answer(rpcreader).await, // InfoA
@ -1349,7 +1414,7 @@ impl RPCProcessor {
.routing_table() .routing_table()
.first_filtered_dial_info_detail(peer.dial_info_filter()) .first_filtered_dial_info_detail(peer.dial_info_filter())
{ {
RespondTo::Sender(Some(did.dial_info.clone())) RespondTo::Sender(Some(did.dial_info))
} else { } else {
RespondTo::Sender(None) RespondTo::Sender(None)
} }
@ -1363,7 +1428,7 @@ impl RPCProcessor {
question.set_op_id(self.get_next_op_id()); question.set_op_id(self.get_next_op_id());
let mut respond_to = question.reborrow().init_respond_to(); let mut respond_to = question.reborrow().init_respond_to();
self.get_respond_to_sender(peer.clone()) self.get_respond_to_sender(peer.clone())
.encode(&mut respond_to); .encode(&mut respond_to)?;
let detail = question.reborrow().init_detail(); let detail = question.reborrow().init_detail();
detail.init_info_q(); detail.init_info_q();
@ -1506,13 +1571,23 @@ impl RPCProcessor {
let mut fnq = detail.init_find_node_q(); let mut fnq = detail.init_find_node_q();
let mut node_id_builder = fnq.reborrow().init_node_id(); let mut node_id_builder = fnq.reborrow().init_node_id();
encode_public_key(&key, &mut node_id_builder)?; encode_public_key(&key, &mut node_id_builder)?;
let mut peer_info_builder = fnq.reborrow().init_peer_info();
let own_peer_info = self let own_peer_info = self
.routing_table() .routing_table()
.get_own_peer_info(self.default_peer_scope); .get_own_peer_info(self.default_peer_scope);
encode_peer_info(&own_peer_info, &mut peer_info_builder)?; let mut dil_builder = fnq.reborrow().init_dial_info_list(
own_peer_info
.dial_infos
.len()
.try_into()
.map_err(map_error_internal!("too many dial infos in peer info"))?,
);
for idx in 0..own_peer_info.dial_infos.len() {
let mut di_builder = dil_builder.reborrow().get(idx as u32);
encode_dial_info(&own_peer_info.dial_infos[idx], &mut di_builder)?;
}
find_node_q_msg.into_reader() find_node_q_msg.into_reader()
}; };

View File

@ -86,6 +86,7 @@ cfg_if! {
// pub use bump_port::*; // pub use bump_port::*;
pub use async_peek_stream::*; pub use async_peek_stream::*;
pub use async_recursion::async_recursion;
pub use clone_stream::*; pub use clone_stream::*;
pub use eventual::*; pub use eventual::*;
pub use eventual_base::{EventualCommon, EventualResolvedFuture}; pub use eventual_base::{EventualCommon, EventualResolvedFuture};