From 53cd521ba83ac6031e14a72ea120b5b2760518cd Mon Sep 17 00:00:00 2001
From: John Smith <jsmith@example.com>
Date: Sat, 26 Mar 2022 21:25:24 -0400
Subject: [PATCH] fixes

---
 Cargo.lock                                    |  12 +
 external/keyring-manager                      |   2 +-
 veilid-core/Cargo.toml                        |   1 +
 veilid-core/src/lease_manager.rs              |   9 +-
 veilid-core/src/network_manager.rs            |  25 ++-
 veilid-core/src/routing_table/bucket_entry.rs |   9 +-
 veilid-core/src/routing_table/mod.rs          |  28 ++-
 veilid-core/src/routing_table/node_ref.rs     |   2 -
 veilid-core/src/rpc_processor/debug.rs        |  33 ++-
 veilid-core/src/rpc_processor/mod.rs          | 211 ++++++++++++------
 veilid-core/src/xx/mod.rs                     |   1 +
 11 files changed, 232 insertions(+), 101 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7f62cebf..83ebd359 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -269,6 +269,17 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "async-recursion"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "async-std"
 version = "1.10.0"
@@ -4176,6 +4187,7 @@ dependencies = [
  "android_logger",
  "anyhow",
  "async-lock",
+ "async-recursion",
  "async-std",
  "async-tls",
  "async-tungstenite 0.17.1",
diff --git a/external/keyring-manager b/external/keyring-manager
index f73c27e6..d475bd55 160000
--- a/external/keyring-manager
+++ b/external/keyring-manager
@@ -1 +1 @@
-Subproject commit f73c27e66e43763f0f63ca9e697e77419f157a52
+Subproject commit d475bd558872b6aa6c1b642899b7957e11734cdc
diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml
index a260be3a..563636f7 100644
--- a/veilid-core/Cargo.toml
+++ b/veilid-core/Cargo.toml
@@ -34,6 +34,7 @@ directories = "^4"
 once_cell = "^1"
 json = "^0"
 flume = { version = "^0", features = ["async"] }
+async-recursion = "^1"
 
 ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
 x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] }
diff --git a/veilid-core/src/lease_manager.rs b/veilid-core/src/lease_manager.rs
index 18688cfd..6570db3a 100644
--- a/veilid-core/src/lease_manager.rs
+++ b/veilid-core/src/lease_manager.rs
@@ -1,5 +1,6 @@
 use crate::*;
 use network_manager::*;
