last_connection expiration

This commit is contained in:
John Smith 2022-04-17 19:10:10 -04:00
parent 5527740f6a
commit 9cee8c292f
6 changed files with 176 additions and 21 deletions

View File

@ -82,6 +82,7 @@ impl ConnectionManager {
} }
// Returns a network connection if one already is established // Returns a network connection if one already is established
pub async fn get_connection( pub async fn get_connection(
&self, &self,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,

View File

@ -432,7 +432,7 @@ impl NetworkManager {
} }
// Process a received out-of-band receipt // Process a received out-of-band receipt
pub async fn process_out_of_band_receipt<R: AsRef<[u8]>>( pub async fn handle_out_of_band_receipt<R: AsRef<[u8]>>(
&self, &self,
receipt_data: R, receipt_data: R,
descriptor: ConnectionDescriptor, descriptor: ConnectionDescriptor,
@ -455,7 +455,7 @@ impl NetworkManager {
} }
// Process a received in-band receipt // Process a received in-band receipt
pub async fn process_in_band_receipt<R: AsRef<[u8]>>( pub async fn handle_in_band_receipt<R: AsRef<[u8]>>(
&self, &self,
receipt_data: R, receipt_data: R,
inbound_nr: NodeRef, inbound_nr: NodeRef,
@ -468,6 +468,57 @@ impl NetworkManager {
receipt_manager.handle_receipt(inbound_nr, receipt).await receipt_manager.handle_receipt(inbound_nr, receipt).await
} }
// Process a received signal
pub async fn handle_signal(&self, signal_info: SignalInfo) -> Result<(), String> {
match signal_info {
SignalInfo::ReverseConnect { receipt, peer_info } => {
let routing_table = self.routing_table();
let rpc = self.rpc_processor();
// Add the peer info to our routing table
let peer_nr = routing_table
.register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?;
// Make a reverse connection to the peer and send the receipt to it
rpc.rpc_call_return_receipt(Destination::Direct(peer_nr), None, receipt)
.await
.map_err(map_to_string)?;
}
SignalInfo::HolePunch { receipt, peer_info } => {
let routing_table = self.routing_table();
// Add the peer info to our routing table
let peer_nr = routing_table
.register_node_with_node_info(peer_info.node_id.key, peer_info.node_info)?;
// Get the udp direct dialinfo for the hole punch
let hole_punch_dial_info = if let Some(hpdi) = peer_nr
.node_info()
.first_filtered_dial_info(|di| matches!(di.protocol_type(), ProtocolType::UDP))
{
hpdi
} else {
return Err("No hole punch capable dialinfo found for node".to_owned());
};
// Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole
self.net()
.send_data_to_dial_info(hole_punch_dial_info.clone(), Vec::new())
.await?;
// XXX: do we need a delay here? or another hole punch packet?
// Return the receipt over the direct channel since we want to use exactly the same dial info
self.send_direct_receipt(hole_punch_dial_info, receipt, false)
.await
.map_err(map_to_string)?;
}
}
Ok(())
}
// Builds an envelope for sending over the network // Builds an envelope for sending over the network
fn build_envelope<B: AsRef<[u8]>>( fn build_envelope<B: AsRef<[u8]>>(
&self, &self,
@ -703,7 +754,7 @@ impl NetworkManager {
} }
// And now use the existing connection to send over // And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() { if let Some(descriptor) = inbound_nr.last_connection().await {
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_connection(descriptor, data)
@ -787,7 +838,7 @@ impl NetworkManager {
} }
// And now use the existing connection to send over // And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() { if let Some(descriptor) = inbound_nr.last_connection().await {
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_connection(descriptor, data)
@ -819,7 +870,7 @@ impl NetworkManager {
let this = self.clone(); let this = self.clone();
Box::pin(async move { Box::pin(async move {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(descriptor) = node_ref.last_connection() { let data = if let Some(descriptor) = node_ref.last_connection().await {
match this match this
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_connection(descriptor, data)
@ -874,7 +925,7 @@ impl NetworkManager {
// Is this an out-of-band receipt instead of an envelope? // Is this an out-of-band receipt instead of an envelope?
if data[0..4] == *RECEIPT_MAGIC { if data[0..4] == *RECEIPT_MAGIC {
self.process_out_of_band_receipt(data, descriptor).await?; self.handle_out_of_band_receipt(data, descriptor).await?;
return Ok(true); return Ok(true);
} }

View File

@ -21,7 +21,7 @@ const UNRELIABLE_PING_INTERVAL_SECS: u32 = 5;
// Keepalive pings are done occasionally to ensure holepunched public dialinfo // Keepalive pings are done occasionally to ensure holepunched public dialinfo
// remains valid, as well as to make sure we remain in any relay node's routing table // remains valid, as well as to make sure we remain in any relay node's routing table
const KEEPALIVE_PING_INTERVAL_SECS: u32 = 30; const KEEPALIVE_PING_INTERVAL_SECS: u32 = 20;
// Do not change order here, it will mess up other sorts // Do not change order here, it will mess up other sorts
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
@ -131,8 +131,8 @@ impl BucketEntry {
self.last_connection = Some((last_connection, timestamp)); self.last_connection = Some((last_connection, timestamp));
} }
pub fn last_connection(&self) -> Option<ConnectionDescriptor> { pub fn last_connection(&self) -> Option<(ConnectionDescriptor, u64)> {
self.last_connection.as_ref().map(|x| x.0) self.last_connection
} }
pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) { pub fn set_min_max_version(&mut self, min_max_version: (u8, u8)) {

View File

@ -2,6 +2,10 @@ use super::*;
use crate::dht::*; use crate::dht::*;
use alloc::fmt; use alloc::fmt;
// Connectionless protocols like UDP are dependent on a NAT translation timeout
// We should ping them with some frequency and 30 seconds is typical timeout
const CONNECTIONLESS_TIMEOUT_SECS: u32 = 29;
pub struct NodeRef { pub struct NodeRef {
routing_table: RoutingTable, routing_table: RoutingTable,
node_id: DHTKey, node_id: DHTKey,
@ -42,9 +46,35 @@ impl NodeRef {
pub fn set_seen_our_node_info(&self) { pub fn set_seen_our_node_info(&self) {
self.operate(|e| e.set_seen_our_node_info(true)); self.operate(|e| e.set_seen_our_node_info(true));
} }
pub fn last_connection(&self) -> Option<ConnectionDescriptor> { pub async fn last_connection(&self) -> Option<ConnectionDescriptor> {
self.operate(|e| e.last_connection()) // Get the last connection and the last time we saw anything with this connection
let (last_connection, last_seen) = self.operate(|e| {
if let Some((last_connection, connection_ts)) = e.last_connection() {
if let Some(last_seen) = e.peer_stats().last_seen {
Some((last_connection, u64::max(last_seen, connection_ts)))
} else {
Some((last_connection, connection_ts))
} }
} else {
None
}
})?;
// Should we check the connection table?
if last_connection.protocol_type().is_connection_oriented() {
// Look the connection up in the connection manager and see if it's still there
let connection_manager = self.routing_table.network_manager().connection_manager();
connection_manager.get_connection(last_connection).await?;
} else {
// If this is not connection oriented, then we check our last seen time
// to see if this mapping has expired (beyond our timeout)
let cur_ts = intf::get_timestamp();
if (last_seen + (CONNECTIONLESS_TIMEOUT_SECS as u64 * 1_000_000u64)) < cur_ts {
return None;
}
}
Some(last_connection)
}
pub fn has_any_dial_info(&self) -> bool { pub fn has_any_dial_info(&self) -> bool {
self.operate(|e| e.node_info().has_any_dial_info() || e.local_node_info().has_dial_info()) self.operate(|e| e.node_info().has_any_dial_info() || e.local_node_info().has_dial_info())
} }

View File

@ -791,16 +791,18 @@ impl RPCProcessor {
////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////
fn generate_sender_info(&self, rpcreader: &RPCMessageReader) -> SenderInfo { async fn generate_sender_info(&self, peer_noderef: NodeRef) -> SenderInfo {
let socket_address = rpcreader let socket_address = peer_noderef
.header .last_connection()
.peer_noderef .await
.operate(|entry| entry.last_connection().map(|c| c.remote.socket_address)); .map(|c| c.remote.socket_address);
SenderInfo { socket_address } SenderInfo { socket_address }
} }
async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { async fn process_info_q(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> {
// let peer_noderef = rpcreader.header.peer_noderef.clone();
let sender_info = self.generate_sender_info(peer_noderef).await;
let reply_msg = { let reply_msg = {
let operation = rpcreader let operation = rpcreader
.reader .reader
@ -849,7 +851,6 @@ impl RPCProcessor {
encode_node_status(&node_status, &mut nsb)?; encode_node_status(&node_status, &mut nsb)?;
// Add sender info // Add sender info
let sender_info = self.generate_sender_info(&rpcreader);
let mut sib = info_a.reborrow().init_sender_info(); let mut sib = info_a.reborrow().init_sender_info();
encode_sender_info(&sender_info, &mut sib)?; encode_sender_info(&sender_info, &mut sib)?;
@ -1078,8 +1079,35 @@ impl RPCProcessor {
Err(rpc_error_unimplemented("process_find_block_q")) Err(rpc_error_unimplemented("process_find_block_q"))
} }
async fn process_signal(&self, _rpcreader: RPCMessageReader) -> Result<(), RPCError> { async fn process_signal(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> {
Err(rpc_error_unimplemented("process_signal")) let signal_info = {
let operation = rpcreader
.reader
.get_root::<veilid_capnp::operation::Reader>()
.map_err(map_error_capnp_error!())
.map_err(logthru_rpc!())?;
// This should never want an answer
if self.wants_answer(&operation)? {
return Err(RPCError::InvalidFormat);
}
// get signal reader
let sig_reader = match operation.get_detail().which() {
Ok(veilid_capnp::operation::detail::Which::Signal(Ok(x))) => x,
_ => panic!("invalid operation type in process_signal"),
};
// Get signal info
decode_signal_info(&sig_reader)?
};
// Handle it
let network_manager = self.network_manager();
network_manager
.handle_signal(signal_info)
.await
.map_err(map_error_string!())
} }
async fn process_return_receipt(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> { async fn process_return_receipt(&self, rpcreader: RPCMessageReader) -> Result<(), RPCError> {
@ -1112,7 +1140,7 @@ impl RPCProcessor {
// Handle it // Handle it
let network_manager = self.network_manager(); let network_manager = self.network_manager();
network_manager network_manager
.process_in_band_receipt(rcpt_data, rpcreader.header.peer_noderef) .handle_in_band_receipt(rcpt_data, rpcreader.header.peer_noderef)
.await .await
.map_err(map_error_string!()) .map_err(map_error_string!())
} }
@ -1612,5 +1640,41 @@ impl RPCProcessor {
Ok(()) Ok(())
} }
// Sends a unidirectional in-band return receipt
// Can be sent via all methods including relays and routes
pub async fn rpc_call_return_receipt<B: AsRef<[u8]>>(
&self,
dest: Destination,
safety_route: Option<&SafetyRouteSpec>,
rcpt_data: B,
) -> Result<(), RPCError> {
// Validate receipt before we send it, otherwise this may be arbitrary data!
let _ = Receipt::from_signed_data(rcpt_data.as_ref())
.map_err(|_| "failed to validate direct receipt".to_owned())
.map_err(map_error_string!())?;
let rr_msg = {
let mut rr_msg = ::capnp::message::Builder::new_default();
let mut question = rr_msg.init_root::<veilid_capnp::operation::Builder>();
question.set_op_id(self.get_next_op_id());
let mut respond_to = question.reborrow().init_respond_to();
respond_to.set_none(());
let detail = question.reborrow().init_detail();
let rr_builder = detail.init_return_receipt();
let r_builder = rr_builder.init_receipt(rcpt_data.as_ref().len().try_into().map_err(
map_error_protocol!("invalid receipt length in return receipt"),
)?);
r_builder.copy_from_slice(rcpt_data.as_ref());
rr_msg.into_reader()
};
// Send the return receipt request
self.request(dest, rr_msg, safety_route).await?;
Ok(())
}
// xxx do not process latency for routed messages // xxx do not process latency for routed messages
} }

View File

@ -421,6 +421,15 @@ pub enum ProtocolType {
WSS, WSS,
} }
impl ProtocolType {
pub fn is_connection_oriented(&self) -> bool {
matches!(
self,
ProtocolType::TCP | ProtocolType::WS | ProtocolType::WSS
)
}
}
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, Default, Serialize, Deserialize)]
pub struct ProtocolSet { pub struct ProtocolSet {
pub udp: bool, pub udp: bool,