mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-10-01 01:26:08 -04:00
relay
This commit is contained in:
parent
36f95692f6
commit
895fbf77c5
@ -717,7 +717,8 @@ impl NetworkManager {
|
|||||||
fn get_contact_method(&self, mut target_node_ref: NodeRef) -> Result<ContactMethod, String> {
|
fn get_contact_method(&self, mut target_node_ref: NodeRef) -> Result<ContactMethod, String> {
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
|
|
||||||
// Get our network class and protocol config
|
// Get our network class and protocol config and node id
|
||||||
|
let our_node_id = routing_table.node_id();
|
||||||
let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
|
let our_network_class = self.get_network_class().unwrap_or(NetworkClass::Invalid);
|
||||||
let our_protocol_config = self.get_protocol_config().unwrap();
|
let our_protocol_config = self.get_protocol_config().unwrap();
|
||||||
|
|
||||||
@ -744,6 +745,7 @@ impl NetworkManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get the target's inbound relay, it must have one or it is not reachable
|
// Get the target's inbound relay, it must have one or it is not reachable
|
||||||
|
// Note that .relay() never returns our own node. We can't relay to ourselves.
|
||||||
if let Some(inbound_relay_nr) = target_node_ref.relay() {
|
if let Some(inbound_relay_nr) = target_node_ref.relay() {
|
||||||
// Can we reach the inbound relay?
|
// Can we reach the inbound relay?
|
||||||
if inbound_relay_nr
|
if inbound_relay_nr
|
||||||
@ -1334,7 +1336,7 @@ impl NetworkManager {
|
|||||||
|
|
||||||
// Re-send our node info if we selected a relay
|
// Re-send our node info if we selected a relay
|
||||||
if node_info_changed {
|
if node_info_changed {
|
||||||
self.routing_table().send_node_info_updates().await;
|
self.routing_table().send_node_info_updates(true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -519,7 +519,7 @@ impl Network {
|
|||||||
self.inner.lock().network_started = true;
|
self.inner.lock().network_started = true;
|
||||||
|
|
||||||
// Inform routing table entries that our dial info has changed
|
// Inform routing table entries that our dial info has changed
|
||||||
self.routing_table().send_node_info_updates().await;
|
self.routing_table().send_node_info_updates(true).await;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -644,7 +644,7 @@ impl Network {
|
|||||||
log_net!(debug "network class changed to {:?}", network_class);
|
log_net!(debug "network class changed to {:?}", network_class);
|
||||||
|
|
||||||
// Send updates to everyone
|
// Send updates to everyone
|
||||||
routing_table.send_node_info_updates().await;
|
routing_table.send_node_info_updates(true).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -335,23 +335,32 @@ impl RoutingTable {
|
|||||||
|
|
||||||
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
||||||
if matches!(domain, RoutingDomain::PublicInternet) {
|
if matches!(domain, RoutingDomain::PublicInternet) {
|
||||||
let cur_ts = intf::get_timestamp();
|
Self::reset_all_seen_our_node_info(&*inner);
|
||||||
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
|
|
||||||
v.with_mut(|e| e.set_seen_our_node_info(false));
|
|
||||||
Option::<()>::None
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reset_all_seen_our_node_info(inner: &RoutingTableInner) {
|
||||||
|
let cur_ts = intf::get_timestamp();
|
||||||
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Dead, |_, v| {
|
||||||
|
v.with_mut(|e| e.set_seen_our_node_info(false));
|
||||||
|
Option::<()>::None
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
pub fn clear_dial_info_details(&self, domain: RoutingDomain) {
|
pub fn clear_dial_info_details(&self, domain: RoutingDomain) {
|
||||||
trace!("clearing dial info domain: {:?}", domain);
|
trace!("clearing dial info domain: {:?}", domain);
|
||||||
|
|
||||||
let mut inner = self.inner.write();
|
let mut inner = self.inner.write();
|
||||||
Self::with_routing_domain_mut(&mut *inner, domain, |rd| {
|
Self::with_routing_domain_mut(&mut *inner, domain, |rd| {
|
||||||
rd.dial_info_details.clear();
|
rd.dial_info_details.clear();
|
||||||
})
|
});
|
||||||
|
|
||||||
|
// Public dial info changed, go through all nodes and reset their 'seen our node info' bit
|
||||||
|
if matches!(domain, RoutingDomain::PublicInternet) {
|
||||||
|
Self::reset_all_seen_our_node_info(&*inner);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bucket_depth(index: usize) -> usize {
|
fn bucket_depth(index: usize) -> usize {
|
||||||
@ -424,8 +433,9 @@ impl RoutingTable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Inform routing table entries that our dial info has changed
|
// Inform routing table entries that our dial info has changed
|
||||||
pub async fn send_node_info_updates(&self) {
|
pub async fn send_node_info_updates(&self, all: bool) {
|
||||||
let this = self.clone();
|
let this = self.clone();
|
||||||
|
|
||||||
// Run in background only once
|
// Run in background only once
|
||||||
let _ = self
|
let _ = self
|
||||||
.clone()
|
.clone()
|
||||||
@ -451,7 +461,7 @@ impl RoutingTable {
|
|||||||
let cur_ts = intf::get_timestamp();
|
let cur_ts = intf::get_timestamp();
|
||||||
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
Self::with_entries(&*inner, cur_ts, BucketEntryState::Unreliable, |k, v| {
|
||||||
// Only update nodes that haven't seen our node info yet
|
// Only update nodes that haven't seen our node info yet
|
||||||
if !v.with(|e| e.has_seen_our_node_info()) {
|
if all || !v.with(|e| e.has_seen_our_node_info()) {
|
||||||
node_refs.push(NodeRef::new(this.clone(), k, v, None));
|
node_refs.push(NodeRef::new(this.clone(), k, v, None));
|
||||||
}
|
}
|
||||||
Option::<()>::None
|
Option::<()>::None
|
||||||
@ -460,7 +470,7 @@ impl RoutingTable {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Send the updates
|
// Send the updates
|
||||||
log_rtab!("Sending node info updates to {} nodes", node_refs.len());
|
log_rtab!(debug "Sending node info updates to {} nodes", node_refs.len());
|
||||||
let mut unord = FuturesUnordered::new();
|
let mut unord = FuturesUnordered::new();
|
||||||
for nr in node_refs {
|
for nr in node_refs {
|
||||||
let rpc = this.rpc_processor();
|
let rpc = this.rpc_processor();
|
||||||
@ -483,7 +493,7 @@ impl RoutingTable {
|
|||||||
// Wait for futures to complete
|
// Wait for futures to complete
|
||||||
while unord.next().await.is_some() {}
|
while unord.next().await.is_some() {}
|
||||||
|
|
||||||
log_rtab!("Finished sending node updates");
|
log_rtab!(debug "Finished sending node updates");
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
@ -594,7 +604,6 @@ impl RoutingTable {
|
|||||||
// Make new entry
|
// Make new entry
|
||||||
inner.bucket_entry_count += 1;
|
inner.bucket_entry_count += 1;
|
||||||
let cnt = inner.bucket_entry_count;
|
let cnt = inner.bucket_entry_count;
|
||||||
log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count(&mut *inner, BucketEntryState::Unreliable));
|
|
||||||
let bucket = &mut inner.buckets[idx];
|
let bucket = &mut inner.buckets[idx];
|
||||||
let nr = bucket.add_entry(node_id);
|
let nr = bucket.add_entry(node_id);
|
||||||
|
|
||||||
@ -603,8 +612,8 @@ impl RoutingTable {
|
|||||||
entry.with_mut(update_func);
|
entry.with_mut(update_func);
|
||||||
|
|
||||||
// Kick the bucket
|
// Kick the bucket
|
||||||
// It is important to do this in the same inner lock as the add_entry
|
|
||||||
inner.kick_queue.insert(idx);
|
inner.kick_queue.insert(idx);
|
||||||
|
log_rtab!(debug "Routing table now has {} nodes, {} live", cnt, Self::get_entry_count(&mut *inner, BucketEntryState::Unreliable));
|
||||||
|
|
||||||
nr
|
nr
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user