cleanup dht stuff and client/server api

This commit is contained in:
John Smith 2023-06-28 23:15:06 -04:00
parent b01fb20ec9
commit 05a9ee754e
34 changed files with 315 additions and 254 deletions

7
Cargo.lock generated
View File

@ -2767,6 +2767,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "indent"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f1a0777d972970f204fdf8ef319f1f4f8459131636d7e3c96c5d59570d0fa6"
[[package]]
name = "indenter"
version = "0.3.3"
@ -6400,6 +6406,7 @@ dependencies = [
"flume",
"futures",
"hex",
"indent",
"json",
"log",
"parking_lot 0.12.1",

View File

@ -46,6 +46,7 @@ json = "^0"
stop-token = { version = "^0", default-features = false }
flume = { version = "^0", features = ["async"] }
data-encoding = { version = "^2" }
indent = { version = "0.1.1" }
[dev-dependencies]
serial_test = "^0"

View File

@ -358,21 +358,6 @@ impl ClientApiConnection {
Ok(())
}
pub async fn server_appcall_reply(&self, id: u64, msg: Vec<u8>) -> Result<(), String> {
trace!("ClientApiConnection::appcall_reply");
let mut req = json::JsonValue::new_object();
req["op"] = "AppCallReply".into();
req["call_id"] = id.to_string().into();
req["message"] = data_encoding::BASE64URL_NOPAD.encode(&msg).into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
// Start Client API connection
pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::connect");

View File

@ -2,6 +2,7 @@ use crate::client_api_connection::*;
use crate::settings::Settings;
use crate::tools::*;
use crate::ui::*;
use indent::indent_all_by;
use std::net::SocketAddr;
use std::time::SystemTime;
use veilid_tools::*;
@ -106,33 +107,44 @@ impl CommandProcessor {
pub fn cmd_help(&self, _rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_help");
self.ui_sender().add_node_event(
Level::Info,
r#"Commands:
exit/quit exit the client
disconnect disconnect the client from the Veilid node
shutdown shut the server down
attach attach the server to the Veilid network
detach detach the server from the Veilid network
debug [command] send a debugging command to the Veilid server
change_log_level <layer> <level> change the log level for a tracing layer
layers include:
all, terminal, system, api, file, otlp
levels include:
error, warn, info, debug, trace
reply <call id> <message> reply to an AppCall not handled directly by the server
<call id> must be exact call id reported in VeilidUpdate
<message> can be a string (left trimmed) or
it can start with a '#' followed by a string of undelimited hex bytes
enable [flag] set a flag
disable [flag] unset a flag
valid flags in include:
app_messages
"#
.to_owned(),
);
let capi = self.capi();
let ui = self.ui_sender();
ui.send_callback(callback);
spawn_detached_local(async move {
let out = match capi.server_debug("help".to_owned()).await {
Err(e) => {
error!("Server command 'debug help' failed: {}", e);
ui.send_callback(callback);
return;
}
Ok(v) => v,
};
ui.add_node_event(
Level::Info,
format!(
r#"Client Commands:
exit/quit exit the client
disconnect disconnect the client from the Veilid node
shutdown shut the server down
attach attach the server to the Veilid network
detach detach the server from the Veilid network
change_log_level <layer> <level> change the log level for a tracing layer
layers include:
all, terminal, system, api, file, otlp
levels include:
error, warn, info, debug, trace
enable [flag] set a flag
disable [flag] unset a flag
valid flags in include:
app_messages
Server Debug Commands:
{}
"#,
indent_all_by(4, out)
),
);
ui.send_callback(callback);
});
Ok(())
}
@ -194,12 +206,12 @@ disable [flag] unset a flag
Ok(())
}
pub fn cmd_debug(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
pub fn cmd_debug(&self, command_line: String, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_debug");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
match capi.server_debug(rest.unwrap_or_default()).await {
match capi.server_debug(command_line).await {
Ok(output) => {
ui.add_node_event(Level::Info, output);
ui.send_callback(callback);
@ -248,69 +260,6 @@ disable [flag] unset a flag
Ok(())
}
pub fn cmd_reply(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_reply");
let capi = self.capi();
let ui = self.ui_sender();
let some_last_id = self.inner_mut().last_call_id.take();
spawn_detached_local(async move {
let (first, second) = Self::word_split(&rest.clone().unwrap_or_default());
let (id, msg) = if let Some(second) = second {
let id = match u64::from_str(&first) {
Err(e) => {
ui.add_node_event(Level::Error, format!("invalid appcall id: {}", e));
ui.send_callback(callback);
return;
}
Ok(v) => v,
};
(id, second)
} else {
let id = match some_last_id {
None => {
ui.add_node_event(Level::Error, "must specify last call id".to_owned());
ui.send_callback(callback);
return;
}
Some(v) => v,
};
(id, rest.unwrap_or_default())
};
let msg = if msg[0..1] == "#".to_owned() {
match hex::decode(msg[1..].as_bytes().to_vec()) {
Err(e) => {
ui.add_node_event(Level::Error, format!("invalid hex message: {}", e));
ui.send_callback(callback);
return;
}
Ok(v) => v,
}
} else {
msg[1..].as_bytes().to_vec()
};
let msglen = msg.len();
match capi.server_appcall_reply(id, msg).await {
Ok(()) => {
ui.add_node_event(
Level::Info,
format!("reply sent to {} : {} bytes", id, msglen),
);
ui.send_callback(callback);
return;
}
Err(e) => {
ui.display_string_dialog(
"Server command 'appcall_reply' failed",
e.to_string(),
callback,
);
}
}
});
Ok(())
}
pub fn cmd_enable(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_enable");
@ -368,16 +317,10 @@ disable [flag] unset a flag
"shutdown" => self.cmd_shutdown(callback),
"attach" => self.cmd_attach(callback),
"detach" => self.cmd_detach(callback),
"debug" => self.cmd_debug(rest, callback),
"change_log_level" => self.cmd_change_log_level(rest, callback),
"reply" => self.cmd_reply(rest, callback),
"enable" => self.cmd_enable(rest, callback),
"disable" => self.cmd_disable(rest, callback),
_ => {
let ui = self.ui_sender();
ui.send_callback(callback);
Err(format!("Invalid command: {}", cmd))
}
_ => self.cmd_debug(command_line.to_owned(), callback),
}
}

View File

@ -248,8 +248,8 @@ impl Crypto {
node_ids: &[TypedKey],
data: &[u8],
typed_signatures: &[TypedSignature],
) -> VeilidAPIResult<TypedKeySet> {
let mut out = TypedKeySet::with_capacity(node_ids.len());
) -> VeilidAPIResult<TypedKeyGroup> {
let mut out = TypedKeyGroup::with_capacity(node_ids.len());
for sig in typed_signatures {
for nid in node_ids {
if nid.kind == sig.kind {

View File

@ -17,7 +17,7 @@ use super::*;
)]
#[archive_attr(repr(C), derive(CheckBytes, Hash, PartialEq, Eq))]
#[serde(from = "Vec<CryptoTyped<K>>", into = "Vec<CryptoTyped<K>>")]
pub struct CryptoTypedSet<K = PublicKey>
pub struct CryptoTypedGroup<K = PublicKey>
where
K: Clone
+ Copy
@ -37,7 +37,7 @@ where
items: Vec<CryptoTyped<K>>,
}
impl<K> CryptoTypedSet<K>
impl<K> CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -151,7 +151,7 @@ where
}
}
impl<K> core::ops::Deref for CryptoTypedSet<K>
impl<K> core::ops::Deref for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -175,7 +175,7 @@ where
}
}
impl<K> fmt::Display for CryptoTypedSet<K>
impl<K> fmt::Display for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -205,7 +205,7 @@ where
write!(f, "]")
}
}
impl<K> FromStr for CryptoTypedSet<K>
impl<K> FromStr for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -238,7 +238,7 @@ where
Ok(Self { items })
}
}
impl<K> From<CryptoTyped<K>> for CryptoTypedSet<K>
impl<K> From<CryptoTyped<K>> for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -255,12 +255,12 @@ where
<K as RkyvArchive>::Archived: Hash + PartialEq + Eq,
{
fn from(x: CryptoTyped<K>) -> Self {
let mut tks = CryptoTypedSet::<K>::with_capacity(1);
let mut tks = CryptoTypedGroup::<K>::with_capacity(1);
tks.add(x);
tks
}
}
impl<K> From<Vec<CryptoTyped<K>>> for CryptoTypedSet<K>
impl<K> From<Vec<CryptoTyped<K>>> for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -277,12 +277,12 @@ where
<K as RkyvArchive>::Archived: Hash + PartialEq + Eq,
{
fn from(x: Vec<CryptoTyped<K>>) -> Self {
let mut tks = CryptoTypedSet::<K>::with_capacity(x.len());
let mut tks = CryptoTypedGroup::<K>::with_capacity(x.len());
tks.add_all(&x);
tks
}
}
impl<K> From<&[CryptoTyped<K>]> for CryptoTypedSet<K>
impl<K> From<&[CryptoTyped<K>]> for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -299,12 +299,12 @@ where
<K as RkyvArchive>::Archived: Hash + PartialEq + Eq,
{
fn from(x: &[CryptoTyped<K>]) -> Self {
let mut tks = CryptoTypedSet::<K>::with_capacity(x.len());
let mut tks = CryptoTypedGroup::<K>::with_capacity(x.len());
tks.add_all(x);
tks
}
}
impl<K> Into<Vec<CryptoTyped<K>>> for CryptoTypedSet<K>
impl<K> Into<Vec<CryptoTyped<K>>> for CryptoTypedGroup<K>
where
K: Clone
+ Copy

