more refactor

This commit is contained in:
John Smith 2023-02-25 22:02:13 -05:00
parent e328bdc270
commit 1fba8831e8
10 changed files with 203 additions and 127 deletions

View File

@ -324,7 +324,7 @@ struct ValueData @0xb4b7416f169f2a3d {
}
struct OperationGetValueQ @0xf88a5b6da5eda5d0 {
publicKey @0 :TypedKey; # the location of the value
key @0 :TypedKey; # the location of the value
subkey @1 :Subkey; # the index of the subkey (0 for the default subkey)
}
@ -336,7 +336,7 @@ struct OperationGetValueA @0xd896bb46f2e0249f {
}
struct OperationSetValueQ @0xbac06191ff8bdbc5 {
publicKey @0 :TypedKey; # the location of the value
key @0 :TypedKey; # the location of the value
subkey @1 :Subkey; # the index of the subkey (0 for the default subkey)
value @2 :ValueData; # value or subvalue contents (older or equal seq number gets dropped)
}
@ -349,10 +349,10 @@ struct OperationSetValueA @0x9378d0732dc95be2 {
}
struct OperationWatchValueQ @0xf9a5a6c547b9b228 {
publicKey @0 :TypedKey; # key for value to watch
key @0 :TypedKey; # key for value to watch
subkeys @1 :List(SubkeyRange); # subkey range to watch, if empty, watch everything
expiration @2 :UInt64; # requested timestamp when this watch will expire in usec since epoch (can be return less, 0 for max)
count @3 :UInt32; # requested number of changes to watch for (0 = continuous, 1 = single shot, 2+ = counter)
count @3 :UInt32; # requested number of changes to watch for (0 = cancel, 1 = single shot, 2+ = counter, UINT32_MAX = continuous)
}
struct OperationWatchValueA @0xa726cab7064ba893 {
@ -361,7 +361,7 @@ struct OperationWatchValueA @0xa726cab7064ba893 {
}
struct OperationValueChanged @0xd1c59ebdd8cc1bf6 {
publicKey @0 :TypedKey; # key for value that changed
key @0 :TypedKey; # key for value that changed
subkeys @1 :List(SubkeyRange); # subkey range that changed (up to 512 ranges at a time)
count @2 :UInt32; # remaining changes left (0 means watch has expired)
value @3 :ValueData; # first value that changed (the rest can be gotten with getvalue)

View File

@ -159,15 +159,9 @@ impl RoutingTable {
out
}
pub(crate) fn debug_info_entry(&self, node_id: TypedKey) -> String {
pub(crate) fn debug_info_entry(&self, node_ref: NodeRef) -> String {
let mut out = String::new();
out += &format!("Entry {:?}:\n", node_id);
if let Some(nr) = self.lookup_node_ref(node_id) {
out += &nr.operate(|_rt, e| format!("{:#?}\n", e));
} else {
out += "Entry not found\n";
}
out += &node_ref.operate(|_rt, e| format!("{:#?}\n", e));
out
}

View File

@ -8,7 +8,6 @@ mod route_spec_store_cache;
mod route_spec_store_content;
mod route_stats;
pub use permutation::*;
pub use remote_private_route_info::*;
pub use route_set_spec_detail::*;
pub use route_spec_store::*;

View File

@ -1134,32 +1134,17 @@ impl RouteSpecStore {
)?)
}
/// Assemble private route for publication
/// Returns a PrivateRoute object for an allocated private route
#[instrument(level = "trace", skip(self), err)]
pub fn assemble_private_route(
&self,
key: &PublicKey,
optimized: Option<bool>,
) -> EyreResult<PrivateRoute> {
let inner = &*self.inner.lock();
fn assemble_private_route_inner(&self, inner: &RouteSpecStoreInner, key: &PublicKey, rsd: &RouteSpecDetail, optimized: bool) -> EyreResult<PrivateRoute>
{
let routing_table = self.unlocked_inner.routing_table.clone();
let rti = &*routing_table.inner.read();
// Get the route spec detail for the requested private route
let rsd = Self::detail(inner, key).ok_or_else(|| eyre!("route does not exist"))?;
// Ensure we get the crypto for it
let crypto = routing_table.network_manager().crypto();
let Some(vcrypto) = crypto.get(rsd.crypto_kind) else {
bail!("crypto not supported for route");
};
// See if we can optimize this compilation yet
// We don't want to include full nodeinfo if we don't have to
let optimized = optimized
.unwrap_or(rsd.stats.last_tested_ts.is_some() || rsd.stats.last_received_ts.is_some());
// Make innermost route hop to our own node
let mut route_hop = RouteHop {
node: if optimized {
@ -1235,6 +1220,58 @@ impl RouteSpecStore {
Ok(private_route)
}
/// Assemble a single private route for publication
/// Returns a PrivateRoute object for an allocated private route key
#[instrument(level = "trace", skip(self), err)]
pub fn assemble_private_route(
&self,
key: &PublicKey,
optimized: Option<bool>,
) -> EyreResult<PrivateRoute> {
let inner = &*self.inner.lock();
let Some(rsid) = inner.content.get_id_by_key(key) else {
bail!("route key does not exist");
};
let Some(rssd) = inner.content.get_detail(&rsid) else {
bail!("route id does not exist");
};
// See if we can optimize this compilation yet
// We don't want to include full nodeinfo if we don't have to
let optimized = optimized
.unwrap_or(rssd.get_stats().last_tested_ts.is_some() || rssd.get_stats().last_received_ts.is_some());
let rsd = rssd.get_route_by_key(key).expect("route key index is broken");
self.assemble_private_route_inner(inner, key, rsd, optimized)
}
/// Assemble private route set for publication
/// Returns a vec of PrivateRoute objects for an allocated private route
#[instrument(level = "trace", skip(self), err)]
pub fn assemble_private_routes(
&self,
id: &RouteId,
optimized: Option<bool>,
) -> EyreResult<Vec<PrivateRoute>> {
let inner = &*self.inner.lock();
let Some(rssd) = inner.content.get_detail(id) else {
bail!("route id does not exist");
};
// See if we can optimize this compilation yet
// We don't want to include full nodeinfo if we don't have to
let optimized = optimized
.unwrap_or(rssd.get_stats().last_tested_ts.is_some() || rssd.get_stats().last_received_ts.is_some());
let mut out = Vec::new();
for (key, rsd) in rssd.iter_route_set() {
out.push(self.assemble_private_route_inner(inner, key, rsd, optimized)?);
}
Ok(out)
}
/// Import a remote private route for compilation
/// It is safe to import the same route more than once and it will return the same route id
/// Returns a route set id
@ -1407,7 +1444,7 @@ impl RouteSpecStore {
/// Mark route as published
/// When first deserialized, routes must be re-published in order to ensure they remain
/// in the RouteSpecStore.
pub fn mark_route_published(&self, id: &String, published: bool) -> EyreResult<()> {
pub fn mark_route_published(&self, id: &RouteId, published: bool) -> EyreResult<()> {
let inner = &mut *self.inner.lock();
let Some(rssd) = inner.content.get_detail_mut(id) else {
bail!("route does not exist");

View File

@ -162,6 +162,9 @@ impl RPCProcessor {
}
SafetySelection::Safe(safety_spec) => {
// Sent directly but with a safety route, respond to private route
xxx continue here. ensure crypto kind makes sense with get_private_route_for_safety_spec and then make it work.
let ck = target.best_node_id().kind;
let Some(pr_key) = rss
.get_private_route_for_safety_spec(ck, safety_spec, &target.node_ids())
@ -222,7 +225,7 @@ impl RPCProcessor {
// Determine if we can use optimized nodeinfo
let route_node = match rss
.has_remote_private_route_seen_our_node_info(&private_route.public_key.key)
.has_remote_private_route_seen_our_node_info(&private_route_id)
{
true => {
if !routing_table.has_valid_own_node_info(RoutingDomain::PublicInternet) {
@ -247,10 +250,10 @@ impl RPCProcessor {
// Sent to a private route via a safety route, respond to private route
// Check for loopback test
let pr_key = if safety_spec.preferred_route
== Some(private_route.public_key.key)
let pr_key = if safety_spec.preferred_route == Some(private_route_id)
{
// Private route is also safety route during loopback test
xxx build loopback routine? get_private_route_for_loopback_test?
private_route.public_key.key
} else {
// Get the private route to respond to that matches the safety route spec we sent the request with

View File

@ -165,10 +165,10 @@ impl VeilidAPI {
// Private route allocation
/// Allocate a new private route set with default cryptography and network options
/// Returns a set of keys and a publishable 'blob' with the route encrypted with each crypto kind
/// Returns a route id and a publishable 'blob' with the route encrypted with each crypto kind
/// Those nodes importing the blob will have their choice of which crypto kind to use
#[instrument(level = "debug", skip(self))]
pub async fn new_private_route(&self) -> Result<(TypedKeySet, Vec<u8>), VeilidAPIError> {
pub async fn new_private_route(&self) -> Result<(RouteId, Vec<u8>), VeilidAPIError> {
self.new_custom_private_route(
&VALID_CRYPTO_KINDS,
Stability::default(),
@ -184,7 +184,7 @@ impl VeilidAPI {
crypto_kinds: &[CryptoKind],
stability: Stability,
sequencing: Sequencing,
) -> Result<(TypedKeySet, Vec<u8>), VeilidAPIError> {
) -> Result<(RouteId, Vec<u8>), VeilidAPIError> {
let default_route_hop_count: usize = {
let config = self.config()?;
let c = config.get();
@ -202,56 +202,48 @@ impl VeilidAPI {
&[],
)
.map_err(VeilidAPIError::internal)?;
let Some(pr_keys) = r else {
let Some(route_id) = r else {
apibail_generic!("unable to allocate route");
};
if !rss
.test_route(&pr_keys)
.test_route(route_id.clone())
.await
.map_err(VeilidAPIError::no_connection)?
{
rss.release_route(&pr_pubkey);
rss.release_route(route_id);
apibail_generic!("allocated route failed to test");
}
let private_route = rss
.assemble_private_route(&pr_pubkey, Some(true))
let private_routes = rss
.assemble_private_routes(&route_id, Some(true))
.map_err(VeilidAPIError::generic)?;
let blob = match RouteSpecStore::private_route_to_blob(&private_route) {
let blob = match RouteSpecStore::private_routes_to_blob(&private_routes) {
Ok(v) => v,
Err(e) => {
rss.release_route(&pr_pubkey);
rss.release_route(route_id);
apibail_internal!(e);
}
};
rss.mark_route_published(&pr_pubkey, true)
rss.mark_route_published(&route_id, true)
.map_err(VeilidAPIError::internal)?;
Ok((pr_pubkey, blob))
Ok((route_id, blob))
}
#[instrument(level = "debug", skip(self))]
pub fn import_remote_private_route(
&self,
blob: Vec<u8>,
) -> Result<TypedKeySet, VeilidAPIError> {
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> Result<RouteId, VeilidAPIError> {
let rss = self.routing_table()?.route_spec_store();
rss.import_remote_private_route(blob)
.map_err(|e| VeilidAPIError::invalid_argument(e, "blob", "private route blob"))
}
#[instrument(level = "debug", skip(self))]
pub fn release_private_route(&self, key: &PublicKey) -> Result<(), VeilidAPIError> {
pub fn release_private_route(&self, route_id: RouteId) -> Result<(), VeilidAPIError> {
let rss = self.routing_table()?.route_spec_store();
if rss.release_route(key) {
Ok(())
} else {
Err(VeilidAPIError::invalid_argument(
"release_private_route",
"key",
key,
))
if !rss.release_route(route_id) {
apibail_invalid_argument!("release_private_route", "key", route_id);
}
Ok(())
}
////////////////////////////////////////////////////////////////

View File

@ -7,7 +7,7 @@ use routing_table::*;
#[derive(Default, Debug)]
struct DebugCache {
imported_routes: Vec<TypedKeySet>,
imported_routes: Vec<RouteId>,
}
static DEBUG_CACHE: Mutex<DebugCache> = Mutex::new(DebugCache {
@ -30,12 +30,12 @@ fn get_string(text: &str) -> Option<String> {
Some(text.to_owned())
}
fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option<PublicKey> {
fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option<RouteId> {
return move |text: &str| {
if text.is_empty() {
return None;
}
match PublicKey::from_str(text).ok() {
match RouteId::from_str(text).ok() {
Some(key) => {
let routes = rss.list_allocated_routes(|k, _| Some(*k));
if routes.contains(&key) {
@ -128,38 +128,20 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<D
}
if &text[0..1] == "#" {
// Private route
let mut text = &text[1..];
let opt_crypto_kind = if text.len() > 5 {
let fcc = &text[0..4];
if &text[4..5] != ":" {
return None;
}
let ck = match FourCC::from_str(fcc) {
Ok(v) => v,
Err(_) => {
return None;
}
};
text = &text[5..];
Some(ck)
} else {
None
};
let text = &text[1..];
let n = get_number(text)?;
let mut dc = DEBUG_CACHE.lock();
let pr_pubkey = match opt_crypto_kind {
Some(ck) => dc.imported_routes.get(n)?.get(ck)?,
None => dc.imported_routes.get(n)?.best()?,
};
let private_route_id = dc.imported_routes.get(n)?.clone();
let rss = routing_table.route_spec_store();
let Some(private_route) = rss.get_remote_private_route(&pr_pubkey.key) else {
if !rss.is_valid_remote_private_route(&private_route_id) {
// Remove imported route
dc.imported_routes.remove(n);
info!("removed dead imported route {}", n);
return None;
};
Some(Destination::private_route(
private_route,
private_route_id,
ss.unwrap_or(SafetySelection::Unsafe(Sequencing::default())),
))
} else {
@ -381,12 +363,19 @@ impl VeilidAPI {
async fn debug_entry(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let routing_table = self.network_manager()?.routing_table();
let node_id = get_debug_argument_at(&args, 0, "debug_entry", "node_id", get_typed_key)?;
let node_ref = get_debug_argument_at(
&args,
0,
"debug_entry",
"node_id",
get_node_ref(routing_table),
)?;
// Dump routing table entry
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_entry(node_id))
Ok(routing_table.debug_info_entry(node_ref))
}
async fn debug_nodeinfo(&self, _args: String) -> Result<String, VeilidAPIError> {
@ -655,10 +644,16 @@ impl VeilidAPI {
let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store();
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_typed_key)?;
let route_id = get_debug_argument_at(
&args,
1,
"debug_route",
"route_id",
get_route_id(rss.clone()),
)?;
// Release route
let out = match rss.release_route(&route_id) {
let out = match rss.release_route(route_id) {
true => "Released".to_owned(),
false => "Route does not exist".to_owned(),
};
@ -671,7 +666,13 @@ impl VeilidAPI {
let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store();
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_typed_key)?;
let route_id = get_debug_argument_at(
&args,
1,
"debug_route",
"route_id",
get_route_id(rss.clone()),
)?;
let full = {
if args.len() > 2 {
let full_val = get_debug_argument_at(&args, 2, "debug_route", "full", get_string)?
@ -687,13 +688,13 @@ impl VeilidAPI {
};
// Publish route
let out = match rss.assemble_private_route(&route_id, Some(!full)) {
Ok(private_route) => {
let out = match rss.assemble_private_routes(&route_id, Some(!full)) {
Ok(private_routes) => {
if let Err(e) = rss.mark_route_published(&route_id, true) {
return Ok(format!("Couldn't mark route published: {}", e));
}
// Convert to blob
let blob_data = RouteSpecStore::private_route_to_blob(&private_route)
let blob_data = RouteSpecStore::private_routes_to_blob(&private_routes)
.map_err(VeilidAPIError::internal)?;
let out = BASE64URL_NOPAD.encode(&blob_data);
info!(
@ -717,7 +718,13 @@ impl VeilidAPI {
let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store();
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_public_key)?;
let route_id = get_debug_argument_at(
&args,
1,
"debug_route",
"route_id",
get_route_id(rss.clone()),
)?;
// Unpublish route
let out = if let Err(e) = rss.mark_route_published(&route_id, false) {
@ -733,7 +740,13 @@ impl VeilidAPI {
let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store();
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_public_key)?;
let route_id = get_debug_argument_at(
&args,
1,
"debug_route",
"route_id",
get_route_id(rss.clone()),
)?;
match rss.debug_route(&route_id) {
Some(s) => Ok(s),
@ -771,14 +784,14 @@ impl VeilidAPI {
.decode(blob.as_bytes())
.map_err(VeilidAPIError::generic)?;
let rss = self.routing_table()?.route_spec_store();
let pr_pubkey = rss
let route_id = rss
.import_remote_private_route(blob_dec)
.map_err(VeilidAPIError::generic)?;
let mut dc = DEBUG_CACHE.lock();
let n = dc.imported_routes.len();
let out = format!("Private route #{} imported: {}", n, pr_pubkey);
dc.imported_routes.push(pr_pubkey);
let out = format!("Private route #{} imported: {}", n, route_id);
dc.imported_routes.push(route_id);
return Ok(out);
}
@ -789,10 +802,16 @@ impl VeilidAPI {
let routing_table = netman.routing_table();
let rss = routing_table.route_spec_store();
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_typed_key)?;
let route_id = get_debug_argument_at(
&args,
1,
"debug_route",
"route_id",
get_route_id(rss.clone()),
)?;
let success = rss
.test_route(&route_id)
.test_route(route_id)
.await
.map_err(VeilidAPIError::internal)?;

View File

@ -66,9 +66,17 @@ macro_rules! apibail_no_connection {
#[allow(unused_macros)]
#[macro_export]
macro_rules! apibail_key_not_found {
macro_rules! apibail_invalid_target {
() => {
return Err(VeilidAPIError::invalid_target())
};
}
#[allow(unused_macros)]
#[macro_export]
macro_rules! apibail_route_not_found {
($x:expr) => {
return Err(VeilidAPIError::key_not_found($x))
return Err(VeilidAPIError::route_not_found($x))
};
}
@ -107,8 +115,8 @@ pub enum VeilidAPIError {
TryAgain,
#[error("Shutdown")]
Shutdown,
#[error("Key not found: {key}")]
KeyNotFound { key: PublicKey },
#[error("Invalid target")]
InvalidTarget,
#[error("No connection: {message}")]
NoConnection { message: String },
#[error("No peer info: {node_id}")]
@ -147,8 +155,8 @@ impl VeilidAPIError {
pub fn shutdown() -> Self {
Self::Shutdown
}
pub fn key_not_found(key: PublicKey) -> Self {
Self::KeyNotFound { key }
pub fn invalid_target() -> Self {
Self::InvalidTarget
}
pub fn no_connection<T: ToString>(msg: T) -> Self {
Self::NoConnection {

View File

@ -107,7 +107,7 @@ impl RoutingContext {
// Resolve node
let mut nr = match rpc_processor.resolve_node(node_id).await {
Ok(Some(nr)) => nr,
Ok(None) => apibail_key_not_found!(node_id),
Ok(None) => apibail_invalid_target!(),
Err(e) => return Err(e.into()),
};
// Apply sequencing to match safety selection
@ -121,9 +121,8 @@ impl RoutingContext {
Target::PrivateRoute(rsid) => {
// Get remote private route
let rss = self.api.routing_table()?.route_spec_store();
if !rss.is_valid_remote_private_route(&rsid)
else {
apibail_key_not_found!(pr);
if !rss.is_valid_remote_private_route(&rsid) {
apibail_invalid_target!();
};
Ok(rpc_processor::Destination::PrivateRoute {
@ -197,27 +196,38 @@ impl RoutingContext {
///////////////////////////////////
/// DHT Values
pub async fn get_value(&self, _value_key: ValueKey) -> Result<Vec<u8>, VeilidAPIError> {
pub async fn get_value(
&self,
_key: TypedKey,
_subkey: ValueSubkey,
) -> Result<ValueData, VeilidAPIError> {
panic!("unimplemented");
}
pub async fn set_value(
&self,
_value_key: ValueKey,
_value: Vec<u8>,
_key: TypedKey,
_subkey: ValueSubkey,
_value: ValueData,
) -> Result<bool, VeilidAPIError> {
panic!("unimplemented");
}
pub async fn watch_value(
&self,
_value_key: ValueKey,
_callback: ValueChangeCallback,
_key: TypedKey,
_subkeys: &[ValueSubkeyRange],
_expiration: Timestamp,
_count: u32,
) -> Result<bool, VeilidAPIError> {
panic!("unimplemented");
}
pub async fn cancel_watch_value(&self, _value_key: ValueKey) -> Result<bool, VeilidAPIError> {
pub async fn cancel_watch_value(
&self,
_key: TypedKey,
_subkeys: &[ValueSubkeyRange],
) -> Result<bool, VeilidAPIError> {
panic!("unimplemented");
}

View File

@ -17,6 +17,12 @@ pub type ByteCount = AlignedU64;
pub type TunnelId = AlignedU64;
/// Value schema
pub type ValueSchema = FourCC;
/// Value subkey
pub type ValueSubkey = u32;
/// Value subkey range
pub type ValueSubkeyRange = (u32, u32);
/// Value sequence number
pub type ValueSeqNum = u32;
/// FOURCC code
#[derive(
@ -291,6 +297,17 @@ pub struct VeilidStateConfig {
pub config: VeilidConfigInner,
}
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct VeilidValueChange {
key: TypedKey,
subkeys: Vec<ValueSubkey>,
count: u32,
value: ValueData,
}
#[derive(Debug, Clone, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(u8), derive(CheckBytes))]
#[serde(tag = "kind")]
@ -302,6 +319,7 @@ pub enum VeilidUpdate {
Network(VeilidStateNetwork),
Config(VeilidStateConfig),
Route(VeilidStateRoute),
ValueChange(VeilidValueChange),
Shutdown,
}
@ -332,7 +350,7 @@ pub struct VeilidState {
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct ValueData {
pub seq: u32,
pub seq: ValueSeqNum,
pub schema: ValueSchema,
pub data: Vec<u8>,
}
@ -344,7 +362,7 @@ impl ValueData {
data,
}
}
pub fn new_with_seq(seq: u32, schema: ValueSchema, data: Vec<u8>) -> Self {
pub fn new_with_seq(seq: ValueSeqNum, schema: ValueSchema, data: Vec<u8>) -> Self {
Self { seq, schema, data }
}
pub fn change(&mut self, data: Vec<u8>) {
@ -2378,10 +2396,6 @@ pub struct PeerStats {
pub transfer: TransferStatsDownUp, // Stats for communications with the peer
}
pub type ValueChangeCallback = Arc<
dyn Fn(TypedKey, Vec<(u32, u32)>, u32, Vec<u8>) -> SendPinBoxFuture<()> + Send + Sync + 'static,
>;
/////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize)]