checkpoint

This commit is contained in:
John Smith 2023-05-04 21:26:14 -04:00
parent 514bc34e11
commit 61415597db
3 changed files with 74 additions and 21 deletions

View File

@ -141,9 +141,9 @@ where
}
false
}
pub fn contains_key(&self, key: &K) -> bool {
pub fn contains_value(&self, value: &K) -> bool {
for tk in &self.items {
if tk.value == *key {
if tk.value == *value {
return true;
}
}

View File

@ -1209,9 +1209,6 @@ impl RoutingTableInner {
};
// distance is the next metric, closer nodes first
// since multiple cryptosystems are in use, the distance for a key is the shortest
// distance to that key over all supported cryptosystems
let da = vcrypto.distance(&a_key.value, &node_id.value);
let db = vcrypto.distance(&b_key.value, &node_id.value);
da.cmp(&db)

View File

@ -401,28 +401,84 @@ impl RPCProcessor {
/// If no node was found in the timeout, this returns None
pub async fn search_dht_single_key(
&self,
_node_id: PublicKey,
_count: u32,
_fanout: u32,
_timeout: TimestampDuration,
node_id: TypedKey,
count: usize,
fanout: usize,
timeout_us: TimestampDuration,
) -> Result<Option<NodeRef>, RPCError> {
let routing_table = self.routing_table();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node
if opt_entry.is_none() {
return false;
}
// Ensure only things that are valid/signed in the PublicInternet domain are returned
rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
opt_entry,
)
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), v.unwrap().clone(), None)
};
// Get the 'count' closest nodes to the key out of our routing table
let mut closest_nodes = Vec::new();
routing_table.find_closest_nodes(count, node_id, filters, transform)
let closest_nodes = routing_table.find_closest_nodes(count, node_id, filters, transform);
// If the node we want to locate is one of the closest nodes, return it immediately
if let Some(out) = closest_nodes
.iter()
.find(|x| x.node_ids().contains(&node_id))
{
return Ok(Some(out.clone()));
}
// Make accessible to fanout tasks
struct FanoutContext {
closest_nodes: Vec<NodeRef>,
called_nodes: TypedKeySet,
}
let closest_nodes = Arc::new(Mutex::new(closest_nodes));
// Otherwise contact the 'fanout' closest nodes to see if there's closer nodes
let mut unord = FuturesUnordered::new();
{
// Spin up 'fanout' tasks to process the fanout
for n in 0..4 {
// Fanout processor
let closest_nodes = closest_nodes.clone();
let h = async move {
// Find the nth node to iterate on
let cn = closest_nodes.lock();
let n = n.clamp(0, cn.len()); xxx dont do this, use called nodes set, shouldnt need stop token canceller, but maybe at the top level? nothing is spawning. so maybe not.
let mut node =
};
unord.push(h);
}
}
// Wait for them to complete
timeout((timeout_us.as_u64() / 1000u64) as u32, async {
while let Some(_) = unord.next().await {}
})
.await;
Ok(None)
}
/// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
pub async fn search_dht_multi_key(
&self,
_node_id: PublicKey,
_count: u32,
_fanout: u32,
_node_id: TypedKey,
_count: usize,
_fanout: usize,
_timeout: TimestampDuration,
) -> Result<Vec<NodeRef>, RPCError> {
// xxx return closest nodes after the timeout
@ -433,14 +489,14 @@ impl RPCProcessor {
/// Note: This routine can possible be recursive, hence the SendPinBoxFuture async form
pub fn resolve_node(
&self,
node_id: PublicKey, xxx switch to typedkey for the api. everything else is going to need it.
node_id: TypedKey,
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
let this = self.clone();
Box::pin(async move {
let routing_table = this.routing_table();
// First see if we have the node in our routing table already
if let Some(nr) = routing_table.lookup_any_node_ref(node_id) {
if let Some(nr) = routing_table.lookup_node_ref(node_id) {
// ensure we have some dial info for the entry already,
// if not, we should do the find_node anyway
if nr.has_any_dial_info() {
@ -452,8 +508,8 @@ impl RPCProcessor {
let (count, fanout, timeout) = {
let c = this.config.get();
(
c.network.dht.resolve_node_count,
c.network.dht.resolve_node_fanout,
c.network.dht.resolve_node_count as usize,
c.network.dht.resolve_node_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.resolve_node_timeout_ms)),
)
};
@ -464,7 +520,7 @@ impl RPCProcessor {
.await?;
if let Some(nr) = &nr {
if nr.node_ids().contains_key(&node_id) {
if nr.node_ids().contains(&node_id) {
// found a close node, but not exact within our configured resolve_node timeout
return Ok(None);
}