View File

@ -42,11 +42,11 @@ pub fn common_crypto_kinds(a: &[CryptoKind], b: &[CryptoKind]) -> Vec<CryptoKind
}
mod crypto_typed;
mod crypto_typed_set;
mod crypto_typed_group;
mod keypair;
pub use crypto_typed::*;
pub use crypto_typed_set::*;
pub use crypto_typed_group::*;
pub use keypair::*;
pub type TypedKey = CryptoTyped<PublicKey>;
@ -55,8 +55,8 @@ pub type TypedKeyPair = CryptoTyped<KeyPair>;
pub type TypedSignature = CryptoTyped<Signature>;
pub type TypedSharedSecret = CryptoTyped<SharedSecret>;
pub type TypedKeySet = CryptoTypedSet<PublicKey>;
pub type TypedSecretSet = CryptoTypedSet<SecretKey>;
pub type TypedKeyPairSet = CryptoTypedSet<KeyPair>;
pub type TypedSignatureSet = CryptoTypedSet<Signature>;
pub type TypedSharedSecretSet = CryptoTypedSet<SharedSecret>;
pub type TypedKeyGroup = CryptoTypedGroup<PublicKey>;
pub type TypedSecretGroup = CryptoTypedGroup<SecretKey>;
pub type TypedKeyPairGroup = CryptoTypedGroup<KeyPair>;
pub type TypedSignatureGroup = CryptoTypedGroup<Signature>;
pub type TypedSharedSecretGroup = CryptoTypedGroup<SharedSecret>;