+use routing_table::*;
 use xx::*;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -102,9 +103,9 @@ impl LeaseManager {
     // Server-side
 
     // Signal leases
-    pub fn server_has_valid_signal_lease(&self, _recipient_id: &DHTKey) -> bool {
+    pub fn server_has_valid_signal_lease(&self, _recipient_id: &DHTKey) -> Option<NodeRef> {
         error!("unimplemented");
-        false
+        None
     }
     pub fn server_can_provide_signal_lease(&self) -> bool {
         let inner = self.inner.lock();
@@ -147,9 +148,9 @@ impl LeaseManager {
     }
 
     // Relay leases
-    pub fn server_has_valid_relay_lease(&self, _recipient_id: &DHTKey) -> bool {
+    pub fn server_has_valid_relay_lease(&self, _recipient_id: &DHTKey) -> Option<NodeRef> {
         error!("unimplemented");
-        false
+        None
     }
     pub fn server_can_provide_relay_lease(&self) -> bool {
         let inner = self.inner.lock();
diff --git a/veilid-core/src/network_manager.rs b/veilid-core/src/network_manager.rs
index 9ad6ba7e..4ece77cf 100644
--- a/veilid-core/src/network_manager.rs
+++ b/veilid-core/src/network_manager.rs
@@ -534,19 +534,22 @@ impl NetworkManager {
         let recipient_id = envelope.get_recipient_id();
         if recipient_id != routing_table.node_id() {
             // Ensure a lease exists for this node before we relay it
-            if !lease_manager.server_has_valid_relay_lease(&recipient_id)
-                && !lease_manager.server_has_valid_relay_lease(&sender_id)
+            let relay_nr = if let Some(lease_nr) =
+                lease_manager.server_has_valid_relay_lease(&recipient_id)
             {
+                // Inbound lease
+                lease_nr
+            } else if let Some(lease_nr) = lease_manager.server_has_valid_relay_lease(&sender_id) {
+                // Resolve the node to send this to
+                rpc.resolve_node(recipient_id, Some(lease_nr.clone())).await.map_err(|e| {
+                    format!(
+                        "failed to resolve recipient node for relay, dropping outbound relayed packet...: {:?}",
+                        e
+                    )
+                })?
+            } else {
                 return Err("received envelope not intended for this node".to_owned());
-            }
-
-            // Resolve the node to send this to
-            let relay_nr = rpc.resolve_node(recipient_id).await.map_err(|e| {
-                format!(
-                    "failed to resolve recipient node for relay, dropping packet...: {:?}",
-                    e
-                )
-            })?;
+            };
 
             // Re-send the packet to the leased node
             self.net()
diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs
index d0cece65..cc634886 100644
--- a/veilid-core/src/routing_table/bucket_entry.rs
+++ b/veilid-core/src/routing_table/bucket_entry.rs
@@ -65,6 +65,13 @@ impl BucketEntry {
         self.dial_infos.sort();
     }
 
+    pub fn update_single_dial_info(&mut self, dial_info: &DialInfo) {
+        let dif = dial_info.make_filter(true);
+        self.dial_infos.retain(|di| !di.matches_filter(&dif));
+        self.dial_infos.push(dial_info.clone());
+        self.dial_infos.sort();
+    }
+
     pub fn first_filtered_dial_info<F>(&self, filter: F) -> Option<DialInfo>
     where
         F: Fn(&DialInfo) -> bool,
@@ -189,7 +196,7 @@ impl BucketEntry {
             state = BucketEntryState::Unreliable;
         }
 
-        match self.state(cur_ts) {
+        match state {
             BucketEntryState::Reliable => {
                 // If we are in a reliable state, we need a ping on an exponential scale
                 match self.peer_stats.ping_stats.last_pinged {
diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs
index 51c43e9d..117bb0f7 100644
--- a/veilid-core/src/routing_table/mod.rs
+++ b/veilid-core/src/routing_table/mod.rs
@@ -263,7 +263,7 @@ impl RoutingTable {
 
     fn trigger_changed_dial_info(inner: &mut RoutingTableInner) {
         // Clear 'seen dial info' bits on routing table entries so we know to ping them
-        for b in inner.buckets {
+        for b in &mut inner.buckets {
             for e in b.entries_mut() {
                 e.1.set_seen_our_dial_info(false);
             }
@@ -451,6 +451,21 @@ impl RoutingTable {
         Ok(nr)
     }
 
+    // Add a node if it doesn't exist, or update a single dial info on an already registered node
+    pub fn update_node_with_single_dial_info(
+        &self,
+        node_id: DHTKey,
+        dial_info: &DialInfo,
+    ) -> Result<NodeRef, String> {
+        let nr = self.create_node_ref(node_id)?;
+        nr.operate(move |e| -> Result<(), String> {
+            e.update_single_dial_info(dial_info);
+            Ok(())
+        })?;
+
+        Ok(nr)
+    }
+
     fn operate_on_bucket_entry<T, F>(&self, node_id: DHTKey, f: F) -> T
     where
         F: FnOnce(&mut BucketEntry) -> T,
@@ -484,8 +499,15 @@ impl RoutingTable {
         );
 
         // register nodes we'd found
-        let mut out = Vec::<NodeRef>::with_capacity(res.peers.len());
-        for p in res.peers {
+        self.register_find_node_answer(res)
+    }
+
+    pub fn register_find_node_answer(&self, fna: FindNodeAnswer) -> Result<Vec<NodeRef>, String> {
+        let node_id = self.node_id();
+
+        // register nodes we'd found
+        let mut out = Vec::<NodeRef>::with_capacity(fna.peers.len());
+        for p in fna.peers {
             // if our own node if is in the list then ignore it, as we don't add ourselves to our own routing table
             if p.node_id.key == node_id {
                 continue;
diff --git a/veilid-core/src/routing_table/node_ref.rs b/veilid-core/src/routing_table/node_ref.rs
index 04351df9..b665efae 100644
--- a/veilid-core/src/routing_table/node_ref.rs
+++ b/veilid-core/src/routing_table/node_ref.rs
@@ -48,11 +48,9 @@ impl NodeRef {
 
     // Returns if this node has seen and acknowledged our node's dial info yet
     pub fn has_seen_our_dial_info(&self) -> bool {
-        let nm = self.routing_table.network_manager();
         self.operate(|e| e.has_seen_our_dial_info())
     }
     pub fn set_seen_our_dial_info(&self) {
-        let nm = self.routing_table.network_manager();
         self.operate(|e| e.set_seen_our_dial_info(true));
     }
 
diff --git a/veilid-core/src/rpc_processor/debug.rs b/veilid-core/src/rpc_processor/debug.rs
index b1a8d7ac..492375cf 100644
--- a/veilid-core/src/rpc_processor/debug.rs
+++ b/veilid-core/src/rpc_processor/debug.rs
@@ -201,23 +201,34 @@ impl RPCProcessor {
                         return format!("(invalid node id: {})", e);
                     }
                 };
-                let pir = match fnqr.get_peer_info() {
-                    Ok(pir) => pir,
+
+                let dil_reader = match fnqr.reborrow().get_dial_info_list() {
+                    Ok(dilr) => dilr,
                     Err(e) => {
-                        return format!("(invalid peer_info: {})", e);
+                        return format!("(invalid dial info list: {})", e);
                     }
                 };
+                let mut dial_infos =
+                    Vec::<DialInfo>::with_capacity(match dil_reader.len().try_into() {
+                        Ok(v) => v,
+                        Err(e) => {
+                            return format!("(too many dial infos: {})", e);
+                        }
+                    });
+                for di in dil_reader.iter() {
+                    dial_infos.push(match decode_dial_info(&di) {
+                        Ok(v) => v,
+                        Err(e) => {
+                            return format!("(unable to decode dial info: {})", e);
+                        }
+                    });
+                }
+
                 let node_id = decode_public_key(&nidr);
-                let peer_info = match decode_peer_info(&pir) {
-                    Ok(pi) => pi,
-                    Err(e) => {
-                        return e.to_string();
-                    }
-                };
                 format!(
-                    "FindNodeQ: node_id={} peer_info={:?}",
+                    "FindNodeQ: node_id={} dial_infos={:?}",
                     node_id.encode(),
-                    peer_info
+                    dial_infos
                 )
             }
             veilid_capnp::operation::detail::FindNodeA(_) => {
diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs
index 335d25da..8d8373fa 100644
--- a/veilid-core/src/rpc_processor/mod.rs
+++ b/veilid-core/src/rpc_processor/mod.rs
@@ -45,11 +45,11 @@ impl RespondTo {
                 builder.set_none(());
             }
             Self::Sender(Some(di)) => {
-                let mut di_builder = builder.init_sender();
+                let mut di_builder = builder.reborrow().init_sender();
                 encode_dial_info(di, &mut di_builder)?;
             }
             Self::Sender(None) => {
-                builder.init_sender();
+                builder.reborrow().init_sender();
             }
             Self::PrivateRoute(pr) => {
                 let mut pr_builder = builder.reborrow().init_private_route();
@@ -232,7 +232,7 @@ impl RPCProcessor {
 
     //////////////////////////////////////////////////////////////////////
 
-    // Search the DHT for a single node closest to a key unless we have that node in our routing table already, and return the node reference
+    // Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
     pub async fn search_dht_single_key(
         &self,
         node_id: key::DHTKey,
@@ -242,15 +242,6 @@ impl RPCProcessor {
     ) -> Result<NodeRef, RPCError> {
         let routing_table = self.routing_table();
 
-        // First see if we have the node in our routing table already
-        if let Some(nr) = routing_table.lookup_node_ref(node_id) {
-            // ensure we have dial_info for the entry already,
-            // if not, we should do the find_node anyway
-            if !nr.operate(|e| e.dial_infos().is_empty()) {
-                return Ok(nr);
-            }
-        }
-
         // xxx find node but stop if we find the exact node we want
         // xxx return whatever node is closest after the timeout
         Err(rpc_error_unimplemented("search_dht_single_key")).map_err(logthru_rpc!(error))
@@ -269,26 +260,66 @@ impl RPCProcessor {
     }
 
     // Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
-    pub async fn resolve_node(&self, node_id: key::DHTKey) -> Result<NodeRef, RPCError> {
-        let (count, fanout, timeout) = {
-            let c = self.config.get();
-            (
-                c.network.dht.resolve_node_count,
-                c.network.dht.resolve_node_fanout,
-                c.network.dht.resolve_node_timeout_ms.map(ms_to_us),
-            )
-        };
+    // Note: This routine can possibly be recursive, hence the SystemPinBoxFuture async form
+    pub fn resolve_node(
+        &self,
+        node_id: key::DHTKey,
+        lease_holder: Option<NodeRef>,
+    ) -> SystemPinBoxFuture<Result<NodeRef, RPCError>> {
+        let this = self.clone();
+        Box::pin(async move {
+            let routing_table = this.routing_table();
 
-        let nr = self
-            .search_dht_single_key(node_id, count, fanout, timeout)
-            .await?;
+            // First see if we have the node in our routing table already
+            if let Some(nr) = routing_table.lookup_node_ref(node_id) {
+                // ensure we have dial_info for the entry already,
+                // if not, we should do the find_node anyway
+                if !nr.operate(|e| e.dial_infos().is_empty()) {
+                    return Ok(nr);
+                }
+            }
 
-        if nr.node_id() != node_id {
-            // found a close node, but not exact within our configured resolve_node timeout
-            return Err(RPCError::Timeout).map_err(logthru_rpc!());
-        }
+            // Otherwise, if we are resolving on behalf of a lease holder, ask them for their routing table entries around the node first
+            if let Some(lhnr) = lease_holder {
+                let fna = this
+                    .clone()
+                    .rpc_call_find_node(
+                        Destination::Direct(lhnr.clone()),
+                        node_id,
+                        None,
+                        RespondTo::Sender(None),
+                    )
+                    .await?;
+                if let Ok(nrefs) = routing_table.register_find_node_answer(fna) {
+                    for nr in nrefs {
+                        if !nr.operate(|e| e.dial_infos().is_empty()) {
+                            return Ok(nr);
+                        }
+                    }
+                }
+            }
 
-        Ok(nr)
+            // If nobody knows where this node is, ask the DHT for it
+            let (count, fanout, timeout) = {
+                let c = this.config.get();
+                (
+                    c.network.dht.resolve_node_count,
+                    c.network.dht.resolve_node_fanout,
+                    c.network.dht.resolve_node_timeout_ms.map(ms_to_us),
+                )
+            };
+
+            let nr = this
+                .search_dht_single_key(node_id, count, fanout, timeout)
+                .await?;
+
+            if nr.node_id() != node_id {
+                // found a close node, but not exact within our configured resolve_node timeout
+                return Err(RPCError::Timeout).map_err(logthru_rpc!());
+            }
+
+            Ok(nr)
+        })
     }
 
     // set up wait for reply
@@ -510,7 +541,7 @@ impl RPCProcessor {
         let node_ref = match out_noderef {
             None => {
                 // resolve node
-                self.resolve_node(out_node_id)
+                self.resolve_node(out_node_id, None)
                     .await
                     .map_err(logthru_rpc!(error))?
             }
@@ -708,7 +739,7 @@ impl RPCProcessor {
         let node_ref = match out_noderef {
             None => {
                 // resolve node
-                self.resolve_node(out_node_id).await?
+                self.resolve_node(out_node_id, None).await?
             }
             Some(nr) => {
                 // got the node in the routing table already
@@ -983,22 +1014,32 @@ impl RPCProcessor {
                 _ => panic!("invalid operation type in process_find_node_q"),
             };
 
-            // ensure find_node peerinfo matches the envelope
+            // get the node id we want to look up
             let target_node_id = decode_public_key(
                 &fnq_reader
                     .get_node_id()
                     .map_err(map_error_capnp_error!())
                     .map_err(logthru_rpc!())?,
             );
-            let peer_info = decode_peer_info(
-                &fnq_reader
-                    .get_peer_info()
-                    .map_err(map_error_capnp_error!())
-                    .map_err(logthru_rpc!())?,
-            )?;
-            if peer_info.node_id.key != rpcreader.header.envelope.get_sender_id() {
-                return Err(RPCError::InvalidFormat);
+
+            // get the peerinfo/dialinfos of the requesting node
+            let dil_reader = fnq_reader
+                .reborrow()
+                .get_dial_info_list()
+                .map_err(map_error_capnp_error!())?;
+            let mut dial_infos = Vec::<DialInfo>::with_capacity(
+                dil_reader
+                    .len()
+                    .try_into()
+                    .map_err(map_error_protocol!("too many dial infos"))?,
+            );
+            for di in dil_reader.iter() {
+                dial_infos.push(decode_dial_info(&di)?)
             }
+            let peer_info = PeerInfo {
+                node_id: NodeId::new(rpcreader.header.envelope.get_sender_id()),
+                dial_infos,
+            };
 
             // filter out attempts to pass non-public addresses in for peers
             if !self.filter_peer_scope(&peer_info) {
@@ -1153,14 +1194,14 @@ impl RPCProcessor {
             reader,
         };
 
-        let (which, is_q) = {
+        let which = {
             let operation = rpcreader
                 .reader
                 .get_root::<veilid_capnp::operation::Reader>()
                 .map_err(map_error_capnp_error!())
                 .map_err(logthru_rpc!())?;
 
-            match operation
+            let (which, is_q) = match operation
                 .get_detail()
                 .which()
                 .map_err(map_error_capnp_notinschema!())?
@@ -1191,30 +1232,54 @@ impl RPCProcessor {
                 veilid_capnp::operation::detail::CompleteTunnelA(_) => (23u32, false),
                 veilid_capnp::operation::detail::CancelTunnelQ(_) => (24u32, true),
                 veilid_capnp::operation::detail::CancelTunnelA(_) => (25u32, false),
-            }
-        };
-        // Accounting for questions we receive
-        if is_q {
-            // look up sender node, in case it's different than our peer due to relaying
-            if let Some(sender_nr) = self
-                .routing_table()
-                .lookup_node_ref(rpcreader.header.envelope.get_sender_id())
-            {
-                if which == 0u32 {
-                    self.routing_table().stats_ping_rcvd(
-                        sender_nr,
-                        rpcreader.header.timestamp,
-                        rpcreader.header.body_len,
-                    );
-                } else {
-                    self.routing_table().stats_question_rcvd(
-                        sender_nr,
-                        rpcreader.header.timestamp,
-                        rpcreader.header.body_len,
-                    );
+            };
+
+            // Accounting for questions we receive
+            if is_q {
+                // See if we have some Sender DialInfo to incorporate
+                let opt_sender_nr =
+                    if let veilid_capnp::operation::respond_to::Sender(Ok(sender_di_reader)) =
+                        operation
+                            .get_respond_to()
+                            .which()
+                            .map_err(map_error_capnp_notinschema!())?
+                    {
+                        // Sender DialInfo was specified, update our routing table with it
+                        let sender_di = decode_dial_info(&sender_di_reader)?;
+                        let nr = self
+                            .routing_table()
+                            .update_node_with_single_dial_info(
+                                rpcreader.header.envelope.get_sender_id(),
+                                &sender_di,
+                            )
+                            .map_err(RPCError::Internal)?;
+                        Some(nr)
+                    } else {
+                        self.routing_table()
+                            .lookup_node_ref(rpcreader.header.envelope.get_sender_id())
+                    };
+
+                // look up sender node, in case it's different than our peer due to relaying
+                if let Some(sender_nr) = opt_sender_nr {
+                    if which == 0u32 {
+                        self.routing_table().stats_ping_rcvd(
+                            sender_nr,
+                            rpcreader.header.timestamp,
+                            rpcreader.header.body_len,
+                        );
+                    } else {
+                        self.routing_table().stats_question_rcvd(
+                            sender_nr,
+                            rpcreader.header.timestamp,
+                            rpcreader.header.body_len,
+                        );
+                    }
                 }
-            }
+            };
+
+            which
         };
+
         match which {
             0 => self.process_info_q(rpcreader).await, // InfoQ
             1 => self.process_answer(rpcreader).await, // InfoA
@@ -1349,7 +1414,7 @@ impl RPCProcessor {
             .routing_table()
             .first_filtered_dial_info_detail(peer.dial_info_filter())
         {
-            RespondTo::Sender(Some(did.dial_info.clone()))
+            RespondTo::Sender(Some(did.dial_info))
         } else {
             RespondTo::Sender(None)
         }
@@ -1363,7 +1428,7 @@ impl RPCProcessor {
             question.set_op_id(self.get_next_op_id());
             let mut respond_to = question.reborrow().init_respond_to();
             self.get_respond_to_sender(peer.clone())
-                .encode(&mut respond_to);
+                .encode(&mut respond_to)?;
             let detail = question.reborrow().init_detail();
             detail.init_info_q();
 
@@ -1506,13 +1571,23 @@ impl RPCProcessor {
             let mut fnq = detail.init_find_node_q();
             let mut node_id_builder = fnq.reborrow().init_node_id();
             encode_public_key(&key, &mut node_id_builder)?;
-            let mut peer_info_builder = fnq.reborrow().init_peer_info();
 
             let own_peer_info = self
                 .routing_table()
                 .get_own_peer_info(self.default_peer_scope);
 
-            encode_peer_info(&own_peer_info, &mut peer_info_builder)?;
+            let mut dil_builder = fnq.reborrow().init_dial_info_list(
+                own_peer_info
+                    .dial_infos
+                    .len()
+                    .try_into()
+                    .map_err(map_error_internal!("too many dial infos in peer info"))?,
+            );
+
+            for idx in 0..own_peer_info.dial_infos.len() {
+                let mut di_builder = dil_builder.reborrow().get(idx as u32);
+                encode_dial_info(&own_peer_info.dial_infos[idx], &mut di_builder)?;
+            }
 
             find_node_q_msg.into_reader()
         };
diff --git a/veilid-core/src/xx/mod.rs b/veilid-core/src/xx/mod.rs
index b0055419..83a8022d 100644
--- a/veilid-core/src/xx/mod.rs
+++ b/veilid-core/src/xx/mod.rs
@@ -86,6 +86,7 @@ cfg_if! {
 
 // pub use bump_port::*;
 pub use async_peek_stream::*;
+pub use async_recursion::async_recursion;
 pub use clone_stream::*;
 pub use eventual::*;
 pub use eventual_base::{EventualCommon, EventualResolvedFuture};