checkpoint

This commit is contained in:
John Smith 2023-05-05 21:23:17 -04:00
parent 61415597db
commit e2c5691d7e
11 changed files with 484 additions and 98 deletions

View File

@ -127,12 +127,17 @@ where
type Err = VeilidAPIError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let b = s.as_bytes();
if b.len() != (5 + K::encoded_len()) || b[4..5] != b":"[..] {
apibail_parse_error!("invalid typed key", s);
if b.len() == (5 + K::encoded_len()) && b[4..5] != b":"[..] {
let kind: CryptoKind = b[0..4].try_into().expect("should not fail to convert");
let value = K::try_decode_bytes(&b[5..])?;
Ok(Self { kind, value })
} else if b.len() == K::encoded_len() {
let kind = best_crypto_kind();
let value = K::try_decode_bytes(b)?;
Ok(Self { kind, value })
} else {
apibail_generic!("invalid cryptotyped format");
}
let kind: CryptoKind = b[0..4].try_into().expect("should not fail to convert");
let value = K::try_decode_bytes(&b[5..])?;
Ok(Self { kind, value })
}
}
impl<'de, K> Deserialize<'de> for CryptoTyped<K>

View File

@ -1400,7 +1400,7 @@ impl NetworkManager {
let some_relay_nr = if self.check_client_whitelist(sender_id) {
// Full relay allowed, do a full resolve_node
match rpc.resolve_node(recipient_id.value).await {
match rpc.resolve_node(recipient_id, SafetySelection::Unsafe(Sequencing::default())).await {
Ok(v) => v,
Err(e) => {
log_net!(debug "failed to resolve recipient node for relay, dropping outbound relayed packet: {}" ,e);

View File

@ -964,6 +964,16 @@ impl RoutingTable {
.find_closest_nodes(node_count, node_id, filters, transform)
}
/// Sorts and cleans `closest_nodes` in place relative to `node_id`.
///
/// Takes a read lock on the routing table's inner state and forwards both
/// arguments to the inner `sort_and_clean_closest_noderefs`.
pub fn sort_and_clean_closest_noderefs(
    &self,
    node_id: TypedKey,
    closest_nodes: &mut Vec<NodeRef>,
) {
    let inner = self.inner.read();
    inner.sort_and_clean_closest_noderefs(node_id, closest_nodes)
}
#[instrument(level = "trace", skip(self), ret)]
pub fn register_find_node_answer(
&self,

View File

@ -1153,7 +1153,6 @@ impl RoutingTableInner {
let vcrypto = self.unlocked_inner.crypto().get(crypto_kind).unwrap();
// Filter to ensure entries support the crypto kind in use
let filter = Box::new(
move |_rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
if let Some(entry) = opt_entry {
@ -1219,4 +1218,71 @@ impl RoutingTableInner {
log_rtab!(">> find_closest_nodes: node count = {}", out.len());
out
}
/// Rebuilds `closest_nodes` in place, sorted for closeness to `node_id`.
///
/// Node references that have no node id of `node_id.kind` are dropped from the list.
/// The remaining references are locked against this inner table, ordered with the
/// comparator from `make_closest_noderef_sort`, and then unlocked again.
pub fn sort_and_clean_closest_noderefs(
    &self,
    node_id: TypedKey,
    closest_nodes: &mut Vec<NodeRef>,
) {
    let wanted_kind = node_id.kind;

    // Keep only noderefs that carry a node id of the wanted kind, locking each one
    let mut locked: Vec<NodeRefLocked> = closest_nodes
        .iter()
        .filter(|nr| nr.node_ids().kinds().contains(&wanted_kind))
        .map(|nr| nr.locked(self))
        .collect();

    // Order the survivors
    let comparator = make_closest_noderef_sort(self.unlocked_inner.crypto(), node_id);
    locked.sort_by(comparator);

    // Hand back unlocked noderefs in sorted order
    *closest_nodes = locked.iter().map(|nr| nr.unlocked()).collect();
}
}
/// Builds a comparator that orders locked node references for closeness to `node_id`.
///
/// Ordering rules, highest priority first:
/// 1. Two references to the same entry compare as `Equal`.
/// 2. An entry for which `check_reliable` is true sorts before one for which it is false.
///    The timestamp used is captured once, when the comparator is built.
/// 3. Otherwise the entry whose key (of `node_id.kind`) has the smaller `distance`
///    to `node_id` sorts first.
///
/// # Panics
/// Panics when `crypto` has no cryptosystem for `node_id.kind`. The returned comparator
/// panics if either entry has no node id of that kind, so callers must filter first.
fn make_closest_noderef_sort(
    crypto: Crypto,
    node_id: TypedKey,
) -> impl Fn(&NodeRefLocked, &NodeRefLocked) -> core::cmp::Ordering {
    let cur_ts = get_aligned_timestamp();
    let kind = node_id.kind;
    // Get cryptoversion to check distance with
    let vcrypto = crypto.get(kind).unwrap();

    move |a: &NodeRefLocked, b: &NodeRefLocked| -> core::cmp::Ordering {
        // same nodes are always the same
        if a.same_entry(b) {
            return core::cmp::Ordering::Equal;
        }
        a.operate(|_rti, a_entry| {
            b.operate(|_rti, b_entry| {
                // reliable nodes come first
                let ra = a_entry.check_reliable(cur_ts);
                let rb = b_entry.check_reliable(cur_ts);
                if ra != rb {
                    return if ra {
                        core::cmp::Ordering::Less
                    } else {
                        core::cmp::Ordering::Greater
                    };
                }

                // get keys
                let a_key = a_entry.node_ids().get(kind).unwrap();
                let b_key = b_entry.node_ids().get(kind).unwrap();

                // distance is the next metric, closer nodes first
                let da = vcrypto.distance(&a_key.value, &node_id.value);
                let db = vcrypto.distance(&b_key.value, &node_id.value);
                da.cmp(&db)
            })
        })
    }
}

View File

@ -0,0 +1,222 @@
use super::*;
/// Mutable state of one fanout operation, kept behind the `context` mutex of `FanoutCall`.
struct FanoutContext<R>
where
R: Unpin,
{
/// Current candidate nodes; re-sorted and truncated whenever new nodes are added.
closest_nodes: Vec<NodeRef>,
/// Node ids that have already been picked as a fanout call candidate.
called_nodes: TypedKeySet,
/// Outcome of the fanout once there is one; `None` while still undecided.
result: Option<Result<R, RPCError>>,
}
/// What a fanout call routine returns for a single node.
///
/// As consumed by `fanout_processor`: `Ok(Some(peers))` means the call succeeded and the
/// returned peers are registered as new candidates, `Ok(None)` means the call failed and
/// the node is removed from the candidate list, and `Err(_)` is meant to abort the fanout.
pub type FanoutCallReturnType = Result<Option<Vec<PeerInfo>>, RPCError>;
/// A fanout search toward `node_id`, run by several concurrent processors.
///
/// * `R` - value produced when the search is considered done
/// * `F` - future returned by the per-node call routine
/// * `C` - per-node call routine
/// * `D` - routine that inspects the current closest nodes and optionally yields a result
pub struct FanoutCall<R, F, C, D>
where
R: Unpin,
F: Future<Output = FanoutCallReturnType>,
C: Fn(NodeRef) -> F,
D: Fn(&[NodeRef]) -> Option<R>,
{
/// Routing table used to find, register and sort candidate nodes.
routing_table: RoutingTable,
/// Crypto kind of `node_id`; used to pick which node id of a candidate to track.
crypto_kind: CryptoKind,
/// Key the search is converging on.
node_id: TypedKey,
/// Shared mutable state (candidates, called set, result).
context: Mutex<FanoutContext<R>>,
/// Maximum number of closest nodes to keep.
count: usize,
/// Number of processors spun up by `run`.
fanout: usize,
/// Overall time limit for `run`; divided by 1000 before being passed to `timeout`.
timeout_us: TimestampDuration,
/// Called once per candidate node.
call_routine: C,
/// Called with the current closest nodes to check for completion.
check_done: D,
}
impl<R, F, C, D> FanoutCall<R, F, C, D>
where
R: Unpin,
F: Future<Output = FanoutCallReturnType>,
C: Fn(NodeRef) -> F,
D: Fn(&[NodeRef]) -> Option<R>,
{
/// Creates a fanout call wrapped in an `Arc` so its processors can share it.
///
/// The context starts with an empty candidate list (capacity `count`), an empty
/// called-node set and no result. `crypto_kind` is derived from `node_id.kind`.
pub fn new(
    routing_table: RoutingTable,
    node_id: TypedKey,
    count: usize,
    fanout: usize,
    timeout_us: TimestampDuration,
    call_routine: C,
    check_done: D,
) -> Arc<Self> {
    let fanout_call = Self {
        routing_table,
        crypto_kind: node_id.kind,
        node_id,
        context: Mutex::new(FanoutContext {
            closest_nodes: Vec::with_capacity(count),
            called_nodes: TypedKeySet::new(),
            result: None,
        }),
        count,
        fanout,
        timeout_us,
        call_routine,
        check_done,
    };
    Arc::new(fanout_call)
}
/// Merges `new_nodes` into the candidate list, then re-sorts and trims it.
///
/// A node is skipped when it refers to the same entry as a node already in the list,
/// including one added earlier in this same call. Afterwards the list is sorted and
/// cleaned by the routing table and truncated to `count` entries.
fn add_new_nodes(self: Arc<Self>, new_nodes: Vec<NodeRef>) {
    let mut ctx = self.context.lock();
    for nn in new_nodes {
        // Stop scanning at the first match instead of walking the whole list
        let is_dup = ctx.closest_nodes.iter().any(|cn| cn.same_entry(&nn));
        if !is_dup {
            // `nn` is already owned here, so it is moved in rather than cloned
            ctx.closest_nodes.push(nn);
        }
    }

    self.routing_table
        .sort_and_clean_closest_noderefs(self.node_id, &mut ctx.closest_nodes);
    ctx.closest_nodes.truncate(self.count);
}
/// Removes the first candidate that refers to the same entry as `dead_node`, if any.
fn remove_node(self: Arc<Self>, dead_node: NodeRef) {
    let mut ctx = self.context.lock();
    let found = ctx
        .closest_nodes
        .iter()
        .position(|cn| cn.same_entry(&dead_node));
    if let Some(idx) = found {
        ctx.closest_nodes.remove(idx);
    }
}
/// Picks the next candidate to call and marks it as called.
///
/// Walks the candidate list in its current order and returns the first node whose
/// id for `crypto_kind` is not yet in the called set. Only that node is added to the
/// called set. Returns `None` when every candidate has already been called or has
/// no id of that kind.
fn get_next_node(self: Arc<Self>) -> Option<NodeRef> {
    let mut guard = self.context.lock();
    // Reborrow as a plain reference so the two fields can be borrowed independently
    let ctx = &mut *guard;

    let called = &ctx.called_nodes;
    let (next_node, key) = ctx.closest_nodes.iter().find_map(|cn| {
        let key = cn.node_ids().get(self.crypto_kind)?;
        if called.contains(&key) {
            None
        } else {
            // New fanout call candidate found; stop at the first one so the
            // remaining candidates stay available to other processors
            Some((cn.clone(), key))
        }
    })?;

    ctx.called_nodes.add(key);
    Some(next_node)
}
/// Reports whether the fanout has a result, computing one if possible.
///
/// If no result is stored yet, `check_done` is invoked with the current closest
/// nodes (while the context lock is held) and any value it yields is stored as `Ok`.
fn evaluate_done(self: Arc<Self>) -> bool {
    let mut ctx = self.context.lock();
    if ctx.result.is_none() {
        // No result yet, so ask the caller-supplied check for one
        ctx.result = (self.check_done)(&ctx.closest_nodes).map(Ok);
    }
    ctx.result.is_some()
}
/// One worker of the fanout: repeatedly calls the next uncalled candidate.
///
/// The loop ends when a result exists, when no uncalled candidate remains, or when
/// the call routine returns an error. An error is stored as the fanout result
/// (unless a result already exists) so the other workers stop at their next check
/// and `run` can report it.
async fn fanout_processor(self: Arc<Self>) {
    // Check to see if we have a result or are done
    while !self.clone().evaluate_done() {
        // Get the next node we haven't processed yet; stop fanning out if there is none
        let Some(next_node) = self.clone().get_next_node() else {
            return;
        };

        // Do the call for this node
        match (self.call_routine)(next_node.clone()).await {
            Ok(Some(v)) => {
                // Call succeeded
                // Register the returned nodes and add them to the closest nodes list in sorted order
                let new_nodes = self
                    .routing_table
                    .register_find_node_answer(self.crypto_kind, v);
                self.clone().add_new_nodes(new_nodes);
            }
            Ok(None) => {
                // Call failed, remove the node so it isn't included in the output
                self.clone().remove_node(next_node);
            }
            Err(e) => {
                // Error happened, abort everything and return the error.
                // An already-recorded result is kept rather than overwritten.
                let mut ctx = self.context.lock();
                if ctx.result.is_none() {
                    ctx.result = Some(Err(e));
                }
                return;
            }
        }
    }
}
/// Seeds the candidate list with up to `count` nodes closest to `node_id`
/// taken from the routing table.
fn init_closest_nodes(self: Arc<Self>) {
    let routing_table = self.routing_table.clone();

    // Entry filter: a missing entry stands for our own node and is excluded; everything
    // else must have valid/signed node info in the PublicInternet domain
    let filter = Box::new(
        move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
            opt_entry.is_some()
                && rti.filter_has_valid_signed_node_info(
                    RoutingDomain::PublicInternet,
                    true,
                    opt_entry,
                )
        },
    ) as RoutingTableEntryFilter;
    let filters = VecDeque::from([filter]);

    // Turn each surviving entry into a node reference
    let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
        NodeRef::new(routing_table.clone(), v.unwrap().clone(), None)
    };

    let closest_nodes =
        routing_table.find_closest_nodes(self.count, self.node_id, filters, transform);

    // Publish the initial list only after the routing table query has finished
    let mut ctx = self.context.lock();
    ctx.closest_nodes = closest_nodes;
}
/// Runs the fanout to completion or until the timeout elapses.
///
/// The candidate list is seeded from the routing table first. If `check_done` is
/// already satisfied, the result is returned immediately. Otherwise `fanout`
/// processors are driven concurrently under a timeout derived from `timeout_us`.
///
/// On completion the stored result is taken out of the context: `Ok(Some(r))` when a
/// result was found, `Ok(None)` when the processors ran out of candidates, or `Err`
/// when an error was recorded. A timeout yields the timeout variant of `TimeoutOr`.
pub async fn run(self: Arc<Self>) -> TimeoutOr<Result<Option<R>, RPCError>> {
    // Initialize closest nodes list
    self.clone().init_closest_nodes();

    // Do a quick check to see if we're already done
    if self.clone().evaluate_done() {
        let mut ctx = self.context.lock();
        return TimeoutOr::value(ctx.result.take().transpose());
    }

    // If not, spin up 'fanout' tasks to process the fanout
    let mut unord = FuturesUnordered::new();
    for _ in 0..self.fanout {
        unord.push(self.clone().fanout_processor());
    }

    // Saturate instead of truncating: a plain `as u32` would wrap a very large
    // timeout around to a much shorter one
    let timeout_ms = (self.timeout_us.as_u64() / 1000u64).min(u64::from(u32::MAX)) as u32;

    // Wait for them to complete
    timeout(timeout_ms, async {
        while unord.next().await.is_some() {}
    })
    .await
    .into_timeout_or()
    .map(|_| {
        // Finished, return whatever value we came up with
        let mut ctx = self.context.lock();
        ctx.result.take().transpose()
    })
}
}

View File

@ -1,5 +1,6 @@
mod coders;
mod destination;
mod fanout_call;
mod operation_waiter;
mod rpc_app_call;
mod rpc_app_message;
@ -22,6 +23,7 @@ mod rpc_watch_value;
pub use coders::*;
pub use destination::*;
pub use fanout_call::*;
pub use operation_waiter::*;
pub use rpc_error::*;
pub use rpc_status::*;
@ -399,90 +401,64 @@ impl RPCProcessor {
/// Search the DHT for a single node closest to a key and add it to the routing table and return the node reference
/// If no node was found in the timeout, this returns None
pub async fn search_dht_single_key(
async fn search_dht_single_key(
&self,
node_id: TypedKey,
count: usize,
fanout: usize,
timeout_us: TimestampDuration,
) -> Result<Option<NodeRef>, RPCError> {
safety_selection: SafetySelection,
) -> TimeoutOr<Result<Option<NodeRef>, RPCError>> {
let routing_table = self.routing_table();
let filter = Box::new(
move |rti: &RoutingTableInner, opt_entry: Option<Arc<BucketEntry>>| {
// Exclude our own node
if opt_entry.is_none() {
return false;
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let this = self.clone();
async move {
match this
.clone()
.rpc_call_find_node(
Destination::direct(next_node).with_safety(safety_selection),
node_id,
)
.await
{
Ok(v) => {
let v = network_result_value_or_log!(v => {
// Any other failures, just try the next node
return Ok(None);
});
Ok(Some(v.answer))
}
Err(e) => Err(e),
}
// Ensure only things that are valid/signed in the PublicInternet domain are returned
rti.filter_has_valid_signed_node_info(
RoutingDomain::PublicInternet,
true,
opt_entry,
)
},
) as RoutingTableEntryFilter;
let filters = VecDeque::from([filter]);
let transform = |_rti: &RoutingTableInner, v: Option<Arc<BucketEntry>>| {
NodeRef::new(routing_table.clone(), v.unwrap().clone(), None)
}
};
// Get the 'count' closest nodes to the key out of our routing table
let closest_nodes = routing_table.find_closest_nodes(count, node_id, filters, transform);
// If the node we want to locate is one of the closest nodes, return it immediately
if let Some(out) = closest_nodes
.iter()
.find(|x| x.node_ids().contains(&node_id))
{
return Ok(Some(out.clone()));
}
// Make accessible to fanout tasks
struct FanoutContext {
closest_nodes: Vec<NodeRef>,
called_nodes: TypedKeySet,
}
let closest_nodes = Arc::new(Mutex::new(closest_nodes));
// Otherwise contact the 'fanout' closest nodes to see if there's closer nodes
let mut unord = FuturesUnordered::new();
{
// Spin up 'fanout' tasks to process the fanout
for n in 0..4 {
// Fanout processor
let closest_nodes = closest_nodes.clone();
let h = async move {
// Find the nth node to iterate on
let cn = closest_nodes.lock();
let n = n.clamp(0, cn.len()); xxx dont do this, use called nodes set, shouldnt need stop token canceller, but maybe at the top level? nothing is spawning. so maybe not.
let mut node =
};
unord.push(h);
// Routine to call to check if we're done at each step
let check_done = |closest_nodes: &[NodeRef]| {
// If the node we want to locate is one of the closest nodes, return it immediately
if let Some(out) = closest_nodes
.iter()
.find(|x| x.node_ids().contains(&node_id))
{
return Some(out.clone());
}
}
// Wait for them to complete
timeout((timeout_us.as_u64() / 1000u64) as u32, async {
while let Some(_) = unord.next().await {}
})
.await;
None
};
Ok(None)
}
// Call the fanout
let fanout_call = FanoutCall::new(
routing_table.clone(),
node_id,
count,
fanout,
timeout_us,
call_routine,
check_done,
);
/// Search the DHT for the 'count' closest nodes to a key, adding them all to the routing table if they are not there and returning their node references
pub async fn search_dht_multi_key(
&self,
_node_id: TypedKey,
_count: usize,
_fanout: usize,
_timeout: TimestampDuration,
) -> Result<Vec<NodeRef>, RPCError> {
// xxx return closest nodes after the timeout
Err(RPCError::unimplemented("search_dht_multi_key")).map_err(logthru_rpc!(error))
fanout_call.run().await
}
/// Search the DHT for a specific node corresponding to a key unless we have that node in our routing table already, and return the node reference
@ -490,6 +466,7 @@ impl RPCProcessor {
pub fn resolve_node(
&self,
node_id: TypedKey,
safety_selection: SafetySelection,
) -> SendPinBoxFuture<Result<Option<NodeRef>, RPCError>> {
let this = self.clone();
Box::pin(async move {
@ -515,9 +492,16 @@ impl RPCProcessor {
};
// Search in preferred cryptosystem order
let nr = this
.search_dht_single_key(node_id, count, fanout, timeout)
.await?;
let nr = match this
.search_dht_single_key(node_id, count, fanout, timeout, safety_selection)
.await
{
TimeoutOr::Timeout => None,
TimeoutOr::Value(Ok(v)) => v,
TimeoutOr::Value(Err(e)) => {
return Err(e);
}
};
if let Some(nr) = &nr {
if nr.node_ids().contains(&node_id) {

View File

@ -0,0 +1,107 @@
use super::*;
/// Payload of a successful `StorageManager::do_get_value` call.
pub struct DoGetValueResult {
// NOTE(review): presumably the value retrieved for the requested subkey, if any — confirm once do_get_value is finished
pub value: Option<SignedValueData>,
// NOTE(review): presumably the record descriptor returned alongside the value, if any — confirm once do_get_value is finished
pub descriptor: Option<SignedValueDescriptor>,
}
impl StorageManager {
/// Fetches a value from the network with a fanout of `rpc_call_get_value` calls.
///
/// Reads the fanout parameters (`get_value_count`, `get_value_fanout`,
/// `get_value_timeout_ms`) from the config, builds a per-node call routine and a
/// completion check, and hands both to `FanoutCall`.
///
/// NOTE(review): this body is an unfinished checkpoint and does not compile as shown:
/// `node_id`, `timeout_us` and `this` are not bound anywhere in this function,
/// `min_seq` is never read, one statement is incomplete, and the tail after
/// `fanout_call.run().await` duplicates `resolve_node` logic with a mismatched return type.
pub async fn do_get_value(
&self,
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
key: TypedKey,
subkey: ValueSubkey,
min_seq: ValueSeqNum,
last_descriptor: Option<SignedValueDescriptor>,
safety_selection: SafetySelection,
) -> Result<Option<DoGetValueResult>, VeilidAPIError> {
// Bail out when no RPC processor is attached
let Some(rpc_processor) = inner.rpc_processor.clone() else {
apibail_not_initialized!();
};
let routing_table = rpc_processor.routing_table();
// Get the DHT parameters for 'GetValue'
let (count, fanout, timeout) = {
let c = self.unlocked_inner.config.get();
(
c.network.dht.get_value_count as usize,
c.network.dht.get_value_fanout as usize,
TimestampDuration::from(ms_to_us(c.network.dht.get_value_timeout_ms)),
)
};
// Routine to call to generate fanout
let call_routine = |next_node: NodeRef| {
let rpc_processor = rpc_processor.clone();
async move {
// NOTE(review): `last_descriptor` is moved into this async block, yet the closure is invoked once per node — confirm it is cloneable/cloned
match rpc_processor
.clone()
.rpc_call_get_value(
Destination::direct(next_node).with_safety(safety_selection),
key, subkey, last_descriptor
)
.await
{
Ok(v) => {
let v = network_result_value_or_log!(v => {
// Any other failures, just try the next node
return Ok(None);
});
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = v.answer.value {
// See if this is even a candidate
// NOTE(review): the next line is an unfinished statement followed by an inline TODO; the `min_seq` comparison is not implemented
if value.value_data(). xxx apply min_seq and also to OperationGetValueQ
// Validate with schema
}
// Return peers if we have some
Ok(Some(v.answer.peers))
}
Err(e) => Err(e),
}
}
};
// Routine to call to check if we're done at each step
// NOTE(review): this check looks for `node_id` among the closest nodes, but no `node_id` exists in this function (the parameter is `key`)
let check_done = |closest_nodes: &[NodeRef]| {
// If the node we want to locate is one of the closest nodes, return it immediately
if let Some(out) = closest_nodes
.iter()
.find(|x| x.node_ids().contains(&node_id))
{
return Some(out.clone());
}
None
};
// Call the fanout
// NOTE(review): `node_id` and `timeout_us` are undefined here; the locals in scope are `key` and `timeout`
let fanout_call = FanoutCall::new(
routing_table.clone(),
node_id,
count,
fanout,
timeout_us,
call_routine,
check_done,
);
// NOTE(review): no `;` after this expression although more statements follow
fanout_call.run().await
// Search in preferred cryptosystem order
// NOTE(review): `this` is undefined in this function, and `Ok(nr)` below would not match the declared return type
let nr = this
.search_dht_single_key(node_id, count, fanout, timeout, safety_selection)
.await?;
if let Some(nr) = &nr {
if nr.node_ids().contains(&node_id) {
// found a close node, but not exact within our configured resolve_node timeout
return Ok(None);
}
}
Ok(nr)
}
}

View File

@ -1,3 +1,4 @@
mod do_get_value;
mod keys;
mod record_store;
mod record_store_limits;
@ -265,18 +266,6 @@ impl StorageManager {
.await
}
async fn do_get_value(
&self,
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
key: TypedKey,
subkey: ValueSubkey,
) -> Result<Option<GetValueAnswer>, VeilidAPIError> {
let Some(rpc_processor) = inner.rpc_processor.clone() else {
apibail_not_initialized!();
};
//
}
async fn open_record_inner(
&self,
mut inner: AsyncMutexGuardArc<StorageManagerInner>,
@ -334,7 +323,7 @@ impl StorageManager {
Ok(descriptor)
} else {
// No record yet, try to get it from the network
self.do_get_value(inner, key, 0).await
self.do_get_value(inner, key, 0, safety_selection).await
// Make DHT Record Descriptor to return
// let descriptor = DHTRecordDescriptor {

View File

@ -4,7 +4,7 @@ use super::*;
#[derive(Clone, Debug)]
pub enum Target {
NodeId(PublicKey), // Node by any of its public keys
NodeId(TypedKey), // Node by its public key
PrivateRoute(RouteId), // Remote private route by its id
}
@ -105,7 +105,10 @@ impl RoutingContext {
match target {
Target::NodeId(node_id) => {
// Resolve node
let mut nr = match rpc_processor.resolve_node(node_id).await {
let mut nr = match rpc_processor
.resolve_node(node_id, self.unlocked_inner.safety_selection)
.await
{
Ok(Some(nr)) => nr,
Ok(None) => apibail_invalid_target!(),
Err(e) => return Err(e.into()),

View File

@ -74,7 +74,7 @@ async fn parse_target(s: String) -> APIResult<veilid_core::Target> {
}
// Is this a node id?
if let Ok(nid) = veilid_core::PublicKey::from_str(&s) {
if let Ok(nid) = veilid_core::TypedKey::from_str(&s) {
return Ok(veilid_core::Target::NodeId(nid));
}

View File

@ -94,7 +94,7 @@ fn parse_target(s: String) -> APIResult<veilid_core::Target> {
}
// Is this a node id?
if let Ok(nid) = veilid_core::PublicKey::from_str(&s) {
if let Ok(nid) = veilid_core::TypedKey::from_str(&s) {
return Ok(veilid_core::Target::NodeId(nid));
}