Fix unwrap crash

This commit is contained in:
Christien Rioux 2025-05-23 18:49:59 -04:00
parent 95d272dec9
commit 9709ce326c
17 changed files with 308 additions and 153 deletions

View file

@ -4,6 +4,10 @@
- Update keyvaluedb to 0.1.3
- Inspect watched records for value changes made while offline when coming back online
- Additional table store unit test
- Eliminate `unwrap()` in `best_node_id()` (fixes crash)
- veilid-tools:
- Add `HashAtom<>` type for hashing references by identity
- veilid-flutter:
- Fix exception handling for WASM
@ -18,7 +22,7 @@
- *BREAKING API CHANGE*:
- watch_dht_values() now returns a bool rather than an expiration timestamp. Expiration renewal is now managed by veilid-core internally. Apps no longer need to renew watches!
- inspect_dht_record() and cancel_dht_watch() now take an Option<ValueSubkeyRangeSet> instead of just a ValueSubkeyRangeSet, to make things easier for automatic binding generation someday and to remove ambiguities about the semantics of the default empty set.
- DHTRecordReport now uses a Vec<Option<ValueSubkey>> for seq lists, rather than using the 'ValueSubkey::MAX' sentinel value (0xFFFFFFFF) to represent a missing subkey
- DHTRecordReport now uses a `Vec<Option<ValueSubkey>>` for seq lists, rather than using the 'ValueSubkey::MAX' sentinel value (0xFFFFFFFF) to represent a missing subkey
- Renamed config structs to better describe their purpose, and remove "Inner" from a struct that's being exposed via the API. ([!402](https://gitlab.com/veilid/veilid/-/merge_requests/402))
- `VeilidConfig` -> `VeilidStartupOptions`
- `VeilidConfigInner` -> `VeilidConfig`

View file

@ -109,9 +109,9 @@ where
#[must_use]
pub fn best(&self) -> Option<CryptoTyped<K>> {
self.items
.first()
.iter()
.find(|k| VALID_CRYPTO_KINDS.contains(&k.kind))
.copied()
.filter(|k| VALID_CRYPTO_KINDS.contains(&k.kind))
}
#[must_use]
pub fn is_empty(&self) -> bool {

View file

@ -922,7 +922,12 @@ impl NetworkManager {
};
let destination_node_ref = destination_node_ref.unwrap_or_else(|| node_ref.unfiltered());
let best_node_id = destination_node_ref.best_node_id();
let Some(best_node_id) = destination_node_ref.best_node_id() else {
bail!(
"can't talk to this node {} because we dont support its cryptosystem",
node_ref
);
};
// Get node's envelope versions and see if we can send to it
// and if so, get the max version we can use

View file

@ -348,8 +348,8 @@ impl BucketEntryInner {
opt_dead_id
}
pub fn best_node_id(&self) -> TypedPublicKey {
self.validated_node_ids.best().unwrap()
pub fn best_node_id(&self) -> Option<TypedPublicKey> {
self.validated_node_ids.best()
}
/// Get crypto kinds
@ -1271,6 +1271,11 @@ impl BucketEntry {
}
}
// Get a hash atom for this entry that can be used as a key for HashSet and HashTable
pub fn hash_atom(self: Arc<Self>) -> HashAtom<'static, BucketEntry> {
HashAtom::from(self)
}
// Note, that this requires -also- holding the RoutingTable read lock, as an
// immutable reference to RoutingTableInner must be passed in to get this
// This ensures that an operation on the routing table can not change entries

View file