View File

@ -112,7 +112,7 @@ impl DiscoveryContext {
&self,
protocol_type: ProtocolType,
address_type: AddressType,
ignore_node_ids: Option<TypedKeySet>,
ignore_node_ids: Option<TypedKeyGroup>,
) -> Option<(SocketAddress, NodeRef)> {
let node_count = {
let config = self.routing_table.network_manager().config();

View File

@ -34,7 +34,7 @@ pub async fn test_signed_node_info() {
node_info.clone(),
)
.unwrap();
let tks: TypedKeySet = TypedKey::new(ck, keypair.key).into();
let tks: TypedKeyGroup = TypedKey::new(ck, keypair.key).into();
let oldtkslen = tks.len();
let sdni = SignedDirectNodeInfo::new(
node_info.clone(),
@ -47,7 +47,7 @@ pub async fn test_signed_node_info() {
// Test incorrect validation
let keypair1 = vcrypto.generate_keypair();
let tks1: TypedKeySet = TypedKey::new(ck, keypair1.key).into();
let tks1: TypedKeyGroup = TypedKey::new(ck, keypair1.key).into();
let sdni = SignedDirectNodeInfo::new(
node_info.clone(),
sni.timestamp(),
@ -57,7 +57,8 @@ pub async fn test_signed_node_info() {
// Test unsupported cryptosystem validation
let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]);
let mut tksfake: TypedKeySet = TypedKey::new(fake_crypto_kind, PublicKey::default()).into();
let mut tksfake: TypedKeyGroup =
TypedKey::new(fake_crypto_kind, PublicKey::default()).into();
let mut sigsfake = sni.signatures().to_vec();
sigsfake.push(TypedSignature::new(fake_crypto_kind, Signature::default()));
tksfake.add(TypedKey::new(ck, keypair.key));
@ -82,7 +83,7 @@ pub async fn test_signed_node_info() {
// Test correct validation
let keypair2 = vcrypto.generate_keypair();
let tks2: TypedKeySet = TypedKey::new(ck, keypair2.key).into();
let tks2: TypedKeyGroup = TypedKey::new(ck, keypair2.key).into();
let oldtks2len = tks2.len();
let sni2 = SignedRelayedNodeInfo::make_signatures(
@ -107,7 +108,7 @@ pub async fn test_signed_node_info() {
// Test incorrect validation
let keypair3 = vcrypto.generate_keypair();
let tks3: TypedKeySet = TypedKey::new(ck, keypair3.key).into();
let tks3: TypedKeyGroup = TypedKey::new(ck, keypair3.key).into();
let srni = SignedRelayedNodeInfo::new(
node_info2.clone(),
@ -120,7 +121,7 @@ pub async fn test_signed_node_info() {
// Test unsupported cryptosystem validation
let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]);
let mut tksfake3: TypedKeySet =
let mut tksfake3: TypedKeyGroup =
TypedKey::new(fake_crypto_kind, PublicKey::default()).into();
let mut sigsfake3 = sni2.signatures().to_vec();
sigsfake3.push(TypedSignature::new(fake_crypto_kind, Signature::default()));

View File

@ -71,9 +71,9 @@ pub struct BucketEntryLocalNetwork {
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct BucketEntryInner {
/// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field
validated_node_ids: TypedKeySet,
validated_node_ids: TypedKeyGroup,
/// The node ids claimed by the remote node that use cryptography versions we do not support
unsupported_node_ids: TypedKeySet,
unsupported_node_ids: TypedKeyGroup,
/// The set of envelope versions supported by the node inclusive of the requirements of any relay the node may be using
envelope_support: Vec<u8>,
/// If this node has updated it's SignedNodeInfo since our network
@ -123,7 +123,7 @@ impl BucketEntryInner {
}
/// Get all node ids
pub fn node_ids(&self) -> TypedKeySet {
pub fn node_ids(&self) -> TypedKeyGroup {
let mut node_ids = self.validated_node_ids.clone();
node_ids.add_all(&self.unsupported_node_ids);
node_ids
@ -786,8 +786,8 @@ impl BucketEntry {
let now = get_aligned_timestamp();
let inner = BucketEntryInner {
validated_node_ids: TypedKeySet::from(first_node_id),
unsupported_node_ids: TypedKeySet::new(),
validated_node_ids: TypedKeyGroup::from(first_node_id),
unsupported_node_ids: TypedKeyGroup::new(),
envelope_support: Vec::new(),
updated_since_last_network_change: false,
last_connections: BTreeMap::new(),

View File

@ -82,9 +82,9 @@ pub struct RoutingTableUnlockedInner {
network_manager: NetworkManager,
/// The current node's public DHT keys
node_id: TypedKeySet,
node_id: TypedKeyGroup,
/// The current node's public DHT secrets
node_id_secret: TypedSecretSet,
node_id_secret: TypedSecretGroup,
/// Buckets to kick on our next kick task
kick_queue: Mutex<BTreeSet<BucketIndex>>,
/// Background process for computing statistics
@ -131,7 +131,7 @@ impl RoutingTableUnlockedInner {
self.node_id_secret.get(kind).unwrap().value
}
pub fn node_ids(&self) -> TypedKeySet {
pub fn node_ids(&self) -> TypedKeyGroup {
self.node_id.clone()
}
@ -648,7 +648,7 @@ impl RoutingTable {
inner.get_all_nodes(self.clone(), cur_ts)
}
fn queue_bucket_kicks(&self, node_ids: TypedKeySet) {
fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) {
for node_id in node_ids.iter() {
// Skip node ids we didn't add to buckets
if !VALID_CRYPTO_KINDS.contains(&node_id.kind) {

View File

@ -106,7 +106,7 @@ pub trait NodeRefBase: Sized {
fn routing_table(&self) -> RoutingTable {
self.common().routing_table.clone()
}
fn node_ids(&self) -> TypedKeySet {
fn node_ids(&self) -> TypedKeyGroup {
self.operate(|_rti, e| e.node_ids())
}
fn best_node_id(&self) -> TypedKey {

View File

@ -59,8 +59,8 @@ impl RouteSetSpecDetail {
pub fn get_route_by_key_mut(&mut self, key: &PublicKey) -> Option<&mut RouteSpecDetail> {
self.route_set.get_mut(key)
}
pub fn get_route_set_keys(&self) -> TypedKeySet {
let mut tks = TypedKeySet::new();
pub fn get_route_set_keys(&self) -> TypedKeyGroup {
let mut tks = TypedKeyGroup::new();
for (k, v) in &self.route_set {
tks.add(TypedKey::new(v.crypto_kind, *k));
}

View File

@ -117,14 +117,14 @@ impl RouteSpecStoreCache {
}
/// calculate how many times a node with a particular node id set has been used anywhere in the path of our allocated routes
pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize {
pub fn get_used_node_count(&self, node_ids: &TypedKeyGroup) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self.used_nodes.get(&k.value).cloned().unwrap_or_default()
})
}
/// calculate how many times a node with a particular node id set has been used at the end of the path of our allocated routes
pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize {
pub fn get_used_end_node_count(&self, node_ids: &TypedKeyGroup) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self
.used_end_nodes

View File

@ -658,7 +658,7 @@ impl RoutingTableInner {
fn create_node_ref<F>(
&mut self,
outer_self: RoutingTable,
node_ids: &TypedKeySet,
node_ids: &TypedKeyGroup,
update_func: F,
) -> EyreResult<NodeRef>
where
@ -873,7 +873,7 @@ impl RoutingTableInner {
descriptor: ConnectionDescriptor,
timestamp: Timestamp,
) -> EyreResult<NodeRef> {
let nr = self.create_node_ref(outer_self, &TypedKeySet::from(node_id), |_rti, e| {
let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| {
// this node is live because it literally just connected to us
e.touch_last_seen(timestamp);
})?;

View File

@ -7,7 +7,7 @@ pub const BOOTSTRAP_TXT_VERSION_0: u8 = 0;
#[derive(Clone, Debug)]
pub struct BootstrapRecord {
node_ids: TypedKeySet,
node_ids: TypedKeyGroup,
envelope_support: Vec<u8>,
dial_info_details: Vec<DialInfoDetail>,
}
@ -63,7 +63,7 @@ impl RoutingTable {
envelope_support.sort();
// Node Id
let mut node_ids = TypedKeySet::new();
let mut node_ids = TypedKeyGroup::new();
for node_id_str in records[2].split(",") {
let node_id_str = node_id_str.trim();
let node_id = match TypedKey::from_str(&node_id_str) {

View File

@ -83,7 +83,7 @@ pub async fn test_routingtable_buckets_round_trip() {
}
pub async fn test_round_trip_peerinfo() {
let mut tks = TypedKeySet::new();
let mut tks = TypedKeyGroup::new();
tks.add(TypedKey::new(
CRYPTO_KIND_VLD0,
CryptoKey::new([

View File

@ -5,12 +5,12 @@ use super::*;
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct PeerInfo {
node_ids: TypedKeySet,
node_ids: TypedKeyGroup,
signed_node_info: SignedNodeInfo,
}
impl PeerInfo {
pub fn new(node_ids: TypedKeySet, signed_node_info: SignedNodeInfo) -> Self {
pub fn new(node_ids: TypedKeyGroup, signed_node_info: SignedNodeInfo) -> Self {
assert!(node_ids.len() > 0 && node_ids.len() <= MAX_CRYPTO_KINDS);
Self {
node_ids,
@ -27,13 +27,13 @@ impl PeerInfo {
Ok(())
}
pub fn node_ids(&self) -> &TypedKeySet {
pub fn node_ids(&self) -> &TypedKeyGroup {
&self.node_ids
}
pub fn signed_node_info(&self) -> &SignedNodeInfo {
&self.signed_node_info
}
pub fn destructure(self) -> (TypedKeySet, SignedNodeInfo) {
pub fn destructure(self) -> (TypedKeyGroup, SignedNodeInfo) {
(self.node_ids, self.signed_node_info)
}

View File

@ -22,7 +22,11 @@ impl SignedDirectNodeInfo {
}
}
pub fn validate(&self, node_ids: &TypedKeySet, crypto: Crypto) -> VeilidAPIResult<TypedKeySet> {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
let node_info_bytes = Self::make_signature_bytes(&self.node_info, self.timestamp)?;
// Verify the signatures that we can

View File

@ -10,7 +10,11 @@ pub enum SignedNodeInfo {
}
impl SignedNodeInfo {
pub fn validate(&self, node_ids: &TypedKeySet, crypto: Crypto) -> VeilidAPIResult<TypedKeySet> {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
match self {
SignedNodeInfo::Direct(d) => d.validate(node_ids, crypto),
SignedNodeInfo::Relayed(r) => r.validate(node_ids, crypto),
@ -36,9 +40,9 @@ impl SignedNodeInfo {
SignedNodeInfo::Relayed(r) => &r.node_info(),
}
}
pub fn relay_ids(&self) -> TypedKeySet {
pub fn relay_ids(&self) -> TypedKeyGroup {
match self {
SignedNodeInfo::Direct(_) => TypedKeySet::new(),
SignedNodeInfo::Direct(_) => TypedKeyGroup::new(),
SignedNodeInfo::Relayed(r) => r.relay_ids().clone(),
}
}

View File

@ -7,7 +7,7 @@ use super::*;
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct SignedRelayedNodeInfo {
node_info: NodeInfo,
relay_ids: TypedKeySet,
relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo,
timestamp: Timestamp,
signatures: Vec<TypedSignature>,
@ -19,7 +19,7 @@ impl SignedRelayedNodeInfo {
/// All signatures are stored however, as this can be passed to other nodes that may be able to validate those signatures.
pub fn new(
node_info: NodeInfo,
relay_ids: TypedKeySet,
relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo,
timestamp: Timestamp,
signatures: Vec<TypedSignature>,
@ -33,7 +33,11 @@ impl SignedRelayedNodeInfo {
}
}
pub fn validate(&self, node_ids: &TypedKeySet, crypto: Crypto) -> VeilidAPIResult<TypedKeySet> {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
// Ensure the relay info for the node has a superset of the crypto kinds of the node it is relaying
if common_crypto_kinds(
self.node_info.crypto_support(),
@ -64,7 +68,7 @@ impl SignedRelayedNodeInfo {
crypto: Crypto,
typed_key_pairs: Vec<TypedKeyPair>,
node_info: NodeInfo,
relay_ids: TypedKeySet,
relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo,
) -> VeilidAPIResult<Self> {
let timestamp = get_aligned_timestamp();
@ -128,7 +132,7 @@ impl SignedRelayedNodeInfo {
pub fn timestamp(&self) -> Timestamp {
self.timestamp
}
pub fn relay_ids(&self) -> &TypedKeySet {
pub fn relay_ids(&self) -> &TypedKeyGroup {
&self.relay_ids
}
pub fn relay_info(&self) -> &SignedDirectNodeInfo {

View File

@ -36,7 +36,7 @@ pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result<Peer
.reborrow()
.get_signed_node_info()
.map_err(RPCError::protocol)?;
let mut node_ids = TypedKeySet::with_capacity(nids_reader.len() as usize);
let mut node_ids = TypedKeyGroup::with_capacity(nids_reader.len() as usize);
for nid_reader in nids_reader.iter() {
node_ids.add(decode_typed_key(&nid_reader)?);
}

View File

@ -69,7 +69,7 @@ pub fn decode_signed_relayed_node_info(
if rid_count > MAX_CRYPTO_KINDS {
return Err(RPCError::protocol("too many relay ids"));
}
let mut relay_ids = TypedKeySet::with_capacity(rid_count);
let mut relay_ids = TypedKeyGroup::with_capacity(rid_count);
for rid_reader in rids_reader {
let relay_id = decode_typed_key(&rid_reader)?;
relay_ids.add(relay_id);

View File

@ -5,7 +5,7 @@ where
R: Unpin,
{
closest_nodes: Vec<NodeRef>,
called_nodes: TypedKeySet,
called_nodes: HashSet<TypedKey>,
result: Option<Result<R, RPCError>>,
}
@ -62,7 +62,7 @@ where
) -> Arc<Self> {
let context = Mutex::new(FanoutContext {
closest_nodes: Vec::with_capacity(node_count),
called_nodes: TypedKeySet::new(),
called_nodes: HashSet::new(),
result: None,
});
@ -125,7 +125,7 @@ where
if !ctx.called_nodes.contains(&key) {
// New fanout call candidate found
next_node = Some(cn.clone());
ctx.called_nodes.add(key);
ctx.called_nodes.insert(key);
break;
}
}

View File

@ -82,12 +82,14 @@ impl StorageManager {
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value {
log_stor!(debug "Got value back: len={}", value.value_data().data().len());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it
// Move to the next node
log_stor!(debug "Got value with no descriptor");
return Ok(None);
};
@ -99,6 +101,7 @@ impl StorageManager {
) {
// Validation failed, ignore this value
// Move to the next node
log_stor!(debug "Schema validation failed on subkey {}", subkey);
return Ok(None);
}
@ -118,15 +121,22 @@ impl StorageManager {
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
// One node has show us this value so far
// One node has shown us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
}
}
else {
// If we have no prior value, keep it
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
}
}
// Return peers if we have some
log_stor!(debug "Fanout call returned peers {}", gva.answer.peers.len());
Ok(Some(gva.answer.peers))
}
};

View File

@ -197,8 +197,8 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.reverse_connection_receipt_time_ms" => Ok(Box::new(5_000u32)),
"network.hole_punch_receipt_time_ms" => Ok(Box::new(5_000u32)),
"network.network_key_password" => Ok(Box::new(Option::<String>::None)),
"network.routing_table.node_id" => Ok(Box::new(TypedKeySet::new())),
"network.routing_table.node_id_secret" => Ok(Box::new(TypedSecretSet::new())),
"network.routing_table.node_id" => Ok(Box::new(TypedKeyGroup::new())),
"network.routing_table.node_id_secret" => Ok(Box::new(TypedSecretGroup::new())),
"network.routing_table.bootstrap" => Ok(Box::new(Vec::<String>::new())),
"network.routing_table.limit_over_attached" => Ok(Box::new(64u32)),
"network.routing_table.limit_fully_attached" => Ok(Box::new(32u32)),

View File

@ -235,6 +235,14 @@ fn get_public_key(text: &str) -> Option<PublicKey> {
PublicKey::from_str(text).ok()
}
fn get_crypto_system_version(crypto: Crypto) -> impl FnOnce(&str) -> Option<CryptoSystemVersion> {
move |text| {
let kindstr = get_string(text)?;
let kind = CryptoKind::from_str(&kindstr).ok()?;
crypto.get(kind)
}
}
fn get_dht_key(
routing_table: RoutingTable,
) -> impl FnOnce(&str) -> Option<(TypedKey, Option<SafetySelection>)> {
@ -381,10 +389,10 @@ fn get_debug_argument_at<T, G: FnOnce(&str) -> Option<T>>(
Ok(val)
}
fn print_data_truncated(data: Vec<u8>) -> String {
fn print_data_truncated(data: &[u8]) -> String {
// check is message body is ascii printable
let mut printable = true;
for c in &data {
for c in data {
if *c < 32 || *c > 126 {
printable = false;
}
@ -445,6 +453,24 @@ impl VeilidAPI {
Ok(routing_table.debug_info_txtrecord().await)
}
async fn debug_keypair(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let crypto = self.crypto()?;
let vcrypto = get_debug_argument_at(
&args,
0,
"debug_keypair",
"kind",
get_crypto_system_version(crypto.clone()),
)
.unwrap_or_else(|_| crypto.best());
// Generate a keypair
let out = TypedKeyPair::new(vcrypto.kind(), vcrypto.generate_keypair()).to_string();
Ok(out)
}
async fn debug_entries(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
@ -996,8 +1022,18 @@ impl VeilidAPI {
get_dht_key(routing_table),
)?;
let subkey = get_debug_argument_at(&args, 2, "debug_record_get", "subkey", get_number)?;
let force_refresh =
get_debug_argument_at(&args, 3, "debug_record_get", "force_refresh", get_string);
let force_refresh = if args.len() >= 4 {
Some(get_debug_argument_at(
&args,
3,
"debug_record_get",
"force_refresh",
get_string,
)?)
} else {
None
};
let force_refresh = if let Some(force_refresh) = force_refresh {
if &force_refresh == "force" {
true
@ -1021,7 +1057,7 @@ impl VeilidAPI {
};
// Do a record get
let record = match rc.open_dht_record(key, None).await {
let _record = match rc.open_dht_record(key, None).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
@ -1029,7 +1065,18 @@ impl VeilidAPI {
.get_dht_value(key, subkey as ValueSubkey, force_refresh)
.await
{
Err(e) => return Ok(format!("Can't get DHT value: {}", e)),
Err(e) => {
match rc.close_dht_record(key).await {
Err(e) => {
return Ok(format!(
"Can't get DHT value and can't close DHT record: {}",
e
))
}
Ok(v) => v,
};
return Ok(format!("Can't get DHT value: {}", e));
}
Ok(v) => v,
};
let out = if let Some(value) = value {
@ -1044,6 +1091,57 @@ impl VeilidAPI {
return Ok(out);
}
async fn debug_record_delete(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let key = get_debug_argument_at(&args, 1, "debug_record_delete", "key", get_typed_key)?;
// Do a record delete
let rc = self.routing_context();
match rc.delete_dht_record(key).await {
Err(e) => return Ok(format!("Can't delete DHT record: {}", e)),
Ok(v) => v,
};
Ok(format!("DHT record deleted"))
}
async fn debug_record_info(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let netman = self.network_manager()?;
let routing_table = netman.routing_table();
let (key, ss) = get_debug_argument_at(
&args,
1,
"debug_record_info",
"key",
get_dht_key(routing_table),
)?;
// Get routing context with optional privacy
let rc = self.routing_context();
let rc = if let Some(ss) = ss {
let rcp = match rc.with_custom_privacy(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
};
rcp
} else {
rc
};
// Do a record get
let record = match rc.open_dht_record(key, None).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
let out = format!("{:#?}", record);
match rc.close_dht_record(key).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)),
Ok(v) => v,
};
return Ok(out);
}
async fn debug_record(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
@ -1055,57 +1153,63 @@ impl VeilidAPI {
self.debug_record_purge(args).await
} else if command == "get" {
self.debug_record_get(args).await
} else if command == "delete" {
self.debug_record_delete(args).await
} else if command == "info" {
self.debug_record_info(args).await
} else {
Ok(">>> Unknown command\n".to_owned())
}
}
pub async fn debug_help(&self, _args: String) -> VeilidAPIResult<String> {
Ok(r#">>> Debug commands:
help
buckets [dead|reliable]
dialinfo
entries [dead|reliable]
entry <node>
nodeinfo
config [configkey [new value]]
txtrecord
purge <buckets|connections|routes>
attach
detach
restart network
ping <destination>
contact <node>[<modifiers>]
route allocate [ord|*ord] [rel] [<count>] [in|out]
release <route>
publish <route> [full]
unpublish <route>
print <route>
list
import <blob>
test <route>
record list <local|remote>
purge <local|remote> [bytes]
get <dhtkey> <subkey> [force]
<configkey> is: dot path like network.protocol.udp.enabled
<destination> is:
* direct: <node>[+<safety>][<modifiers>]
* relay: <relay>@<target>[+<safety>][<modifiers>]
* private: #<id>[+<safety>]
<safety> is:
* unsafe: -[ord|*ord]
* safe: [route][,ord|*ord][,rel][,<count>]
<modifiers> is: [/<protocoltype>][/<addresstype>][/<routingdomain>]
<protocoltype> is: udp|tcp|ws|wss
<addresstype> is: ipv4|ipv6
<routingdomain> is: public|local
<dhtkey> is: <key>[+<safety>]
<subkey> is: a number: 2
<subkeys> is:
* a number: 2
* a comma-separated inclusive range list: 1..=3,5..=8
"#
Ok(r#"buckets [dead|reliable]
dialinfo
entries [dead|reliable]
entry <node>
nodeinfo
config [configkey [new value]]
txtrecord
keypair
purge <buckets|connections|routes>
attach
detach
restart network
contact <node>[<modifiers>]
ping <destination>
route allocate [ord|*ord] [rel] [<count>] [in|out]
release <route>
publish <route> [full]
unpublish <route>
print <route>
list
import <blob>
test <route>
record list <local|remote>
purge <local|remote> [bytes]
get <key>[+<safety>] <subkey> [force]
delete <key>
info <key>
--------------------------------------------------------------------
<key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4
* also <node>, <relay>, <target>, <route>
<configkey> is: dot path like network.protocol.udp.enabled
<destination> is:
* direct: <node>[+<safety>][<modifiers>]
* relay: <relay>@<target>[+<safety>][<modifiers>]
* private: #<id>[+<safety>]
<safety> is:
* unsafe: -[ord|*ord]
* safe: [route][,ord|*ord][,rel][,<count>]
<modifiers> is: [/<protocoltype>][/<addresstype>][/<routingdomain>]
<protocoltype> is: udp|tcp|ws|wss
<addresstype> is: ipv4|ipv6
<routingdomain> is: public|local
<subkey> is: a number: 2
<subkeys> is:
* a number: 2
* a comma-separated inclusive range list: 1..=3,5..=8
"#
.to_owned())
}
@ -1127,6 +1231,8 @@ impl VeilidAPI {
self.debug_dialinfo(rest).await
} else if arg == "txtrecord" {
self.debug_txtrecord(rest).await
} else if arg == "keypair" {
self.debug_keypair(rest).await
} else if arg == "entries" {
self.debug_entries(rest).await
} else if arg == "entry" {
@ -1152,7 +1258,7 @@ impl VeilidAPI {
} else if arg == "record" {
self.debug_record(rest).await
} else {
Err(VeilidAPIError::generic("Unknown debug command"))
Err(VeilidAPIError::generic("Unknown server debug command"))
}
};
res

View File

@ -200,7 +200,7 @@ pub enum ResponseOp {
VerifySignatures {
#[serde(flatten)]
#[schemars(with = "ApiResult<Vec<String>>")]
result: ApiResultWithVecString<TypedKeySet>,
result: ApiResultWithVecString<TypedKeyGroup>,
},
GenerateSignatures {
#[serde(flatten)]

View File

@ -112,8 +112,8 @@ pub fn fix_veilidconfiginner() -> VeilidConfigInner {
hole_punch_receipt_time_ms: 9000,
network_key_password: None,
routing_table: VeilidConfigRoutingTable {
node_id: TypedKeySet::new(),
node_id_secret: TypedSecretSet::new(),
node_id: TypedKeyGroup::new(),
node_id_secret: TypedSecretGroup::new(),
bootstrap: vec!["boots".to_string()],
limit_over_attached: 1,
limit_fully_attached: 2,

View File

@ -348,9 +348,9 @@ pub struct VeilidConfigRPC {
)]
pub struct VeilidConfigRoutingTable {
#[schemars(with = "Vec<String>")]
pub node_id: TypedKeySet,
pub node_id: TypedKeyGroup,
#[schemars(with = "Vec<String>")]
pub node_id_secret: TypedSecretSet,
pub node_id_secret: TypedSecretGroup,
pub bootstrap: Vec<String>,
pub limit_over_attached: u32,
pub limit_fully_attached: u32,
@ -785,7 +785,7 @@ impl VeilidConfig {
let mut safe_cfg = self.inner.read().clone();
// Remove secrets
safe_cfg.network.routing_table.node_id_secret = TypedSecretSet::new();
safe_cfg.network.routing_table.node_id_secret = TypedSecretGroup::new();
safe_cfg.protected_store.device_encryption_key_password = "".to_owned();
safe_cfg.protected_store.new_device_encryption_key_password = None;
@ -1075,8 +1075,8 @@ impl VeilidConfig {
crypto: Crypto,
table_store: TableStore,
) -> VeilidAPIResult<()> {
let mut out_node_id = TypedKeySet::new();
let mut out_node_id_secret = TypedSecretSet::new();
let mut out_node_id = TypedKeyGroup::new();
let mut out_node_id_secret = TypedSecretGroup::new();
for ck in VALID_CRYPTO_KINDS {
let vcrypto = crypto

View File

@ -4,7 +4,7 @@ use clap::{Arg, ArgMatches, Command};
use std::ffi::OsStr;
use std::path::Path;
use std::str::FromStr;
use veilid_core::{TypedKeySet, TypedSecretSet};
use veilid_core::{TypedKeyGroup, TypedSecretGroup};
fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = Command::new("veilid-server")
@ -277,12 +277,12 @@ pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> {
// Split or get secret
let tks =
TypedKeySet::from_str(v).wrap_err("failed to decode node id set from command line")?;
TypedKeyGroup::from_str(v).wrap_err("failed to decode node id set from command line")?;
let buffer = rpassword::prompt_password("Enter secret key set (will not echo): ")
.wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string();
let tss = TypedSecretSet::from_str(&buffer).wrap_err("failed to decode secret set")?;
let tss = TypedSecretGroup::from_str(&buffer).wrap_err("failed to decode secret set")?;
settingsrw.core.network.routing_table.node_id = Some(tks);
settingsrw.core.network.routing_table.node_id_secret = Some(tss);

View File

@ -38,8 +38,8 @@ fn main() -> EyreResult<()> {
if matches.occurrences_of("generate-key-pair") != 0 {
if let Some(ckstr) = matches.get_one::<String>("generate-key-pair") {
if ckstr == "" {
let mut tks = veilid_core::TypedKeySet::new();
let mut tss = veilid_core::TypedSecretSet::new();
let mut tks = veilid_core::TypedKeyGroup::new();
let mut tss = veilid_core::TypedSecretGroup::new();
for ck in veilid_core::VALID_CRYPTO_KINDS {
let tkp = veilid_core::Crypto::generate_keypair(ck)
.wrap_err("invalid crypto kind")?;

View File

@ -562,8 +562,8 @@ pub struct Dht {
#[derive(Debug, Deserialize, Serialize)]
pub struct RoutingTable {
pub node_id: Option<veilid_core::TypedKeySet>,
pub node_id_secret: Option<veilid_core::TypedSecretSet>,
pub node_id: Option<veilid_core::TypedKeyGroup>,
pub node_id_secret: Option<veilid_core::TypedSecretGroup>,
pub bootstrap: Vec<String>,
pub limit_over_attached: u32,
pub limit_fully_attached: u32,

View File

@ -112,7 +112,7 @@ pub fn debug_duration(dur: u64) -> String {
let msecs = dur / MSEC;
format!(
"{}{}{}{}.{:03}",
"{}{}{}{}.{:03}s",
if days != 0 {
format!("{}d", days)
} else {
@ -128,11 +128,7 @@ pub fn debug_duration(dur: u64) -> String {
} else {
"".to_owned()
},
if secs != 0 {
format!("{}s", secs)
} else {
"".to_owned()
},
secs,
msecs
)
}