Merge branch 'geolocation-2' into 'main'

IP geolocation, extend BucketEntry

See merge request veilid/veilid!331
This commit is contained in:
Christien Rioux 2024-11-12 17:11:51 +00:00
commit 995f078bd6
11 changed files with 240 additions and 43 deletions

View File

@ -1,4 +1,4 @@
pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: [&str; 29] = [ pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: &[&str] = &[
"mio", "mio",
"h2", "h2",
"hyper", "hyper",
@ -28,9 +28,11 @@ pub static DEFAULT_LOG_FACILITIES_IGNORE_LIST: [&str; 29] = [
"fanout", "fanout",
"receipt", "receipt",
"rpc_message", "rpc_message",
#[cfg(feature = "geolocation")]
"maxminddb",
]; ];
pub static FLAME_LOG_FACILITIES_IGNORE_LIST: [&str; 22] = [ pub static FLAME_LOG_FACILITIES_IGNORE_LIST: &[&str] = &[
"mio", "mio",
"h2", "h2",
"hyper", "hyper",
@ -53,9 +55,11 @@ pub static FLAME_LOG_FACILITIES_IGNORE_LIST: [&str; 22] = [
"hickory_proto", "hickory_proto",
"attohttpc", "attohttpc",
"ws_stream_wasm", "ws_stream_wasm",
#[cfg(feature = "geolocation")]
"maxminddb",
]; ];
pub static DEFAULT_LOG_FACILITIES_ENABLED_LIST: [&str; 8] = [ pub static DEFAULT_LOG_FACILITIES_ENABLED_LIST: &[&str] = &[
"net", "net",
"rpc", "rpc",
"rtab", "rtab",
@ -66,7 +70,7 @@ pub static DEFAULT_LOG_FACILITIES_ENABLED_LIST: [&str; 8] = [
"crypto", "crypto",
]; ];
pub static DURATION_LOG_FACILITIES: [&str; 1] = ["veilid_api"]; pub static DURATION_LOG_FACILITIES: &[&str] = &["veilid_api"];
#[macro_export] #[macro_export]
macro_rules! fn_string { macro_rules! fn_string {

View File

@ -19,8 +19,9 @@ impl VeilidLayerFilter {
ignore_change_list: &[String], ignore_change_list: &[String],
) -> VeilidLayerFilter { ) -> VeilidLayerFilter {
let mut ignore_list = DEFAULT_LOG_FACILITIES_IGNORE_LIST let mut ignore_list = DEFAULT_LOG_FACILITIES_IGNORE_LIST
.map(|x| x.to_owned()) .iter()
.to_vec(); .map(|&x| x.to_owned())
.collect::<Vec<_>>();
Self::apply_ignore_change_list(&mut ignore_list, ignore_change_list); Self::apply_ignore_change_list(&mut ignore_list, ignore_change_list);
Self { Self {
inner: Arc::new(RwLock::new(VeilidLayerFilterInner { inner: Arc::new(RwLock::new(VeilidLayerFilterInner {
@ -64,8 +65,9 @@ impl VeilidLayerFilter {
let mut inner = self.inner.write(); let mut inner = self.inner.write();
inner.ignore_list = ignore_list.unwrap_or_else(|| { inner.ignore_list = ignore_list.unwrap_or_else(|| {
DEFAULT_LOG_FACILITIES_IGNORE_LIST DEFAULT_LOG_FACILITIES_IGNORE_LIST
.map(|x| x.to_owned()) .iter()
.to_vec() .map(|&x| x.to_owned())
.collect::<Vec<_>>()
}); });
} }
callsite::rebuild_interest_cache(); callsite::rebuild_interest_cache();

View File

@ -174,6 +174,10 @@ pub(crate) struct BucketEntryInner {
last_sender_info: HashMap<LastSenderInfoKey, SenderInfo>, last_sender_info: HashMap<LastSenderInfoKey, SenderInfo>,
/// The node info for this entry on the publicinternet routing domain /// The node info for this entry on the publicinternet routing domain
public_internet: BucketEntryPublicInternet, public_internet: BucketEntryPublicInternet,
/// Node location
#[cfg(feature = "geolocation")]
#[serde(skip)]
geolocation_info: GeolocationInfo,
/// The node info for this entry on the localnetwork routing domain /// The node info for this entry on the localnetwork routing domain
local_network: BucketEntryLocalNetwork, local_network: BucketEntryLocalNetwork,
/// Statistics gathered for the peer /// Statistics gathered for the peer
@ -461,6 +465,12 @@ impl BucketEntryInner {
self.updated_since_last_network_change = true; self.updated_since_last_network_change = true;
self.make_not_dead(Timestamp::now()); self.make_not_dead(Timestamp::now());
// Update geolocation info
#[cfg(feature = "geolocation")]
{
self.geolocation_info = signed_node_info.get_geolocation_info(routing_domain);
}
// If we're updating an entry's node info, purge all // If we're updating an entry's node info, purge all
// but the last connection in our last connections list // but the last connection in our last connections list
// because the dial info could have changed and it's safer to just reconnect. // because the dial info could have changed and it's safer to just reconnect.
@ -473,6 +483,13 @@ impl BucketEntryInner {
node_info_changed node_info_changed
} }
#[cfg(feature = "geolocation")]
pub(super) fn update_geolocation_info(&mut self) {
if let Some(ref sni) = self.public_internet.signed_node_info {
self.geolocation_info = sni.get_geolocation_info(RoutingDomain::PublicInternet);
}
}
pub fn has_node_info(&self, routing_domain_set: RoutingDomainSet) -> bool { pub fn has_node_info(&self, routing_domain_set: RoutingDomainSet) -> bool {
for routing_domain in routing_domain_set { for routing_domain in routing_domain_set {
// Get the correct signed_node_info for the chosen routing domain // Get the correct signed_node_info for the chosen routing domain
@ -726,6 +743,11 @@ impl BucketEntryInner {
self.state_reason(cur_ts).into() self.state_reason(cur_ts).into()
} }
#[cfg(feature = "geolocation")]
pub fn geolocation_info(&self) -> &GeolocationInfo {
&self.geolocation_info
}
pub fn set_punished(&mut self, punished: Option<PunishmentReason>) { pub fn set_punished(&mut self, punished: Option<PunishmentReason>) {
self.punishment = punished; self.punishment = punished;
if punished.is_some() { if punished.is_some() {
@ -1099,6 +1121,8 @@ impl BucketEntry {
signed_node_info: None, signed_node_info: None,
node_status: None, node_status: None,
}, },
#[cfg(feature = "geolocation")]
geolocation_info: Default::default(),
peer_stats: PeerStats { peer_stats: PeerStats {
time_added: now, time_added: now,
rpc_stats: RPCStats::default(), rpc_stats: RPCStats::default(),

View File

@ -144,45 +144,88 @@ impl RoutingTable {
e: &BucketEntryInner, e: &BucketEntryInner,
relay_tag: &str, relay_tag: &str,
) -> String { ) -> String {
format!( let state_reason = Self::format_state_reason(e.state_reason(cur_ts));
let average_latency = e
.peer_stats()
.latency
.as_ref()
.map(|l| l.to_string())
.unwrap_or_else(|| "???".to_string());
let capabilities = if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) {
ni.capabilities()
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",")
} else {
"???".to_owned()
};
let since_last_question = e
.peer_stats()
.rpc_stats
.last_question_ts
.as_ref()
.map(|l| cur_ts.saturating_sub(*l).to_string())
.unwrap_or_else(|| "???".to_string());
let since_last_seen = e
.peer_stats()
.rpc_stats
.last_seen_ts
.as_ref()
.map(|l| cur_ts.saturating_sub(*l).to_string())
.unwrap_or_else(|| "???".to_string());
#[allow(unused_mut)]
let mut result = format!(
" {} [{}][{}] {} [{}] lastq@{} seen@{}", " {} [{}][{}] {} [{}] lastq@{} seen@{}",
// node id // node id
node, node,
// state reason // state reason
Self::format_state_reason(e.state_reason(cur_ts)), state_reason,
// Relay tag // Relay tag
relay_tag, relay_tag,
// average latency // average latency
e.peer_stats() average_latency,
.latency
.as_ref()
.map(|l| l.to_string())
.unwrap_or_else(|| "???".to_string()),
// capabilities // capabilities
if let Some(ni) = e.node_info(RoutingDomain::PublicInternet) { capabilities,
ni.capabilities()
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>()
.join(",")
} else {
"???".to_owned()
},
// duration since last question // duration since last question
e.peer_stats() since_last_question,
.rpc_stats
.last_question_ts
.as_ref()
.map(|l| cur_ts.saturating_sub(*l).to_string())
.unwrap_or_else(|| "???".to_string()),
// duration since last seen // duration since last seen
e.peer_stats() since_last_seen,
.rpc_stats );
.last_seen_ts
.as_ref() #[cfg(feature = "geolocation")]
.map(|l| cur_ts.saturating_sub(*l).to_string()) {
.unwrap_or_else(|| "???".to_string()), let geolocation_info = e.geolocation_info();
)
if let Some(cc) = geolocation_info.country_code() {
result += &format!(" {cc}");
} else {
result += " ??";
}
if !geolocation_info.relay_country_codes().is_empty() {
result += "/";
}
for (i, cc) in geolocation_info.relay_country_codes().iter().enumerate() {
if i > 0 {
result += ",";
}
if let Some(cc) = cc {
result += &format!("{cc}");
} else {
result += "??";
}
}
}
result
} }
pub fn debug_info_entries( pub fn debug_info_entries(

View File

@ -1,5 +1,3 @@
#![allow(unused)]
use crate::CountryCode; use crate::CountryCode;
use maxminddb::MaxMindDBError; use maxminddb::MaxMindDBError;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -63,6 +61,7 @@ pub fn query_country_code(addr: IpAddr) -> Option<CountryCode> {
mod tests { mod tests {
use crate::CountryCode; use crate::CountryCode;
use core::str::FromStr; use core::str::FromStr;
use maxminddb::WithinItem;
#[test] #[test]
fn test_query_country_code() { fn test_query_country_code() {
@ -91,4 +90,30 @@ mod tests {
assert!(super::query_country_code("10.0.0.1".parse().unwrap()).is_none()); assert!(super::query_country_code("10.0.0.1".parse().unwrap()).is_none());
assert!(super::query_country_code("::1".parse().unwrap()).is_none()); assert!(super::query_country_code("::1".parse().unwrap()).is_none());
} }
#[test]
fn test_iter_over_ipv4_mmdb() {
let db = super::IPV4.as_ref().unwrap();
let count = db
.within("0.0.0.0/0".parse().unwrap())
.unwrap()
.map(|item: Result<WithinItem<super::Country>, _>| item.unwrap())
.count();
assert!(count > 100, "Expecting some IPv4 subnets in IPv4 MMDB");
}
#[test]
fn test_iter_over_ipv6_mmdb() {
let db = super::IPV6.as_ref().unwrap();
let count = db
.within("::/0".parse().unwrap())
.unwrap()
.map(|item: Result<WithinItem<super::Country>, _>| item.unwrap())
.count();
assert!(count > 100, "Expecting some IPv6 subnets in IPv6 MMDB");
}
} }

View File

@ -493,8 +493,15 @@ impl RoutingTable {
) -> EyreResult<()> { ) -> EyreResult<()> {
let mut all_entries: Vec<Arc<BucketEntry>> = Vec::with_capacity(all_entry_bytes.len()); let mut all_entries: Vec<Arc<BucketEntry>> = Vec::with_capacity(all_entry_bytes.len());
for entry_bytes in all_entry_bytes { for entry_bytes in all_entry_bytes {
let entryinner = deserialize_json_bytes(&entry_bytes) #[allow(unused_mut)]
let mut entryinner: BucketEntryInner = deserialize_json_bytes(&entry_bytes)
.wrap_err("failed to deserialize bucket entry")?; .wrap_err("failed to deserialize bucket entry")?;
#[cfg(feature = "geolocation")]
{
entryinner.update_geolocation_info();
}
let entry = Arc::new(BucketEntry::new_with_inner(entryinner)); let entry = Arc::new(BucketEntry::new_with_inner(entryinner));
// Keep strong reference in table // Keep strong reference in table

View File

@ -0,0 +1,31 @@
use super::*;
#[derive(Debug, Default)]
pub struct GeolocationInfo {
country_code: Option<CountryCode>,
relay_country_codes: Vec<Option<CountryCode>>,
}
impl GeolocationInfo {
pub fn new(
country_code: Option<CountryCode>,
relay_country_codes: Vec<Option<CountryCode>>,
) -> Self {
GeolocationInfo {
country_code,
relay_country_codes,
}
}
/// Get node country code. Might be `None` if unable to determine.
pub fn country_code(&self) -> Option<CountryCode> {
self.country_code
}
/// Get country codes of relays used by the node.
/// There will be exactly one entry for each relay.
/// Empty if no relays are used.
pub fn relay_country_codes(&self) -> &[Option<CountryCode>] {
&self.relay_country_codes
}
}

View File

@ -1,6 +1,8 @@
mod contact_method; mod contact_method;
mod dial_info_detail; mod dial_info_detail;
mod direction; mod direction;
#[cfg(feature = "geolocation")]
mod geolocation_info;
mod node_info; mod node_info;
mod node_status; mod node_status;
mod peer_info; mod peer_info;
@ -14,6 +16,8 @@ use super::*;
pub use contact_method::*; pub use contact_method::*;
pub use dial_info_detail::*; pub use dial_info_detail::*;
pub use direction::*; pub use direction::*;
#[cfg(feature = "geolocation")]
pub use geolocation_info::*;
pub use node_info::*; pub use node_info::*;
pub use node_status::*; pub use node_status::*;
pub use peer_info::*; pub use peer_info::*;

View File

@ -115,6 +115,54 @@ impl SignedNodeInfo {
.unwrap_or_default(); .unwrap_or_default();
} }
#[cfg(feature = "geolocation")]
/// Get geolocation info of node and its relays.
pub fn get_geolocation_info(&self, routing_domain: RoutingDomain) -> GeolocationInfo {
if routing_domain != RoutingDomain::PublicInternet {
// Country code is irrelevant for local network
return GeolocationInfo::new(None, vec![]);
}
let get_node_country_code = |node_info: &NodeInfo| {
let country_codes = node_info
.dial_info_detail_list()
.iter()
.map(|did| match &did.dial_info {
DialInfo::UDP(di) => di.socket_address.ip_addr(),
DialInfo::TCP(di) => di.socket_address.ip_addr(),
DialInfo::WS(di) => di.socket_address.ip_addr(),
DialInfo::WSS(di) => di.socket_address.ip_addr(),
})
.map(geolocation::query_country_code)
.collect::<Vec<_>>();
if country_codes.is_empty() {
return None;
}
// Indexing cannot panic, guarded by a check above
let cc0 = country_codes[0];
if !country_codes.iter().all(|cc| cc.is_some() && *cc == cc0) {
// Lookup failed for some address or results are different
return None;
}
cc0
};
match self {
SignedNodeInfo::Direct(sni) => {
GeolocationInfo::new(get_node_country_code(sni.node_info()), vec![])
}
SignedNodeInfo::Relayed(sni) => {
let relay_cc = get_node_country_code(sni.relay_info().node_info());
GeolocationInfo::new(get_node_country_code(sni.node_info()), vec![relay_cc])
}
}
}
/// Compare this SignedNodeInfo to another one /// Compare this SignedNodeInfo to another one
/// Exclude the signature and timestamp and any other fields that are not /// Exclude the signature and timestamp and any other fields that are not
/// semantically valuable /// semantically valuable

View File

@ -321,7 +321,10 @@ pub extern "C" fn initialize_veilid_core(platform_config: FfiStr) {
if platform_config.logging.flame.enabled { if platform_config.logging.flame.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default( let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace, veilid_core::VeilidConfigLogLevel::Trace,
&veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST.map(|x| x.to_string()), &veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST
.iter()
.map(|&x| x.to_string())
.collect::<Vec<_>>(),
); );
let (flame_layer, guard) = let (flame_layer, guard) =
FlameLayer::with_file(&platform_config.logging.flame.path).unwrap(); FlameLayer::with_file(&platform_config.logging.flame.path).unwrap();

View File

@ -95,7 +95,10 @@ impl VeilidLogs {
if settingsr.logging.flame.enabled { if settingsr.logging.flame.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default( let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace, veilid_core::VeilidConfigLogLevel::Trace,
&veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST.map(|x| x.to_string()), &veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST
.iter()
.map(|&x| x.to_string())
.collect::<Vec<_>>(),
); );
let (flame_layer, guard) = FlameLayer::with_file(&settingsr.logging.flame.path)?; let (flame_layer, guard) = FlameLayer::with_file(&settingsr.logging.flame.path)?;
flame_guard = Some(guard); flame_guard = Some(guard);
@ -115,7 +118,10 @@ impl VeilidLogs {
if settingsr.logging.perfetto.enabled { if settingsr.logging.perfetto.enabled {
let filter = veilid_core::VeilidLayerFilter::new_no_default( let filter = veilid_core::VeilidLayerFilter::new_no_default(
veilid_core::VeilidConfigLogLevel::Trace, veilid_core::VeilidConfigLogLevel::Trace,
&veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST.map(|x| x.to_string()), &veilid_core::FLAME_LOG_FACILITIES_IGNORE_LIST
.iter()
.map(|&x| x.to_string())
.collect::<Vec<_>>(),
); );
let perfetto_layer = PerfettoLayer::new(std::sync::Mutex::new(std::fs::File::create( let perfetto_layer = PerfettoLayer::new(std::sync::Mutex::new(std::fs::File::create(
&settingsr.logging.perfetto.path, &settingsr.logging.perfetto.path,