@ -87,7 +87,7 @@ impl RoutingTable {
fn format_entry(
cur_ts: Timestamp,
node: TypedPublicKey,
node_id_str: &str,
e: &BucketEntryInner,
relay_tag: &str,
) -> String {
@ -130,7 +130,7 @@ impl RoutingTable {
let mut result = format!(
" {} [{}][{}] {} [{}] lastq@{} seen@{}",
// node id
node,
node_id_str,
// state reason
state_reason,
// Relay tag
@ -245,12 +245,8 @@ impl RoutingTable {
out += " ";
out += &e.1.with(inner, |_rti, e| {
Self::format_entry(
cur_ts,
TypedPublicKey::new(*ck, node),
e,
&relay_tag,
)
let node_id_str = TypedPublicKey::new(*ck, node).to_string();
Self::format_entry(cur_ts, &node_id_str, e, &relay_tag)
});
out += "\n";
}
@ -328,10 +324,10 @@ impl RoutingTable {
relaying_count += 1;
}
let best_node_id = node.best_node_id();
let node_id_str = node.to_string();
out += " ";
out += &node.operate(|_rti, e| Self::format_entry(cur_ts, best_node_id, e, &relay_tag));
out += &node.operate(|_rti, e| Self::format_entry(cur_ts, &node_id_str, e, &relay_tag));
out += "\n";
}

View file

@ -158,7 +158,12 @@ impl Clone for FilteredNodeRef {
impl fmt::Display for FilteredNodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.entry.with_inner(|e| e.best_node_id()))
if let Some(best_node_id) = self.entry.with_inner(|e| e.best_node_id()) {
return write!(f, "{}", best_node_id);
} else if let Some(node_id) = self.entry.with_inner(|e| e.node_ids().first().cloned()) {
return write!(f, "{}", node_id);
}
write!(f, "*NONE*")
}
}

View file

@ -176,7 +176,12 @@ impl Clone for NodeRef {
impl fmt::Display for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.entry.with_inner(|e| e.best_node_id()))
if let Some(best_node_id) = self.entry.with_inner(|e| e.best_node_id()) {
return write!(f, "{}", best_node_id);
} else if let Some(node_id) = self.entry.with_inner(|e| e.node_ids().first().cloned()) {
return write!(f, "{}", node_id);
}
write!(f, "*NONE*")
}
}

View file

@ -42,7 +42,7 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
fn node_ids(&self) -> TypedPublicKeyGroup {
self.operate(|_rti, e| e.node_ids())
}
fn best_node_id(&self) -> TypedPublicKey {
fn best_node_id(&self) -> Option<TypedPublicKey> {
self.operate(|_rti, e| e.best_node_id())
}
@ -247,7 +247,9 @@ pub(crate) trait NodeRefCommonTrait: NodeRefAccessorsTrait + NodeRefOperateTrait
fn set_last_flow(&self, flow: Flow, ts: Timestamp) {
self.operate_mut(|rti, e| {
e.set_last_flow(flow, ts);
rti.touch_recent_peer(e.best_node_id(), flow);
if let Some(best_node_id) = e.best_node_id() {
rti.touch_recent_peer(best_node_id, flow);
}
})
}

View file

@ -505,7 +505,13 @@ impl RouteSpecStore {
|_rti: &RoutingTableInner, nodes: &[NodeRef], perm: &[usize]| -> Vec<u8> {
let mut cache: Vec<u8> = Vec::with_capacity(perm.len() * PUBLIC_KEY_LENGTH);
for n in perm {
cache.extend_from_slice(&nodes[*n].locked(rti).best_node_id().value.bytes)
cache.extend_from_slice(
&nodes[*n]
.locked(rti)
.best_node_id()
.map(|bni| bni.value.bytes)
.unwrap_or_default(),
);
}
cache
};
@ -517,10 +523,10 @@ impl RouteSpecStore {
}
// Ensure the route doesn't contain both a node and its relay
let mut seen_nodes: HashSet<TypedPublicKey> = HashSet::new();
let mut seen_nodes: HashSet<HashAtom<BucketEntry>> = HashSet::new();
for n in permutation {
let node = nodes.get(*n).unwrap();
if !seen_nodes.insert(node.locked(rti).best_node_id()) {
if !seen_nodes.insert(node.entry().hash_atom()) {
// Already seen this node, should not be in the route twice
return None;
}
@ -532,8 +538,7 @@ impl RouteSpecStore {
}
};
if let Some(relay) = opt_relay {
let relay_id = relay.locked(rti).best_node_id();
if !seen_nodes.insert(relay_id) {
if !seen_nodes.insert(relay.entry().hash_atom()) {
// Already seen this node, should not be in the route twice
return None;
}

View file

@ -128,7 +128,12 @@ impl RouteSetSpecDetail {
let hops = &self.hop_node_refs;
let mut cache: Vec<u8> = Vec::with_capacity(hops.len() * PUBLIC_KEY_LENGTH);
for hop in hops {
cache.extend_from_slice(&hop.locked(rti).best_node_id().value.bytes);
cache.extend_from_slice(
&hop.locked(rti)
.best_node_id()
.map(|bni| bni.value.bytes)
.unwrap_or_default(),
);
}
cache
}

View file

@ -1564,11 +1564,13 @@ impl RoutingTableInner {
// Print faster node stats
#[cfg(feature = "verbose-tracing")]
for nl in 0..node_index {
let (latency, node_id) = all_filtered_nodes[nl].with(self, |_rti, e| {
let (latency, best_node_id) = all_filtered_nodes[nl].with(self, |_rti, e| {
(e.peer_stats().latency.clone(), e.best_node_id())
});
if let Some(latency) = latency {
veilid_log!(self debug "Better relay {}: {}: {}", nl, node_id, latency);
if let Some(node_id) = best_node_id {
if let Some(latency) = latency {
veilid_log!(self debug "Better relay {}: {}: {}", nl, node_id, latency);
}
}
}

View file

@ -43,6 +43,127 @@ impl RoutingTable {
None
}
/// Check whether the current PublicInternet relay is still usable and still wanted.
///
/// Returns `true` if the relay should be kept, `false` if it was dropped.
/// On every failure path this clears the relay via `editor.set_relay_node(None)`
/// before returning `false`, so the caller only needs the boolean.
///
/// * `editor` - routing domain editor used to clear the relay when it is rejected
/// * `cur_ts` - timestamp used for state/latency evaluation
/// * `relay_node` - the relay currently assigned for the PublicInternet domain
/// * `relay_node_filter` - predicate that must still hold for a node to act as relay
/// * `relay_desired` - `None` means we no longer want any relay at all
fn check_relay_valid(
    &self,
    editor: &mut RoutingDomainEditorPublicInternet<'_>,
    cur_ts: Timestamp,
    relay_node: FilteredNodeRef,
    relay_node_filter: &impl Fn(&BucketEntryInner) -> bool,
    relay_desired: Option<RelayKind>,
) -> bool {
    let state_reason = relay_node.state_reason(cur_ts);

    // No best node id
    // (the relay's cryptosystems no longer overlap with our enabled ones)
    let Some(relay_node_id) = relay_node.best_node_id() else {
        veilid_log!(self debug "Relay node no longer has best node id, dropping relay {}", relay_node);
        editor.set_relay_node(None);
        return false;
    };

    // Relay node is dead or no longer needed
    if matches!(
        state_reason,
        BucketEntryStateReason::Dead(_) | BucketEntryStateReason::Punished(_)
    ) {
        veilid_log!(self debug "Relay node is now {:?}, dropping relay {}", state_reason, relay_node);
        editor.set_relay_node(None);
        return false;
    }

    // Relay node no longer can relay
    if relay_node.operate(|_rti, e| !&relay_node_filter(e)) {
        veilid_log!(self debug
            "Relay node can no longer relay, dropping relay {}",
            relay_node
        );
        editor.set_relay_node(None);
        return false;
    }

    // Relay node is no longer wanted
    if relay_desired.is_none() {
        veilid_log!(self debug
            "Relay node no longer desired, dropping relay {}",
            relay_node
        );
        editor.set_relay_node(None);
        return false;
    }

    // See if our relay was optimized last long enough ago to consider getting a new one
    // if it is no longer fast enough
    // Upgradable read: only upgraded to a write when we record a fresh optimization time
    let mut inner = self.inner.upgradable_read();
    if let Some(last_optimized) = inner.relay_node_last_optimized(RoutingDomain::PublicInternet)
    {
        let last_optimized_duration = cur_ts - last_optimized;
        if last_optimized_duration
            > TimestampDuration::new_secs(RELAY_OPTIMIZATION_INTERVAL_SECS)
        {
            // See what our relay's current percentile is
            if let Some(relay_relative_performance) = inner.get_node_relative_performance(
                relay_node_id,
                cur_ts,
                relay_node_filter,
                |ls| ls.tm90,
            ) {
                // Get latency numbers
                let latency_stats = if let Some(latency) = relay_node.peer_stats().latency {
                    latency.to_string()
                } else {
                    "[no stats]".to_owned()
                };

                // Get current relay reliability
                let state_reason = relay_node.state_reason(cur_ts);

                if relay_relative_performance.percentile < RELAY_OPTIMIZATION_PERCENTILE {
                    // Drop the current relay so we can get the best new one
                    veilid_log!(self debug
                        "Relay tm90 is ({:.2}% < {:.2}%) ({} out of {}) (latency {}, {:?}) optimizing relay {}",
                        relay_relative_performance.percentile,
                        RELAY_OPTIMIZATION_PERCENTILE,
                        relay_relative_performance.node_index,
                        relay_relative_performance.node_count,
                        latency_stats,
                        state_reason,
                        relay_node
                    );
                    editor.set_relay_node(None);
                    return false;
                } else {
                    // Note that we tried to optimize the relay but it was good
                    veilid_log!(self debug
                        "Relay tm90 is ({:.2}% >= {:.2}%) ({} out of {}) (latency {}, {:?}) keeping {}",
                        relay_relative_performance.percentile,
                        RELAY_OPTIMIZATION_PERCENTILE,
                        relay_relative_performance.node_index,
                        relay_relative_performance.node_count,
                        latency_stats,
                        state_reason,
                        relay_node
                    );
                    inner.with_upgraded(|inner| {
                        inner.set_relay_node_last_optimized(
                            RoutingDomain::PublicInternet,
                            cur_ts,
                        )
                    });
                }
            } else {
                // Drop the current relay because it could not be measured
                veilid_log!(self debug
                    "Relay relative performance not found {}",
                    relay_node
                );
                editor.set_relay_node(None);
                return false;
            }
        }
    }
    true
}
// Keep relays assigned and accessible
#[instrument(level = "trace", skip_all, err)]
pub async fn relay_management_task_routine(
@ -60,115 +181,13 @@ impl RoutingTable {
// If we already have a relay, see if it is dead, or if we don't need it any more
let has_relay = {
if let Some(relay_node) = self.relay_node(RoutingDomain::PublicInternet) {
let state_reason = relay_node.state_reason(cur_ts);
// Relay node is dead or no longer needed
if matches!(
state_reason,
BucketEntryStateReason::Dead(_) | BucketEntryStateReason::Punished(_)
) {
veilid_log!(self debug "Relay node is now {:?}, dropping relay {}", state_reason, relay_node);
editor.set_relay_node(None);
false
}
// Relay node no longer can relay
else if relay_node.operate(|_rti, e| !&relay_node_filter(e)) {
veilid_log!(self debug
"Relay node can no longer relay, dropping relay {}",
relay_node
);
editor.set_relay_node(None);
false
}
// Relay node is no longer wanted
else if relay_desired.is_none() {
veilid_log!(self debug
"Relay node no longer desired, dropping relay {}",
relay_node
);
editor.set_relay_node(None);
false
} else {
// See if our relay was optimized last long enough ago to consider getting a new one
// if it is no longer fast enough
let mut has_relay = true;
let mut inner = self.inner.upgradable_read();
if let Some(last_optimized) =
inner.relay_node_last_optimized(RoutingDomain::PublicInternet)
{
let last_optimized_duration = cur_ts - last_optimized;
if last_optimized_duration
> TimestampDuration::new_secs(RELAY_OPTIMIZATION_INTERVAL_SECS)
{
// See what our relay's current percentile is
let relay_node_id = relay_node.best_node_id();
if let Some(relay_relative_performance) = inner
.get_node_relative_performance(
relay_node_id,
cur_ts,
&relay_node_filter,
|ls| ls.tm90,
)
{
// Get latency numbers
let latency_stats =
if let Some(latency) = relay_node.peer_stats().latency {
latency.to_string()
} else {
"[no stats]".to_owned()
};
// Get current relay reliability
let state_reason = relay_node.state_reason(cur_ts);
if relay_relative_performance.percentile
< RELAY_OPTIMIZATION_PERCENTILE
{
// Drop the current relay so we can get the best new one
veilid_log!(self debug
"Relay tm90 is ({:.2}% < {:.2}%) ({} out of {}) (latency {}, {:?}) optimizing relay {}",
relay_relative_performance.percentile,
RELAY_OPTIMIZATION_PERCENTILE,
relay_relative_performance.node_index,
relay_relative_performance.node_count,
latency_stats,
state_reason,
relay_node
);
editor.set_relay_node(None);
has_relay = false;
} else {
// Note that we tried to optimize the relay but it was good
veilid_log!(self debug
"Relay tm90 is ({:.2}% >= {:.2}%) ({} out of {}) (latency {}, {:?}) keeping {}",
relay_relative_performance.percentile,
RELAY_OPTIMIZATION_PERCENTILE,
relay_relative_performance.node_index,
relay_relative_performance.node_count,
latency_stats,
state_reason,
relay_node
);
inner.with_upgraded(|inner| {
inner.set_relay_node_last_optimized(
RoutingDomain::PublicInternet,
cur_ts,
)
});
}
} else {
// Drop the current relay because it could not be measured
veilid_log!(self debug
"Relay relative performance not found {}",
relay_node
);
editor.set_relay_node(None);
has_relay = false;
}
}
}
has_relay
}
self.check_relay_valid(
&mut editor,
cur_ts,
relay_node,
&relay_node_filter,
relay_desired,
)
} else {
false
}
@ -246,6 +265,11 @@ impl RoutingTable {
return false;
};
// Exclude any nodes that don't have a 'best node id' for our enabled cryptosystems
if e.best_node_id().is_none() {
return false;
}
// Exclude any nodes that have 'failed to send' state indicating a
// connection drop or inability to reach the node
if e.peer_stats().rpc_stats.failed_to_send > 0 {

View file

@ -130,12 +130,20 @@ impl Destination {
Destination::Direct {
node,
safety_selection: _,
} => Ok(Target::NodeId(node.best_node_id())),
} => {
Ok(Target::NodeId(node.best_node_id().ok_or_else(|| {
RPCError::protocol("no supported node id")
})?))
}
Destination::Relay {
relay: _,
node,
safety_selection: _,
} => Ok(Target::NodeId(node.best_node_id())),
} => {
Ok(Target::NodeId(node.best_node_id().ok_or_else(|| {
RPCError::protocol("no supported node id")
})?))
}
Destination::PrivateRoute {
private_route,
safety_selection: _,
@ -336,7 +344,10 @@ impl RPCProcessor {
}
SafetySelection::Safe(safety_spec) => {
// Sent directly but with a safety route, respond to private route
let crypto_kind = target.best_node_id().kind;
let crypto_kind = target
.best_node_id()
.ok_or_else(|| RPCError::protocol("no supported node id"))?
.kind;
let pr_key = network_result_try!(rss
.get_private_route_for_safety_spec(
crypto_kind,
@ -364,7 +375,10 @@ impl RPCProcessor {
}
SafetySelection::Safe(safety_spec) => {
// Sent via a relay but with a safety route, respond to private route
let crypto_kind = target.best_node_id().kind;
let crypto_kind = target
.best_node_id()
.ok_or_else(|| RPCError::protocol("no supported node id"))?
.kind;
let mut avoid_nodes = relay.node_ids();
avoid_nodes.add_all(&target.node_ids());

View file

@ -772,7 +772,14 @@ impl RPCProcessor {
Some(pi) => pi,
};
let private_route = PrivateRoute::new_stub(
destination_node_ref.best_node_id(),
match destination_node_ref.best_node_id() {
Some(nid) => nid,
None => {
return Ok(NetworkResult::no_connection_other(
"No best node id for stub private route",
));
}
},
RouteNode::PeerInfo(peer_info),
);

View file

@ -454,24 +454,17 @@ impl StorageManager {
let inner = self.inner.lock().await;
let mut out = vec![];
let mut node_set = HashSet::new();
let mut node_set: HashSet<HashAtom<BucketEntry>> = HashSet::new();
for v in inner.outbound_watch_manager.outbound_watches.values() {
if let Some(current) = v.state() {
let node_refs =
current.watch_node_refs(&inner.outbound_watch_manager.per_node_states);
for node_ref in &node_refs {
let mut found = false;
for nid in node_ref.node_ids().iter() {
if node_set.contains(nid) {
found = true;
break;
}
}
if found {
if node_set.contains(&node_ref.entry().hash_atom()) {
continue;
}
node_set.insert(node_ref.best_node_id());
node_set.insert(node_ref.entry().hash_atom());
out.push(
Destination::direct(
node_ref.routing_domain_filtered(RoutingDomain::PublicInternet),

View file

@ -0,0 +1,80 @@
use std::marker::PhantomData;
use super::*;
/// Pointer-identity hashing for unique objects
/// Considers the `===` identity equals rather than the `==` Eq/PartialEq equals for objects
/// that are guaranteed to be fixed in memory
pub struct HashAtom<'a, T> {
    // Address of the referenced object; this is the identity key used by
    // the PartialEq/Eq/Hash impls below.
    val: usize,
    // True when `val` came from `Arc::into_raw`; Drop must then reconstruct
    // the Arc to release the strong reference held by this atom.
    is_arc: bool,
    // NOTE(review): `&'a PhantomData<T>` compiles via const promotion, but the
    // conventional spelling for tying 'a and T would be `PhantomData<&'a T>` —
    // confirm whether the reference form is intentional.
    _phantom: &'a PhantomData<T>,
}
// Releases the strong Arc reference captured by `From<Arc<T>>`, if any.
// Pin-based atoms (`is_arc == false`) own nothing and need no cleanup.
impl<T> Drop for HashAtom<'_, T> {
    fn drop(&mut self) {
        if self.is_arc {
            // SAFETY: when `is_arc` is true, `val` was produced by
            // `Arc::into_raw` in `From<Arc<T>>` and has not been reclaimed
            // since (HashAtom is not Clone), so reconstructing the Arc here
            // exactly balances that leak; dropping it decrements the refcount.
            unsafe {
                let ptr = self.val as *const T;
                Arc::from_raw(ptr);
            };
        }
    }
}
/// Debug output shows only the identity value, never the pointee.
impl<T> core::fmt::Debug for HashAtom<'_, T>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("HashAtom");
        builder.field("val", &self.val);
        builder.finish()
    }
}
/// Builds a non-owning atom from a pinned shared reference; only the
/// address is recorded, so the atom must not outlive the pinned object
/// (enforced by the `'a` lifetime).
impl<'a, T> From<Pin<&'a T>> for HashAtom<'a, T> {
    fn from(pinned: Pin<&'a T>) -> Self {
        let ptr: *const T = pinned.get_ref();
        Self {
            val: ptr as usize,
            is_arc: false,
            _phantom: &PhantomData {},
        }
    }
}
/// Builds a non-owning atom from a pinned exclusive reference.
impl<'a, T> From<Pin<&'a mut T>> for HashAtom<'a, T> {
    fn from(pinned: Pin<&'a mut T>) -> Self {
        // Downgrade to a shared pin; only the address is needed.
        let shared: Pin<&T> = pinned.as_ref();
        let ptr: *const T = shared.get_ref();
        Self {
            val: ptr as usize,
            is_arc: false,
            _phantom: &PhantomData {},
        }
    }
}
/// Builds an owning atom from an `Arc`: the strong reference is leaked into
/// a raw pointer here and reclaimed by `Drop`, keeping the pointee alive
/// (and its address stable) for the atom's lifetime.
impl<T> From<Arc<T>> for HashAtom<'_, T> {
    fn from(value: Arc<T>) -> Self {
        let ptr = Arc::into_raw(value);
        Self {
            val: ptr as usize,
            is_arc: true,
            _phantom: &PhantomData {},
        }
    }
}
/// Identity equality: two atoms are equal iff they refer to the same object
/// in memory, regardless of any `PartialEq` impl on `T`.
impl<T> PartialEq for HashAtom<'_, T> {
    fn eq(&self, other: &Self) -> bool {
        usize::eq(&self.val, &other.val)
    }
}

impl<T> Eq for HashAtom<'_, T> {}
/// Hashes the stored address, matching the identity semantics of `Eq`.
impl<T> core::hash::Hash for HashAtom<'_, T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        core::hash::Hash::hash(&self.val, state);
    }
}

View file

@ -32,6 +32,7 @@ pub mod eventual_base;
pub mod eventual_value;
pub mod eventual_value_clone;
pub mod future_queue;
pub mod hash_atom;
pub mod interval;
pub mod ip_addr_port;
pub mod ip_extra;
@ -190,6 +191,8 @@ pub use eventual_value_clone::*;
#[doc(inline)]
pub use future_queue::*;
#[doc(inline)]
pub use hash_atom::*;
#[doc(inline)]
pub use interval::*;
#[doc(inline)]
pub use ip_addr_port::*;