Merge branch 'dht-testing' into 'main'

Dht testing

See merge request veilid/veilid!39
This commit is contained in:
John Smith 2023-07-02 04:22:57 +00:00
commit 82e87042a4
77 changed files with 1810 additions and 668 deletions

14
Cargo.lock generated
View File

@ -2767,6 +2767,12 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "indent"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9f1a0777d972970f204fdf8ef319f1f4f8459131636d7e3c96c5d59570d0fa6"
[[package]]
name = "indenter"
version = "0.3.3"
@ -5226,6 +5232,12 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shell-words"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde"
[[package]]
name = "shlex"
version = "0.1.1"
@ -6400,6 +6412,7 @@ dependencies = [
"flume",
"futures",
"hex",
"indent",
"json",
"log",
"parking_lot 0.12.1",
@ -6486,6 +6499,7 @@ dependencies = [
"serde-big-array",
"serde_json",
"serial_test",
"shell-words",
"simplelog 0.12.1",
"socket2 0.5.3",
"static_assertions",

View File

@ -46,6 +46,7 @@ json = "^0"
stop-token = { version = "^0", default-features = false }
flume = { version = "^0", features = ["async"] }
data-encoding = { version = "^2" }
indent = { version = "0.1.1" }
[dev-dependencies]
serial_test = "^0"

View File

@ -358,21 +358,6 @@ impl ClientApiConnection {
Ok(())
}
pub async fn server_appcall_reply(&self, id: u64, msg: Vec<u8>) -> Result<(), String> {
trace!("ClientApiConnection::appcall_reply");
let mut req = json::JsonValue::new_object();
req["op"] = "AppCallReply".into();
req["call_id"] = id.to_string().into();
req["message"] = data_encoding::BASE64URL_NOPAD.encode(&msg).into();
let Some(resp) = self.perform_request(req).await else {
return Err("Cancelled".to_owned());
};
if resp.has_key("error") {
return Err(resp["error"].to_string());
}
Ok(())
}
// Start Client API connection
pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::connect");

View File

@ -2,6 +2,7 @@ use crate::client_api_connection::*;
use crate::settings::Settings;
use crate::tools::*;
use crate::ui::*;
use indent::indent_all_by;
use std::net::SocketAddr;
use std::time::SystemTime;
use veilid_tools::*;
@ -106,33 +107,42 @@ impl CommandProcessor {
pub fn cmd_help(&self, _rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_help");
self.ui_sender().add_node_event(
Level::Info,
r#"Commands:
exit/quit exit the client
disconnect disconnect the client from the Veilid node
shutdown shut the server down
attach attach the server to the Veilid network
detach detach the server from the Veilid network
debug [command] send a debugging command to the Veilid server
change_log_level <layer> <level> change the log level for a tracing layer
layers include:
all, terminal, system, api, file, otlp
levels include:
error, warn, info, debug, trace
reply <call id> <message> reply to an AppCall not handled directly by the server
<call id> must be exact call id reported in VeilidUpdate
<message> can be a string (left trimmed) or
it can start with a '#' followed by a string of undelimited hex bytes
enable [flag] set a flag
disable [flag] unset a flag
valid flags in include:
app_messages
"#
.to_owned(),
);
let capi = self.capi();
let ui = self.ui_sender();
ui.send_callback(callback);
spawn_detached_local(async move {
let out = match capi.server_debug("help".to_owned()).await {
Err(e) => {
error!("Server command 'debug help' failed: {}", e);
ui.send_callback(callback);
return;
}
Ok(v) => v,
};
ui.add_node_event(
Level::Info,
format!(
r#"Client Commands:
exit/quit exit the client
disconnect disconnect the client from the Veilid node
shutdown shut the server down
change_log_level <layer> <level> change the log level for a tracing layer
layers include:
all, terminal, system, api, file, otlp
levels include:
error, warn, info, debug, trace
enable [flag] set a flag
disable [flag] unset a flag
valid flags in include:
app_messages
Server Debug Commands:
{}
"#,
indent_all_by(4, out)
),
);
ui.send_callback(callback);
});
Ok(())
}
@ -157,32 +167,6 @@ disable [flag] unset a flag
Ok(())
}
pub fn cmd_attach(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_attach");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
if let Err(e) = capi.server_attach().await {
error!("Server command 'attach' failed: {}", e);
}
ui.send_callback(callback);
});
Ok(())
}
pub fn cmd_detach(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_detach");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
if let Err(e) = capi.server_detach().await {
error!("Server command 'detach' failed: {}", e);
}
ui.send_callback(callback);
});
Ok(())
}
pub fn cmd_disconnect(&self, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_disconnect");
let capi = self.capi();
@ -194,12 +178,12 @@ disable [flag] unset a flag
Ok(())
}
pub fn cmd_debug(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
pub fn cmd_debug(&self, command_line: String, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_debug");
let capi = self.capi();
let ui = self.ui_sender();
spawn_detached_local(async move {
match capi.server_debug(rest.unwrap_or_default()).await {
match capi.server_debug(command_line).await {
Ok(output) => {
ui.add_node_event(Level::Info, output);
ui.send_callback(callback);
@ -248,69 +232,6 @@ disable [flag] unset a flag
Ok(())
}
pub fn cmd_reply(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_reply");
let capi = self.capi();
let ui = self.ui_sender();
let some_last_id = self.inner_mut().last_call_id.take();
spawn_detached_local(async move {
let (first, second) = Self::word_split(&rest.clone().unwrap_or_default());
let (id, msg) = if let Some(second) = second {
let id = match u64::from_str(&first) {
Err(e) => {
ui.add_node_event(Level::Error, format!("invalid appcall id: {}", e));
ui.send_callback(callback);
return;
}
Ok(v) => v,
};
(id, second)
} else {
let id = match some_last_id {
None => {
ui.add_node_event(Level::Error, "must specify last call id".to_owned());
ui.send_callback(callback);
return;
}
Some(v) => v,
};
(id, rest.unwrap_or_default())
};
let msg = if msg[0..1] == "#".to_owned() {
match hex::decode(msg[1..].as_bytes().to_vec()) {
Err(e) => {
ui.add_node_event(Level::Error, format!("invalid hex message: {}", e));
ui.send_callback(callback);
return;
}
Ok(v) => v,
}
} else {
msg[1..].as_bytes().to_vec()
};
let msglen = msg.len();
match capi.server_appcall_reply(id, msg).await {
Ok(()) => {
ui.add_node_event(
Level::Info,
format!("reply sent to {} : {} bytes", id, msglen),
);
ui.send_callback(callback);
return;
}
Err(e) => {
ui.display_string_dialog(
"Server command 'appcall_reply' failed",
e.to_string(),
callback,
);
}
}
});
Ok(())
}
pub fn cmd_enable(&self, rest: Option<String>, callback: UICallback) -> Result<(), String> {
trace!("CommandProcessor::cmd_enable");
@ -366,18 +287,10 @@ disable [flag] unset a flag
"quit" => self.cmd_exit(callback),
"disconnect" => self.cmd_disconnect(callback),
"shutdown" => self.cmd_shutdown(callback),
"attach" => self.cmd_attach(callback),
"detach" => self.cmd_detach(callback),
"debug" => self.cmd_debug(rest, callback),
"change_log_level" => self.cmd_change_log_level(rest, callback),
"reply" => self.cmd_reply(rest, callback),
"enable" => self.cmd_enable(rest, callback),
"disable" => self.cmd_disable(rest, callback),
_ => {
let ui = self.ui_sender();
ui.send_callback(callback);
Err(format!("Invalid command: {}", cmd))
}
_ => self.cmd_debug(command_line.to_owned(), callback),
}
}

View File

@ -10,19 +10,25 @@ license = "LGPL-2.0-or-later OR MPL-2.0 OR (MIT AND BSD-3-Clause)"
crate-type = ["cdylib", "staticlib", "rlib"]
[features]
# Common features
default = ["enable-crypto-vld0"]
crypto-test = ["enable-crypto-vld0", "enable-crypto-none"]
crypto-test-none = ["enable-crypto-none"]
enable-crypto-vld0 = []
enable-crypto-none = []
verbose-tracing = []
rt-async-std = ["async-std", "async-std-resolver", "async_executors/async_std", "rtnetlink/smol_socket", "veilid-tools/rt-async-std"]
rt-tokio = ["tokio", "tokio-util", "tokio-stream", "trust-dns-resolver/tokio-runtime", "async_executors/tokio_tp", "async_executors/tokio_io", "async_executors/tokio_timer", "rtnetlink/tokio_socket", "veilid-tools/rt-tokio"]
rt-wasm-bindgen = ["veilid-tools/rt-wasm-bindgen", "async_executors/bindgen"]
# Crypto support features
enable-crypto-vld0 = []
enable-crypto-none = []
# Debugging and testing features
verbose-tracing = []
tracking = []
debug-dht = []
crypto-test = ["enable-crypto-vld0", "enable-crypto-none"]
crypto-test-none = ["enable-crypto-none"]
veilid_core_android_tests = ["dep:paranoid-android"]
veilid_core_ios_tests = ["dep:tracing-oslog"]
tracking = []
network-result-extra = ["veilid-tools/network-result-extra"]
[dependencies]
@ -56,6 +62,7 @@ enumset = { version= "^1", features = ["serde"] }
backtrace = { version = "^0" }
stop-token = { version = "^0", default-features = false }
num-traits = "0.2.15"
shell-words = "1.1.0"
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
x25519-dalek = { version = "^1", default_features = false, features = ["u64_backend"] }

View File

@ -180,14 +180,36 @@ impl AttachmentManager {
}
}
fn update_attaching_detaching_state(&self, state: AttachmentState) {
let update_callback = {
let mut inner = self.inner.lock();
inner.last_attachment_state = state;
if state == AttachmentState::Attaching {
inner.attach_ts = Some(get_aligned_timestamp());
} else if state == AttachmentState::Detached {
inner.attach_ts = None;
} else if state == AttachmentState::Detaching {
// ok
} else {
unreachable!("don't use this for attached states, use update_attachment()");
}
inner.update_callback.clone()
};
if let Some(update_callback) = update_callback {
update_callback(VeilidUpdate::Attachment(VeilidStateAttachment {
state,
public_internet_ready: false,
local_network_ready: false,
}))
}
}
#[instrument(level = "debug", skip(self))]
async fn attachment_maintainer(self) {
{
let mut inner = self.inner.lock();
inner.last_attachment_state = AttachmentState::Attaching;
inner.attach_ts = Some(get_aligned_timestamp());
debug!("attachment starting");
}
debug!("attachment starting");
self.update_attaching_detaching_state(AttachmentState::Attaching);
let netman = self.network_manager();
let mut restart;
@ -226,8 +248,7 @@ impl AttachmentManager {
debug!("stopped maintaining peers");
if !restart {
let mut inner = self.inner.lock();
inner.last_attachment_state = AttachmentState::Detaching;
self.update_attaching_detaching_state(AttachmentState::Detaching);
debug!("attachment stopping");
}
@ -243,12 +264,8 @@ impl AttachmentManager {
sleep(1000).await;
}
{
let mut inner = self.inner.lock();
inner.last_attachment_state = AttachmentState::Detached;
inner.attach_ts = None;
debug!("attachment stopped");
}
self.update_attaching_detaching_state(AttachmentState::Detached);
debug!("attachment stopped");
}
#[instrument(level = "debug", skip_all, err)]

View File

@ -1,5 +1,4 @@
mod blake3digest512;
mod byte_array_types;
mod dh_cache;
mod envelope;
mod receipt;
@ -13,7 +12,7 @@ pub mod tests;
pub mod vld0;
pub use blake3digest512::*;
pub use byte_array_types::*;
pub use crypto_system::*;
pub use dh_cache::*;
pub use envelope::*;
@ -248,8 +247,8 @@ impl Crypto {
node_ids: &[TypedKey],
data: &[u8],
typed_signatures: &[TypedSignature],
) -> VeilidAPIResult<TypedKeySet> {
let mut out = TypedKeySet::with_capacity(node_ids.len());
) -> VeilidAPIResult<TypedKeyGroup> {
let mut out = TypedKeyGroup::with_capacity(node_ids.len());
for sig in typed_signatures {
for nid in node_ids {
if nid.kind == sig.kind {

View File

@ -17,7 +17,7 @@ use super::*;
)]
#[archive_attr(repr(C), derive(CheckBytes, Hash, PartialEq, Eq))]
#[serde(from = "Vec<CryptoTyped<K>>", into = "Vec<CryptoTyped<K>>")]
pub struct CryptoTypedSet<K = PublicKey>
pub struct CryptoTypedGroup<K = PublicKey>
where
K: Clone
+ Copy
@ -37,7 +37,7 @@ where
items: Vec<CryptoTyped<K>>,
}
impl<K> CryptoTypedSet<K>
impl<K> CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -151,7 +151,7 @@ where
}
}
impl<K> core::ops::Deref for CryptoTypedSet<K>
impl<K> core::ops::Deref for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -175,7 +175,7 @@ where
}
}
impl<K> fmt::Display for CryptoTypedSet<K>
impl<K> fmt::Display for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -205,7 +205,7 @@ where
write!(f, "]")
}
}
impl<K> FromStr for CryptoTypedSet<K>
impl<K> FromStr for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -238,7 +238,7 @@ where
Ok(Self { items })
}
}
impl<K> From<CryptoTyped<K>> for CryptoTypedSet<K>
impl<K> From<CryptoTyped<K>> for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -255,12 +255,12 @@ where
<K as RkyvArchive>::Archived: Hash + PartialEq + Eq,
{
fn from(x: CryptoTyped<K>) -> Self {
let mut tks = CryptoTypedSet::<K>::with_capacity(1);
let mut tks = CryptoTypedGroup::<K>::with_capacity(1);
tks.add(x);
tks
}
}
impl<K> From<Vec<CryptoTyped<K>>> for CryptoTypedSet<K>
impl<K> From<Vec<CryptoTyped<K>>> for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -277,12 +277,12 @@ where
<K as RkyvArchive>::Archived: Hash + PartialEq + Eq,
{
fn from(x: Vec<CryptoTyped<K>>) -> Self {
let mut tks = CryptoTypedSet::<K>::with_capacity(x.len());
let mut tks = CryptoTypedGroup::<K>::with_capacity(x.len());
tks.add_all(&x);
tks
}
}
impl<K> From<&[CryptoTyped<K>]> for CryptoTypedSet<K>
impl<K> From<&[CryptoTyped<K>]> for CryptoTypedGroup<K>
where
K: Clone
+ Copy
@ -299,12 +299,12 @@ where
<K as RkyvArchive>::Archived: Hash + PartialEq + Eq,
{
fn from(x: &[CryptoTyped<K>]) -> Self {
let mut tks = CryptoTypedSet::<K>::with_capacity(x.len());
let mut tks = CryptoTypedGroup::<K>::with_capacity(x.len());
tks.add_all(x);
tks
}
}
impl<K> Into<Vec<CryptoTyped<K>>> for CryptoTypedSet<K>
impl<K> Into<Vec<CryptoTyped<K>>> for CryptoTypedGroup<K>
where
K: Clone
+ Copy

View File

@ -3,8 +3,7 @@ use super::*;
#[derive(
Clone,
Copy,
Serialize,
Deserialize,
Default,
PartialOrd,
Ord,
PartialEq,
@ -87,3 +86,26 @@ impl TryFrom<&str> for KeyPair {
Self::try_decode(value)
}
}
impl serde::Serialize for KeyPair {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = self.encode();
serde::Serialize::serialize(&s, serializer)
}
}
impl<'de> serde::Deserialize<'de> for KeyPair {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = <String as serde::Deserialize>::deserialize(deserializer)?;
if s == "" {
return Ok(KeyPair::default());
}
KeyPair::try_decode(s.as_str()).map_err(serde::de::Error::custom)
}
}

View File

@ -41,12 +41,14 @@ pub fn common_crypto_kinds(a: &[CryptoKind], b: &[CryptoKind]) -> Vec<CryptoKind
out
}
mod byte_array_types;
mod crypto_typed;
mod crypto_typed_set;
mod crypto_typed_group;
mod keypair;
pub use byte_array_types::*;
pub use crypto_typed::*;
pub use crypto_typed_set::*;
pub use crypto_typed_group::*;
pub use keypair::*;
pub type TypedKey = CryptoTyped<PublicKey>;
@ -55,8 +57,8 @@ pub type TypedKeyPair = CryptoTyped<KeyPair>;
pub type TypedSignature = CryptoTyped<Signature>;
pub type TypedSharedSecret = CryptoTyped<SharedSecret>;
pub type TypedKeySet = CryptoTypedSet<PublicKey>;
pub type TypedSecretSet = CryptoTypedSet<SecretKey>;
pub type TypedKeyPairSet = CryptoTypedSet<KeyPair>;
pub type TypedSignatureSet = CryptoTypedSet<Signature>;
pub type TypedSharedSecretSet = CryptoTypedSet<SharedSecret>;
pub type TypedKeyGroup = CryptoTypedGroup<PublicKey>;
pub type TypedSecretGroup = CryptoTypedGroup<SecretKey>;
pub type TypedKeyPairGroup = CryptoTypedGroup<KeyPair>;
pub type TypedSignatureGroup = CryptoTypedGroup<Signature>;
pub type TypedSharedSecretGroup = CryptoTypedGroup<SharedSecret>;

View File

@ -0,0 +1,311 @@
use super::*;
use alloc::collections::btree_map::Entry;
// XXX: Move to config eventually?
const PUNISHMENT_DURATION_MIN: usize = 60;
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFilterError {
#[error("Count exceeded")]
CountExceeded,
#[error("Rate exceeded")]
RateExceeded,
#[error("Address is punished")]
Punished,
}
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
#[error("Address not in table")]
pub struct AddressNotInTableError {}
#[derive(Debug)]
struct AddressFilterInner {
conn_count_by_ip4: BTreeMap<Ipv4Addr, usize>,
conn_count_by_ip6_prefix: BTreeMap<Ipv6Addr, usize>,
conn_timestamps_by_ip4: BTreeMap<Ipv4Addr, Vec<Timestamp>>,
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>,
punishments_by_ip4: BTreeMap<Ipv4Addr, Timestamp>,
punishments_by_ip6_prefix: BTreeMap<Ipv6Addr, Timestamp>,
}
#[derive(Debug)]
struct AddressFilterUnlockedInner {
max_connections_per_ip4: usize,
max_connections_per_ip6_prefix: usize,
max_connections_per_ip6_prefix_size: usize,
max_connection_frequency_per_min: usize,
punishment_duration_min: usize,
}
#[derive(Clone, Debug)]
pub struct AddressFilter {
unlocked_inner: Arc<AddressFilterUnlockedInner>,
inner: Arc<Mutex<AddressFilterInner>>,
}
impl AddressFilter {
pub fn new(config: VeilidConfig) -> Self {
let c = config.get();
Self {
unlocked_inner: Arc::new(AddressFilterUnlockedInner {
max_connections_per_ip4: c.network.max_connections_per_ip4 as usize,
max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize,
max_connections_per_ip6_prefix_size: c.network.max_connections_per_ip6_prefix_size
as usize,
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min
as usize,
punishment_duration_min: PUNISHMENT_DURATION_MIN,
}),
inner: Arc::new(Mutex::new(AddressFilterInner {
conn_count_by_ip4: BTreeMap::new(),
conn_count_by_ip6_prefix: BTreeMap::new(),
conn_timestamps_by_ip4: BTreeMap::new(),
conn_timestamps_by_ip6_prefix: BTreeMap::new(),
punishments_by_ip4: BTreeMap::new(),
punishments_by_ip6_prefix: BTreeMap::new(),
})),
}
}
fn purge_old_timestamps(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
// v4
{
let mut dead_keys = Vec::<Ipv4Addr>::new();
for (key, value) in &mut inner.conn_timestamps_by_ip4 {
value.retain(|v| {
// keep timestamps that are less than a minute away
cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64)
});
if value.is_empty() {
dead_keys.push(*key);
}
}
for key in dead_keys {
inner.conn_timestamps_by_ip4.remove(&key);
}
}
// v6
{
let mut dead_keys = Vec::<Ipv6Addr>::new();
for (key, value) in &mut inner.conn_timestamps_by_ip6_prefix {
value.retain(|v| {
// keep timestamps that are less than a minute away
cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64)
});
if value.is_empty() {
dead_keys.push(*key);
}
}
for key in dead_keys {
inner.conn_timestamps_by_ip6_prefix.remove(&key);
}
}
}
fn purge_old_punishments(&self, inner: &mut AddressFilterInner, cur_ts: Timestamp) {
// v4
{
let mut dead_keys = Vec::<Ipv4Addr>::new();
for (key, value) in &mut inner.punishments_by_ip4 {
// Drop punishments older than the punishment duration
if cur_ts.as_u64().saturating_sub(value.as_u64())
> self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64
{
dead_keys.push(*key);
}
}
for key in dead_keys {
inner.punishments_by_ip4.remove(&key);
}
}
// v6
{
let mut dead_keys = Vec::<Ipv6Addr>::new();
for (key, value) in &mut inner.punishments_by_ip6_prefix {
// Drop punishments older than the punishment duration
if cur_ts.as_u64().saturating_sub(value.as_u64())
> self.unlocked_inner.punishment_duration_min as u64 * 60_000_000u64
{
dead_keys.push(*key);
}
}
for key in dead_keys {
inner.punishments_by_ip6_prefix.remove(&key);
}
}
}
fn is_punished_inner(&self, inner: &AddressFilterInner, ipblock: IpAddr) -> bool {
match ipblock {
IpAddr::V4(v4) => {
if inner.punishments_by_ip4.contains_key(&v4) {
return true;
}
}
IpAddr::V6(v6) => {
if inner.punishments_by_ip6_prefix.contains_key(&v6) {
return true;
}
}
}
false
}
pub fn is_punished(&self, addr: IpAddr) -> bool {
let inner = self.inner.lock();
let ipblock = ip_to_ipblock(
self.unlocked_inner.max_connections_per_ip6_prefix_size,
addr,
);
self.is_punished_inner(&*inner, ipblock)
}
pub fn punish(&self, addr: IpAddr) {
log_net!(debug ">>> PUNISHED: {}", addr);
let ts = get_aligned_timestamp();
let ipblock = ip_to_ipblock(
self.unlocked_inner.max_connections_per_ip6_prefix_size,
addr,
);
let mut inner = self.inner.lock();
match ipblock {
IpAddr::V4(v4) => inner
.punishments_by_ip4
.entry(v4)
.and_modify(|v| *v = ts)
.or_insert(ts),
IpAddr::V6(v6) => inner
.punishments_by_ip6_prefix
.entry(v6)
.and_modify(|v| *v = ts)
.or_insert(ts),
};
}
pub async fn address_filter_task_routine(
self,
_stop_token: StopToken,
_last_ts: Timestamp,
cur_ts: Timestamp,
) -> EyreResult<()> {
//
let mut inner = self.inner.lock();
self.purge_old_timestamps(&mut *inner, cur_ts);
self.purge_old_punishments(&mut *inner, cur_ts);
Ok(())
}
pub fn add_connection(&self, addr: IpAddr) -> Result<(), AddressFilterError> {
let inner = &mut *self.inner.lock();
let ipblock = ip_to_ipblock(
self.unlocked_inner.max_connections_per_ip6_prefix_size,
addr,
);
if self.is_punished_inner(inner, ipblock) {
return Err(AddressFilterError::Punished);
}
let ts = get_aligned_timestamp();
self.purge_old_timestamps(inner, ts);
match ipblock {
IpAddr::V4(v4) => {
// See if we have too many connections from this ip block
let cnt = inner.conn_count_by_ip4.entry(v4).or_default();
assert!(*cnt <= self.unlocked_inner.max_connections_per_ip4);
if *cnt == self.unlocked_inner.max_connections_per_ip4 {
warn!("address filter count exceeded: {:?}", v4);
return Err(AddressFilterError::CountExceeded);
}
// See if this ip block has connected too frequently
let tstamps = inner.conn_timestamps_by_ip4.entry(v4).or_default();
tstamps.retain(|v| {
// keep timestamps that are less than a minute away
ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64)
});
assert!(tstamps.len() <= self.unlocked_inner.max_connection_frequency_per_min);
if tstamps.len() == self.unlocked_inner.max_connection_frequency_per_min {
warn!("address filter rate exceeded: {:?}", v4);
return Err(AddressFilterError::RateExceeded);
}
// If it's okay, add the counts and timestamps
*cnt += 1;
tstamps.push(ts);
}
IpAddr::V6(v6) => {
// See if we have too many connections from this ip block
let cnt = inner.conn_count_by_ip6_prefix.entry(v6).or_default();
assert!(*cnt <= self.unlocked_inner.max_connections_per_ip6_prefix);
if *cnt == self.unlocked_inner.max_connections_per_ip6_prefix {
warn!("address filter count exceeded: {:?}", v6);
return Err(AddressFilterError::CountExceeded);
}
// See if this ip block has connected too frequently
let tstamps = inner.conn_timestamps_by_ip6_prefix.entry(v6).or_default();
assert!(tstamps.len() <= self.unlocked_inner.max_connection_frequency_per_min);
if tstamps.len() == self.unlocked_inner.max_connection_frequency_per_min {
warn!("address filter rate exceeded: {:?}", v6);
return Err(AddressFilterError::RateExceeded);
}
// If it's okay, add the counts and timestamps
*cnt += 1;
tstamps.push(ts);
}
}
Ok(())
}
pub fn remove_connection(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
let mut inner = self.inner.lock();
let ipblock = ip_to_ipblock(
self.unlocked_inner.max_connections_per_ip6_prefix_size,
addr,
);
let ts = get_aligned_timestamp();
self.purge_old_timestamps(&mut *inner, ts);
match ipblock {
IpAddr::V4(v4) => {
match inner.conn_count_by_ip4.entry(v4) {
Entry::Vacant(_) => {
return Err(AddressNotInTableError {});
}
Entry::Occupied(mut o) => {
let cnt = o.get_mut();
assert!(*cnt > 0);
if *cnt == 0 {
inner.conn_count_by_ip4.remove(&v4);
} else {
*cnt -= 1;
}
}
};
}
IpAddr::V6(v6) => {
match inner.conn_count_by_ip6_prefix.entry(v6) {
Entry::Vacant(_) => {
return Err(AddressNotInTableError {});
}
Entry::Occupied(mut o) => {
let cnt = o.get_mut();
assert!(*cnt > 0);
if *cnt == 0 {
inner.conn_count_by_ip6_prefix.remove(&v6);
} else {
*cnt -= 1;
}
}
};
}
}
Ok(())
}
}

View File

@ -1,176 +0,0 @@
use super::*;
use alloc::collections::btree_map::Entry;
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddressFilterError {
#[error("Count exceeded")]
CountExceeded,
#[error("Rate exceeded")]
RateExceeded,
}
#[derive(ThisError, Debug, Clone, Copy, PartialEq, Eq)]
#[error("Address not in table")]
pub struct AddressNotInTableError {}
#[derive(Debug)]
pub struct ConnectionLimits {
max_connections_per_ip4: usize,
max_connections_per_ip6_prefix: usize,
max_connections_per_ip6_prefix_size: usize,
max_connection_frequency_per_min: usize,
conn_count_by_ip4: BTreeMap<Ipv4Addr, usize>,
conn_count_by_ip6_prefix: BTreeMap<Ipv6Addr, usize>,
conn_timestamps_by_ip4: BTreeMap<Ipv4Addr, Vec<Timestamp>>,
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<Timestamp>>,
}
impl ConnectionLimits {
pub fn new(config: VeilidConfig) -> Self {
let c = config.get();
Self {
max_connections_per_ip4: c.network.max_connections_per_ip4 as usize,
max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize,
max_connections_per_ip6_prefix_size: c.network.max_connections_per_ip6_prefix_size
as usize,
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min as usize,
conn_count_by_ip4: BTreeMap::new(),
conn_count_by_ip6_prefix: BTreeMap::new(),
conn_timestamps_by_ip4: BTreeMap::new(),
conn_timestamps_by_ip6_prefix: BTreeMap::new(),
}
}
fn purge_old_timestamps(&mut self, cur_ts: Timestamp) {
// v4
{
let mut dead_keys = Vec::<Ipv4Addr>::new();
for (key, value) in &mut self.conn_timestamps_by_ip4 {
value.retain(|v| {
// keep timestamps that are less than a minute away
cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64)
});
if value.is_empty() {
dead_keys.push(*key);
}
}
for key in dead_keys {
self.conn_timestamps_by_ip4.remove(&key);
}
}
// v6
{
let mut dead_keys = Vec::<Ipv6Addr>::new();
for (key, value) in &mut self.conn_timestamps_by_ip6_prefix {
value.retain(|v| {
// keep timestamps that are less than a minute away
cur_ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64)
});
if value.is_empty() {
dead_keys.push(*key);
}
}
for key in dead_keys {
self.conn_timestamps_by_ip6_prefix.remove(&key);
}
}
}
pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> {
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
let ts = get_aligned_timestamp();
self.purge_old_timestamps(ts);
match ipblock {
IpAddr::V4(v4) => {
// See if we have too many connections from this ip block
let cnt = &mut *self.conn_count_by_ip4.entry(v4).or_default();
assert!(*cnt <= self.max_connections_per_ip4);
if *cnt == self.max_connections_per_ip4 {
warn!("address filter count exceeded: {:?}", v4);
return Err(AddressFilterError::CountExceeded);
}
// See if this ip block has connected too frequently
let tstamps = &mut self.conn_timestamps_by_ip4.entry(v4).or_default();
tstamps.retain(|v| {
// keep timestamps that are less than a minute away
ts.saturating_sub(*v) < TimestampDuration::new(60_000_000u64)
});
assert!(tstamps.len() <= self.max_connection_frequency_per_min);
if tstamps.len() == self.max_connection_frequency_per_min {
warn!("address filter rate exceeded: {:?}", v4);
return Err(AddressFilterError::RateExceeded);
}
// If it's okay, add the counts and timestamps
*cnt += 1;
tstamps.push(ts);
}
IpAddr::V6(v6) => {
// See if we have too many connections from this ip block
let cnt = &mut *self.conn_count_by_ip6_prefix.entry(v6).or_default();
assert!(*cnt <= self.max_connections_per_ip6_prefix);
if *cnt == self.max_connections_per_ip6_prefix {
warn!("address filter count exceeded: {:?}", v6);
return Err(AddressFilterError::CountExceeded);
}
// See if this ip block has connected too frequently
let tstamps = &mut self.conn_timestamps_by_ip6_prefix.entry(v6).or_default();
assert!(tstamps.len() <= self.max_connection_frequency_per_min);
if tstamps.len() == self.max_connection_frequency_per_min {
warn!("address filter rate exceeded: {:?}", v6);
return Err(AddressFilterError::RateExceeded);
}
// If it's okay, add the counts and timestamps
*cnt += 1;
tstamps.push(ts);
}
}
Ok(())
}
pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
let ipblock = ip_to_ipblock(self.max_connections_per_ip6_prefix_size, addr);
let ts = get_aligned_timestamp();
self.purge_old_timestamps(ts);
match ipblock {
IpAddr::V4(v4) => {
match self.conn_count_by_ip4.entry(v4) {
Entry::Vacant(_) => {
return Err(AddressNotInTableError {});
}
Entry::Occupied(mut o) => {
let cnt = o.get_mut();
assert!(*cnt > 0);
if *cnt == 0 {
self.conn_count_by_ip4.remove(&v4);
} else {
*cnt -= 1;
}
}
};
}
IpAddr::V6(v6) => {
match self.conn_count_by_ip6_prefix.entry(v6) {
Entry::Vacant(_) => {
return Err(AddressNotInTableError {});
}
Entry::Occupied(mut o) => {
let cnt = o.get_mut();
assert!(*cnt > 0);
if *cnt == 0 {
self.conn_count_by_ip6_prefix.remove(&v6);
} else {
*cnt -= 1;
}
}
};
}
}
Ok(())
}
}

View File

@ -286,6 +286,7 @@ impl ConnectionManager {
local_addr,
&dial_info,
self.arc.connection_initial_timeout_ms,
self.network_manager().address_filter(),
)
.await;
match result_net_res {

View File

@ -29,7 +29,7 @@ pub struct ConnectionTableInner {
protocol_index_by_id: BTreeMap<NetworkConnectionId, usize>,
id_by_descriptor: BTreeMap<ConnectionDescriptor, NetworkConnectionId>,
ids_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnectionId>>,
address_filter: ConnectionLimits,
address_filter: AddressFilter,
}
#[derive(Debug)]
@ -58,7 +58,7 @@ impl ConnectionTable {
protocol_index_by_id: BTreeMap::new(),
id_by_descriptor: BTreeMap::new(),
ids_by_remote: BTreeMap::new(),
address_filter: ConnectionLimits::new(config),
address_filter: AddressFilter::new(config),
})),
}
}
@ -125,7 +125,7 @@ impl ConnectionTable {
// Filter by ip for connection limits
let ip_addr = descriptor.remote_address().to_ip_addr();
match inner.address_filter.add(ip_addr) {
match inner.address_filter.add_connection(ip_addr) {
Ok(()) => {}
Err(e) => {
// Return the connection in the error to be disposed of
@ -258,7 +258,7 @@ impl ConnectionTable {
let ip_addr = remote.to_socket_addr().ip();
inner
.address_filter
.remove(ip_addr)
.remove_connection(ip_addr)
.expect("Inconsistency in connection table");
conn
}

View File

@ -8,7 +8,7 @@ mod wasm;
mod direct_boot;
mod send_data;
mod connection_handle;
mod connection_limits;
mod address_filter;
mod connection_manager;
mod connection_table;
mod network_connection;
@ -29,7 +29,7 @@ pub use stats::*;
////////////////////////////////////////////////////////////////////////////////////////
use connection_handle::*;
use connection_limits::*;
use address_filter::*;
use crypto::*;
use futures_util::stream::FuturesUnordered;
use hashlink::LruCache;
@ -54,6 +54,7 @@ pub const PUBLIC_ADDRESS_CHECK_CACHE_SIZE: usize = 8;
pub const PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS: u32 = 60;
pub const PUBLIC_ADDRESS_INCONSISTENCY_TIMEOUT_US: TimestampDuration = TimestampDuration::new(300_000_000u64); // 5 minutes
pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration = TimestampDuration::new(3600_000_000u64); // 60 minutes
pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60;
pub const BOOT_MAGIC: &[u8; 4] = b"BOOT";
#[derive(Copy, Clone, Debug, Default)]
@ -136,6 +137,7 @@ struct NetworkManagerUnlockedInner {
#[cfg(feature="unstable-blockstore")]
block_store: BlockStore,
crypto: Crypto,
address_filter: AddressFilter,
// Accessors
routing_table: RwLock<Option<RoutingTable>>,
components: RwLock<Option<NetworkComponents>>,
@ -143,6 +145,7 @@ struct NetworkManagerUnlockedInner {
// Background processes
rolling_transfers_task: TickTask<EyreReport>,
public_address_check_task: TickTask<EyreReport>,
address_filter_task: TickTask<EyreReport>,
// Network Key
network_key: Option<SharedSecret>,
}
@ -174,18 +177,20 @@ impl NetworkManager {
network_key: Option<SharedSecret>,
) -> NetworkManagerUnlockedInner {
NetworkManagerUnlockedInner {
config,
config: config.clone(),
storage_manager,
protected_store,
table_store,
#[cfg(feature="unstable-blockstore")]
block_store,
crypto,
address_filter: AddressFilter::new(config),
routing_table: RwLock::new(None),
components: RwLock::new(None),
update_callback: RwLock::new(None),
rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS),
public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS),
address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS),
network_key,
}
}
@ -273,6 +278,9 @@ impl NetworkManager {
pub fn crypto(&self) -> Crypto {
self.unlocked_inner.crypto.clone()
}
pub fn address_filter(&self) -> AddressFilter {
self.unlocked_inner.address_filter.clone()
}
pub fn routing_table(&self) -> RoutingTable {
self.unlocked_inner
.routing_table
@ -894,10 +902,11 @@ impl NetworkManager {
data.len(),
connection_descriptor
);
let remote_addr = connection_descriptor.remote_address().to_ip_addr();
// Network accounting
self.stats_packet_rcvd(
connection_descriptor.remote_address().to_ip_addr(),
remote_addr,
ByteCount::new(data.len() as u64),
);
@ -911,6 +920,7 @@ impl NetworkManager {
// Ensure we can read the magic number
if data.len() < 4 {
log_net!(debug "short packet");
self.address_filter().punish(remote_addr);
return Ok(false);
}
@ -943,6 +953,7 @@ impl NetworkManager {
Ok(v) => v,
Err(e) => {
log_net!(debug "envelope failed to decode: {}", e);
self.address_filter().punish(remote_addr);
return Ok(false);
}
};
@ -1058,7 +1069,7 @@ impl NetworkManager {
Ok(v) => v,
Err(e) => {
log_net!(debug "failed to decrypt envelope body: {}",e);
// xxx: punish nodes that send messages that fail to decrypt eventually
self.address_filter().punish(remote_addr);
return Ok(false);
}
};
@ -1078,8 +1089,6 @@ impl NetworkManager {
};
source_noderef.add_envelope_version(envelope.get_version());
// xxx: deal with spoofing and flooding here?
// Pass message to RPC system
rpc.enqueue_direct_message(
envelope,

View File

@ -370,6 +370,14 @@ impl Network {
c.network.connection_initial_timeout_ms
};
if self
.network_manager()
.address_filter()
.is_punished(dial_info.address().to_ip_addr())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr();
@ -429,6 +437,14 @@ impl Network {
c.network.connection_initial_timeout_ms
};
if self
.network_manager()
.address_filter()
.is_punished(dial_info.address().to_ip_addr())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
let peer_socket_addr = dial_info.to_socket_addr();

View File

@ -112,7 +112,7 @@ impl DiscoveryContext {
&self,
protocol_type: ProtocolType,
address_type: AddressType,
ignore_node_ids: Option<TypedKeySet>,
ignore_node_ids: Option<TypedKeyGroup>,
) -> Option<(SocketAddress, NodeRef)> {
let node_count = {
let config = self.routing_table.network_manager().config();

View File

@ -108,12 +108,29 @@ impl Network {
}
};
// XXX
// warn!(
// "DEBUGACCEPT: local={} remote={}",
// tcp_stream.local_addr().unwrap(),
// tcp_stream.peer_addr().unwrap(),
// );
// Limit the number of connections from the same IP address
// and the number of total connections
// XXX limiting here instead for connection table? may be faster and avoids tls negotiation
let peer_addr = match tcp_stream.peer_addr() {
Ok(addr) => addr,
Err(e) => {
log_net!(debug "failed to get peer address: {}", e);
return;
}
};
let address_filter = self.network_manager().address_filter();
// Check to see if it is punished
if address_filter.is_punished(peer_addr.ip()) {
return;
}
let local_addr = match tcp_stream.local_addr() {
Ok(addr) => addr,
Err(e) => {
log_net!(debug "failed to get local address: {}", e);
return;
}
};
if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(debug "Couldn't set TCP linger: {}", e);
@ -127,24 +144,6 @@ impl Network {
let listener_state = listener_state.clone();
let connection_manager = connection_manager.clone();
// Limit the number of connections from the same IP address
// and the number of total connections
let peer_addr = match tcp_stream.peer_addr() {
Ok(addr) => addr,
Err(e) => {
log_net!(debug "failed to get peer address: {}", e);
return;
}
};
let local_addr = match tcp_stream.local_addr() {
Ok(addr) => addr,
Err(e) => {
log_net!(debug "failed to get local address: {}", e);
return;
}
};
// XXX limiting here instead for connection table? may be faster and avoids tls negotiation
log_net!("TCP connection from: {}", peer_addr);
// Create a stream we can peek on

View File

@ -66,8 +66,6 @@ impl Network {
.await
{
Ok(Ok((size, descriptor))) => {
// XXX: Limit the number of packets from the same IP address?
// Network accounting
network_manager.stats_packet_rcvd(
descriptor.remote_address().to_ip_addr(),
@ -143,7 +141,10 @@ impl Network {
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let udpv4_handler = RawUdpProtocolHandler::new(socket_arc);
let udpv4_handler = RawUdpProtocolHandler::new(
socket_arc,
Some(self.network_manager().address_filter()),
);
inner.outbound_udpv4_protocol_handler = Some(udpv4_handler);
}
@ -164,7 +165,10 @@ impl Network {
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let udpv6_handler = RawUdpProtocolHandler::new(socket_arc);
let udpv6_handler = RawUdpProtocolHandler::new(
socket_arc,
Some(self.network_manager().address_filter()),
);
inner.outbound_udpv6_protocol_handler = Some(udpv6_handler);
}
@ -191,7 +195,8 @@ impl Network {
let socket_arc = Arc::new(udp_socket);
// Create protocol handler
let protocol_handler = RawUdpProtocolHandler::new(socket_arc);
let protocol_handler =
RawUdpProtocolHandler::new(socket_arc, Some(self.network_manager().address_filter()));
// Create message_handler records
self.inner

View File

@ -22,7 +22,11 @@ impl ProtocolNetworkConnection {
local_address: Option<SocketAddr>,
dial_info: &DialInfo,
timeout_ms: u32,
address_filter: AddressFilter,
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
if address_filter.is_punished(dial_info.address().to_ip_addr()) {
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
panic!("Should not connect to UDP dialinfo");

View File

@ -5,13 +5,15 @@ use sockets::*;
pub struct RawUdpProtocolHandler {
socket: Arc<UdpSocket>,
assembly_buffer: AssemblyBuffer,
address_filter: Option<AddressFilter>,
}
impl RawUdpProtocolHandler {
pub fn new(socket: Arc<UdpSocket>) -> Self {
pub fn new(socket: Arc<UdpSocket>, address_filter: Option<AddressFilter>) -> Self {
Self {
socket,
assembly_buffer: AssemblyBuffer::new(),
address_filter,
}
}
@ -21,6 +23,13 @@ impl RawUdpProtocolHandler {
// Get a packet
let (size, remote_addr) = network_result_value_or_log!(self.socket.recv_from(data).await.into_network_result()? => continue);
// Check to see if it is punished
if let Some(af) = self.address_filter.as_ref() {
if af.is_punished(remote_addr.ip()) {
continue;
}
}
// Insert into assembly buffer
let Some(message) = self.assembly_buffer.insert_frame(&data[0..size], remote_addr) else {
continue;
@ -66,6 +75,13 @@ impl RawUdpProtocolHandler {
bail_io_error_other!("sending too large UDP message");
}
// Check to see if it is punished
if let Some(af) = self.address_filter.as_ref() {
if af.is_punished(remote_addr.ip()) {
return Ok(NetworkResult::no_connection_other("punished"));
}
}
// Fragment and send
let sender = |framed_chunk: Vec<u8>, remote_addr: SocketAddr| async move {
let len = network_result_try!(self
@ -111,6 +127,6 @@ impl RawUdpProtocolHandler {
// get local wildcard address for bind
let local_socket_addr = compatible_unspecified_socket_addr(&socket_addr);
let socket = UdpSocket::bind(local_socket_addr).await?;
Ok(RawUdpProtocolHandler::new(Arc::new(socket)))
Ok(RawUdpProtocolHandler::new(Arc::new(socket), None))
}
}

View File

@ -609,7 +609,7 @@ impl Network {
ip_addrs,
tcp_port,
false,
Box::new(move |c, _| Box::new(RawTcpProtocolHandler::new(c))),
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
)
.await?;
trace!("TCP: listener started on {:#?}", socket_addresses);

View File

@ -42,6 +42,20 @@ impl NetworkManager {
)
});
}
// Set address filter task
{
let this = self.clone();
self.unlocked_inner
.address_filter_task
.set_routine(move |s, l, t| {
Box::pin(
this.address_filter()
.address_filter_task_routine(s, Timestamp::new(l), Timestamp::new(t))
.instrument(trace_span!(parent: None, "address filter task routine")),
)
});
}
}
pub async fn tick(&self) -> EyreResult<()> {

View File

@ -34,7 +34,7 @@ pub async fn test_signed_node_info() {
node_info.clone(),
)
.unwrap();
let tks: TypedKeySet = TypedKey::new(ck, keypair.key).into();
let tks: TypedKeyGroup = TypedKey::new(ck, keypair.key).into();
let oldtkslen = tks.len();
let sdni = SignedDirectNodeInfo::new(
node_info.clone(),
@ -47,7 +47,7 @@ pub async fn test_signed_node_info() {
// Test incorrect validation
let keypair1 = vcrypto.generate_keypair();
let tks1: TypedKeySet = TypedKey::new(ck, keypair1.key).into();
let tks1: TypedKeyGroup = TypedKey::new(ck, keypair1.key).into();
let sdni = SignedDirectNodeInfo::new(
node_info.clone(),
sni.timestamp(),
@ -57,7 +57,8 @@ pub async fn test_signed_node_info() {
// Test unsupported cryptosystem validation
let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]);
let mut tksfake: TypedKeySet = TypedKey::new(fake_crypto_kind, PublicKey::default()).into();
let mut tksfake: TypedKeyGroup =
TypedKey::new(fake_crypto_kind, PublicKey::default()).into();
let mut sigsfake = sni.signatures().to_vec();
sigsfake.push(TypedSignature::new(fake_crypto_kind, Signature::default()));
tksfake.add(TypedKey::new(ck, keypair.key));
@ -82,7 +83,7 @@ pub async fn test_signed_node_info() {
// Test correct validation
let keypair2 = vcrypto.generate_keypair();
let tks2: TypedKeySet = TypedKey::new(ck, keypair2.key).into();
let tks2: TypedKeyGroup = TypedKey::new(ck, keypair2.key).into();
let oldtks2len = tks2.len();
let sni2 = SignedRelayedNodeInfo::make_signatures(
@ -107,7 +108,7 @@ pub async fn test_signed_node_info() {
// Test incorrect validation
let keypair3 = vcrypto.generate_keypair();
let tks3: TypedKeySet = TypedKey::new(ck, keypair3.key).into();
let tks3: TypedKeyGroup = TypedKey::new(ck, keypair3.key).into();
let srni = SignedRelayedNodeInfo::new(
node_info2.clone(),
@ -120,7 +121,7 @@ pub async fn test_signed_node_info() {
// Test unsupported cryptosystem validation
let fake_crypto_kind: CryptoKind = FourCC::from([0, 1, 2, 3]);
let mut tksfake3: TypedKeySet =
let mut tksfake3: TypedKeyGroup =
TypedKey::new(fake_crypto_kind, PublicKey::default()).into();
let mut sigsfake3 = sni2.signatures().to_vec();
sigsfake3.push(TypedSignature::new(fake_crypto_kind, Signature::default()));

View File

@ -91,6 +91,14 @@ impl Network {
c.network.connection_initial_timeout_ms
};
if self
.network_manager()
.address_filter()
.is_punished(dial_info.address().to_ip_addr())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")
@ -132,6 +140,14 @@ impl Network {
c.network.connection_initial_timeout_ms
};
if self
.network_manager()
.address_filter()
.is_punished(dial_info.address().to_ip_addr())
{
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
bail!("no support for UDP protocol")

View File

@ -17,7 +17,11 @@ impl ProtocolNetworkConnection {
_local_address: Option<SocketAddr>,
dial_info: &DialInfo,
timeout_ms: u32,
address_filter: AddressFiltter,
) -> io::Result<NetworkResult<ProtocolNetworkConnection>> {
if address_filter.is_punished(dial_info.address().to_ip_addr()) {
return Ok(NetworkResult::no_connection_other("punished"));
}
match dial_info.protocol_type() {
ProtocolType::UDP => {
panic!("UDP dial info is not supported on WASM targets");

View File

@ -71,9 +71,9 @@ pub struct BucketEntryLocalNetwork {
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct BucketEntryInner {
/// The node ids matching this bucket entry, with the cryptography versions supported by this node as the 'kind' field
validated_node_ids: TypedKeySet,
validated_node_ids: TypedKeyGroup,
/// The node ids claimed by the remote node that use cryptography versions we do not support
unsupported_node_ids: TypedKeySet,
unsupported_node_ids: TypedKeyGroup,
/// The set of envelope versions supported by the node inclusive of the requirements of any relay the node may be using
envelope_support: Vec<u8>,
/// If this node has updated it's SignedNodeInfo since our network
@ -123,7 +123,7 @@ impl BucketEntryInner {
}
/// Get all node ids
pub fn node_ids(&self) -> TypedKeySet {
pub fn node_ids(&self) -> TypedKeyGroup {
let mut node_ids = self.validated_node_ids.clone();
node_ids.add_all(&self.unsupported_node_ids);
node_ids
@ -786,8 +786,8 @@ impl BucketEntry {
let now = get_aligned_timestamp();
let inner = BucketEntryInner {
validated_node_ids: TypedKeySet::from(first_node_id),
unsupported_node_ids: TypedKeySet::new(),
validated_node_ids: TypedKeyGroup::from(first_node_id),
unsupported_node_ids: TypedKeyGroup::new(),
envelope_support: Vec::new(),
updated_since_last_network_change: false,
last_connections: BTreeMap::new(),

View File

@ -82,9 +82,9 @@ pub struct RoutingTableUnlockedInner {
network_manager: NetworkManager,
/// The current node's public DHT keys
node_id: TypedKeySet,
node_id: TypedKeyGroup,
/// The current node's public DHT secrets
node_id_secret: TypedSecretSet,
node_id_secret: TypedSecretGroup,
/// Buckets to kick on our next kick task
kick_queue: Mutex<BTreeSet<BucketIndex>>,
/// Background process for computing statistics
@ -131,7 +131,7 @@ impl RoutingTableUnlockedInner {
self.node_id_secret.get(kind).unwrap().value
}
pub fn node_ids(&self) -> TypedKeySet {
pub fn node_ids(&self) -> TypedKeyGroup {
self.node_id.clone()
}
@ -648,7 +648,7 @@ impl RoutingTable {
inner.get_all_nodes(self.clone(), cur_ts)
}
fn queue_bucket_kicks(&self, node_ids: TypedKeySet) {
fn queue_bucket_kicks(&self, node_ids: TypedKeyGroup) {
for node_id in node_ids.iter() {
// Skip node ids we didn't add to buckets
if !VALID_CRYPTO_KINDS.contains(&node_id.kind) {

View File

@ -106,7 +106,7 @@ pub trait NodeRefBase: Sized {
fn routing_table(&self) -> RoutingTable {
self.common().routing_table.clone()
}
fn node_ids(&self) -> TypedKeySet {
fn node_ids(&self) -> TypedKeyGroup {
self.operate(|_rti, e| e.node_ids())
}
fn best_node_id(&self) -> TypedKey {

View File

@ -59,8 +59,8 @@ impl RouteSetSpecDetail {
pub fn get_route_by_key_mut(&mut self, key: &PublicKey) -> Option<&mut RouteSpecDetail> {
self.route_set.get_mut(key)
}
pub fn get_route_set_keys(&self) -> TypedKeySet {
let mut tks = TypedKeySet::new();
pub fn get_route_set_keys(&self) -> TypedKeyGroup {
let mut tks = TypedKeyGroup::new();
for (k, v) in &self.route_set {
tks.add(TypedKey::new(v.crypto_kind, *k));
}

View File

@ -117,14 +117,14 @@ impl RouteSpecStoreCache {
}
/// calculate how many times a node with a particular node id set has been used anywhere in the path of our allocated routes
pub fn get_used_node_count(&self, node_ids: &TypedKeySet) -> usize {
pub fn get_used_node_count(&self, node_ids: &TypedKeyGroup) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self.used_nodes.get(&k.value).cloned().unwrap_or_default()
})
}
/// calculate how many times a node with a particular node id set has been used at the end of the path of our allocated routes
pub fn get_used_end_node_count(&self, node_ids: &TypedKeySet) -> usize {
pub fn get_used_end_node_count(&self, node_ids: &TypedKeyGroup) -> usize {
node_ids.iter().fold(0usize, |acc, k| {
acc + self
.used_end_nodes

View File

@ -658,7 +658,7 @@ impl RoutingTableInner {
fn create_node_ref<F>(
&mut self,
outer_self: RoutingTable,
node_ids: &TypedKeySet,
node_ids: &TypedKeyGroup,
update_func: F,
) -> EyreResult<NodeRef>
where
@ -873,7 +873,7 @@ impl RoutingTableInner {
descriptor: ConnectionDescriptor,
timestamp: Timestamp,
) -> EyreResult<NodeRef> {
let nr = self.create_node_ref(outer_self, &TypedKeySet::from(node_id), |_rti, e| {
let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| {
// this node is live because it literally just connected to us
e.touch_last_seen(timestamp);
})?;

View File

@ -7,7 +7,7 @@ pub const BOOTSTRAP_TXT_VERSION_0: u8 = 0;
#[derive(Clone, Debug)]
pub struct BootstrapRecord {
node_ids: TypedKeySet,
node_ids: TypedKeyGroup,
envelope_support: Vec<u8>,
dial_info_details: Vec<DialInfoDetail>,
}
@ -63,7 +63,7 @@ impl RoutingTable {
envelope_support.sort();
// Node Id
let mut node_ids = TypedKeySet::new();
let mut node_ids = TypedKeyGroup::new();
for node_id_str in records[2].split(",") {
let node_id_str = node_id_str.trim();
let node_id = match TypedKey::from_str(&node_id_str) {

View File

@ -83,7 +83,7 @@ pub async fn test_routingtable_buckets_round_trip() {
}
pub async fn test_round_trip_peerinfo() {
let mut tks = TypedKeySet::new();
let mut tks = TypedKeyGroup::new();
tks.add(TypedKey::new(
CRYPTO_KIND_VLD0,
CryptoKey::new([

View File

@ -5,12 +5,12 @@ use super::*;
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct PeerInfo {
node_ids: TypedKeySet,
node_ids: TypedKeyGroup,
signed_node_info: SignedNodeInfo,
}
impl PeerInfo {
pub fn new(node_ids: TypedKeySet, signed_node_info: SignedNodeInfo) -> Self {
pub fn new(node_ids: TypedKeyGroup, signed_node_info: SignedNodeInfo) -> Self {
assert!(node_ids.len() > 0 && node_ids.len() <= MAX_CRYPTO_KINDS);
Self {
node_ids,
@ -27,13 +27,13 @@ impl PeerInfo {
Ok(())
}
pub fn node_ids(&self) -> &TypedKeySet {
pub fn node_ids(&self) -> &TypedKeyGroup {
&self.node_ids
}
pub fn signed_node_info(&self) -> &SignedNodeInfo {
&self.signed_node_info
}
pub fn destructure(self) -> (TypedKeySet, SignedNodeInfo) {
pub fn destructure(self) -> (TypedKeyGroup, SignedNodeInfo) {
(self.node_ids, self.signed_node_info)
}

View File

@ -22,7 +22,11 @@ impl SignedDirectNodeInfo {
}
}
pub fn validate(&self, node_ids: &TypedKeySet, crypto: Crypto) -> VeilidAPIResult<TypedKeySet> {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
let node_info_bytes = Self::make_signature_bytes(&self.node_info, self.timestamp)?;
// Verify the signatures that we can

View File

@ -10,7 +10,11 @@ pub enum SignedNodeInfo {
}
impl SignedNodeInfo {
pub fn validate(&self, node_ids: &TypedKeySet, crypto: Crypto) -> VeilidAPIResult<TypedKeySet> {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
match self {
SignedNodeInfo::Direct(d) => d.validate(node_ids, crypto),
SignedNodeInfo::Relayed(r) => r.validate(node_ids, crypto),
@ -36,9 +40,9 @@ impl SignedNodeInfo {
SignedNodeInfo::Relayed(r) => &r.node_info(),
}
}
pub fn relay_ids(&self) -> TypedKeySet {
pub fn relay_ids(&self) -> TypedKeyGroup {
match self {
SignedNodeInfo::Direct(_) => TypedKeySet::new(),
SignedNodeInfo::Direct(_) => TypedKeyGroup::new(),
SignedNodeInfo::Relayed(r) => r.relay_ids().clone(),
}
}

View File

@ -7,7 +7,7 @@ use super::*;
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct SignedRelayedNodeInfo {
node_info: NodeInfo,
relay_ids: TypedKeySet,
relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo,
timestamp: Timestamp,
signatures: Vec<TypedSignature>,
@ -19,7 +19,7 @@ impl SignedRelayedNodeInfo {
/// All signatures are stored however, as this can be passed to other nodes that may be able to validate those signatures.
pub fn new(
node_info: NodeInfo,
relay_ids: TypedKeySet,
relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo,
timestamp: Timestamp,
signatures: Vec<TypedSignature>,
@ -33,7 +33,11 @@ impl SignedRelayedNodeInfo {
}
}
pub fn validate(&self, node_ids: &TypedKeySet, crypto: Crypto) -> VeilidAPIResult<TypedKeySet> {
pub fn validate(
&self,
node_ids: &TypedKeyGroup,
crypto: Crypto,
) -> VeilidAPIResult<TypedKeyGroup> {
// Ensure the relay info for the node has a superset of the crypto kinds of the node it is relaying
if common_crypto_kinds(
self.node_info.crypto_support(),
@ -64,7 +68,7 @@ impl SignedRelayedNodeInfo {
crypto: Crypto,
typed_key_pairs: Vec<TypedKeyPair>,
node_info: NodeInfo,
relay_ids: TypedKeySet,
relay_ids: TypedKeyGroup,
relay_info: SignedDirectNodeInfo,
) -> VeilidAPIResult<Self> {
let timestamp = get_aligned_timestamp();
@ -128,7 +132,7 @@ impl SignedRelayedNodeInfo {
pub fn timestamp(&self) -> Timestamp {
self.timestamp
}
pub fn relay_ids(&self) -> &TypedKeySet {
pub fn relay_ids(&self) -> &TypedKeyGroup {
&self.relay_ids
}
pub fn relay_info(&self) -> &SignedDirectNodeInfo {

View File

@ -91,11 +91,6 @@ impl RPCOperationGetValueA {
if peers.len() > MAX_GET_VALUE_A_PEERS_LEN {
return Err(RPCError::protocol("GetValueA peers length too long"));
}
if descriptor.is_some() && !value.is_some() {
return Err(RPCError::protocol(
"GetValueA should not return descriptor without value",
));
}
Ok(Self {
value,
peers,
@ -144,11 +139,6 @@ impl RPCOperationGetValueA {
get_value_context.vcrypto.clone(),
)
.map_err(RPCError::protocol)?;
} else {
// No value, should not have descriptor
if self.descriptor.is_some() {
return Err(RPCError::protocol("descriptor returned without a value"));
}
}
PeerInfo::validate_vec(&mut self.peers, validate_context.crypto.clone());

View File

@ -36,7 +36,7 @@ pub fn decode_peer_info(reader: &veilid_capnp::peer_info::Reader) -> Result<Peer
.reborrow()
.get_signed_node_info()
.map_err(RPCError::protocol)?;
let mut node_ids = TypedKeySet::with_capacity(nids_reader.len() as usize);
let mut node_ids = TypedKeyGroup::with_capacity(nids_reader.len() as usize);
for nid_reader in nids_reader.iter() {
node_ids.add(decode_typed_key(&nid_reader)?);
}

View File

@ -69,7 +69,7 @@ pub fn decode_signed_relayed_node_info(
if rid_count > MAX_CRYPTO_KINDS {
return Err(RPCError::protocol("too many relay ids"));
}
let mut relay_ids = TypedKeySet::with_capacity(rid_count);
let mut relay_ids = TypedKeyGroup::with_capacity(rid_count);
for rid_reader in rids_reader {
let relay_id = decode_typed_key(&rid_reader)?;
relay_ids.add(relay_id);

View File

@ -5,7 +5,7 @@ where
R: Unpin,
{
closest_nodes: Vec<NodeRef>,
called_nodes: TypedKeySet,
called_nodes: HashSet<TypedKey>,
result: Option<Result<R, RPCError>>,
}
@ -62,7 +62,7 @@ where
) -> Arc<Self> {
let context = Mutex::new(FanoutContext {
closest_nodes: Vec::with_capacity(node_count),
called_nodes: TypedKeySet::new(),
called_nodes: HashSet::new(),
result: None,
});
@ -125,7 +125,7 @@ where
if !ctx.called_nodes.contains(&key) {
// New fanout call candidate found
next_node = Some(cn.clone());
ctx.called_nodes.add(key);
ctx.called_nodes.insert(key);
break;
}
}

View File

@ -74,6 +74,7 @@ impl RPCProcessor {
vcrypto: vcrypto.clone(),
});
#[cfg(feature="debug-dht")]
log_rpc!(debug "{}", debug_string);
let waitable_reply = network_result_try!(
@ -100,8 +101,9 @@ impl RPCProcessor {
let (value, peers, descriptor) = get_value_a.destructure();
let debug_string_value = value.as_ref().map(|v| {
format!(" len={} writer={}",
format!(" len={} seq={} writer={}",
v.value_data().data().len(),
v.value_data().seq(),
v.value_data().writer(),
)
}).unwrap_or_default();
@ -209,28 +211,32 @@ impl RPCProcessor {
.await
.map_err(RPCError::internal)?);
let debug_string_value = subkey_result.value.as_ref().map(|v| {
format!(" len={} writer={}",
v.value_data().data().len(),
v.value_data().writer(),
)
}).unwrap_or_default();
#[cfg(feature="debug-dht")]
{
let debug_string_value = subkey_result.value.as_ref().map(|v| {
format!(" len={} seq={} writer={}",
v.value_data().data().len(),
v.value_data().seq(),
v.value_data().writer(),
)
}).unwrap_or_default();
let debug_string_answer = format!(
"IN ===> GetValueA({} #{}{}{} peers={}) ==> {}",
key,
subkey,
debug_string_value,
if subkey_result.descriptor.is_some() {
" +desc"
} else {
""
},
closer_to_key_peers.len(),
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string_answer);
let debug_string_answer = format!(
"IN ===> GetValueA({} #{}{}{} peers={}) ==> {}",
key,
subkey,
debug_string_value,
if subkey_result.descriptor.is_some() {
" +desc"
} else {
""
},
closer_to_key_peers.len(),
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string_answer);
}
// Make GetValue answer
let get_value_a = RPCOperationGetValueA::new(

View File

@ -88,6 +88,7 @@ impl RPCProcessor {
vcrypto: vcrypto.clone(),
});
#[cfg(feature="debug-dht")]
log_rpc!(debug "{}", debug_string);
let waitable_reply = network_result_try!(
@ -202,10 +203,11 @@ impl RPCProcessor {
let closer_to_key_peers = network_result_try!(routing_table.find_peers_closer_to_key(key));
let debug_string = format!(
"IN <=== SetValueQ({} #{} len={} writer={}{}) <== {}",
"IN <=== SetValueQ({} #{} len={} seq={} writer={}{}) <== {}",
key,
subkey,
value.value_data().data().len(),
value.value_data().seq(),
value.value_data().writer(),
if descriptor.is_some() {
" +desc"
@ -238,28 +240,32 @@ impl RPCProcessor {
(true, new_value)
};
let debug_string_value = new_value.as_ref().map(|v| {
format!(" len={} writer={}",
v.value_data().data().len(),
v.value_data().writer(),
)
}).unwrap_or_default();
#[cfg(feature="debug-dht")]
{
let debug_string_value = new_value.as_ref().map(|v| {
format!(" len={} seq={} writer={}",
v.value_data().data().len(),
v.value_data().seq(),
v.value_data().writer(),
)
}).unwrap_or_default();
let debug_string_answer = format!(
"IN ===> SetValueA({} #{}{}{} peers={}) ==> {}",
key,
subkey,
if set {
" +set"
} else {
""
},
debug_string_value,
closer_to_key_peers.len(),
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string_answer);
let debug_string_answer = format!(
"IN ===> SetValueA({} #{}{}{} peers={}) ==> {}",
key,
subkey,
if set {
" +set"
} else {
""
},
debug_string_value,
closer_to_key_peers.len(),
msg.header.direct_sender_node_id()
);
log_rpc!(debug "{}", debug_string_answer);
}
// Make SetValue answer
let set_value_a = RPCOperationSetValueA::new(set, new_value, closer_to_key_peers)?;

View File

@ -35,4 +35,44 @@ impl StorageManager {
.await;
return format!("Remote records purged: reclaimed {} bytes", reclaimed);
}
pub(crate) async fn debug_local_record_subkey_info(
&self,
key: TypedKey,
subkey: ValueSubkey,
) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
return "not initialized".to_owned();
};
local_record_store
.debug_record_subkey_info(key, subkey)
.await
}
pub(crate) async fn debug_remote_record_subkey_info(
&self,
key: TypedKey,
subkey: ValueSubkey,
) -> String {
let inner = self.inner.lock().await;
let Some(remote_record_store) = &inner.remote_record_store else {
return "not initialized".to_owned();
};
remote_record_store
.debug_record_subkey_info(key, subkey)
.await
}
pub(crate) async fn debug_local_record_info(&self, key: TypedKey) -> String {
let inner = self.inner.lock().await;
let Some(local_record_store) = &inner.local_record_store else {
return "not initialized".to_owned();
};
local_record_store.debug_record_info(key)
}
pub(crate) async fn debug_remote_record_info(&self, key: TypedKey) -> String {
let inner = self.inner.lock().await;
let Some(remote_record_store) = &inner.remote_record_store else {
return "not initialized".to_owned();
};
remote_record_store.debug_record_info(key)
}
}

View File

@ -82,12 +82,14 @@ impl StorageManager {
// Keep the value if we got one and it is newer and it passes schema validation
if let Some(value) = gva.answer.value {
log_stor!(debug "Got value back: len={}", value.value_data().data().len());
let mut ctx = context.lock();
// Ensure we have a schema and descriptor
let (Some(descriptor), Some(schema)) = (&ctx.descriptor, &ctx.schema) else {
// Got a value but no descriptor for it
// Move to the next node
log_stor!(debug "Got value with no descriptor");
return Ok(None);
};
@ -99,6 +101,7 @@ impl StorageManager {
) {
// Validation failed, ignore this value
// Move to the next node
log_stor!(debug "Schema validation failed on subkey {}", subkey);
return Ok(None);
}
@ -118,15 +121,22 @@ impl StorageManager {
} else if new_seq > prior_seq {
// If the sequence number is greater, start over with the new value
ctx.value = Some(value);
// One node has show us this value so far
// One node has shown us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, ignore it
}
}
else {
// If we have no prior value, keep it
ctx.value = Some(value);
// One node has shown us this value so far
ctx.value_count = 1;
}
}
// Return peers if we have some
log_stor!(debug "Fanout call returned peers {}", gva.answer.peers.len());
Ok(Some(gva.answer.peers))
}
};

View File

@ -144,6 +144,7 @@ impl StorageManager {
// The initial writer is the owner of the record
inner
.open_existing_record(key, Some(owner), safety_selection)
.await
.map(|r| r.unwrap())
}
@ -159,7 +160,10 @@ impl StorageManager {
let mut inner = self.lock().await?;
// See if we have a local record already or not
if let Some(res) = inner.open_existing_record(key, writer, safety_selection)? {
if let Some(res) = inner
.open_existing_record(key, writer, safety_selection)
.await?
{
return Ok(res);
}
@ -338,8 +342,14 @@ impl StorageManager {
let schema = descriptor.schema()?;
// Make new subkey data
let value_data = if let Some(signed_value_data) = last_subkey_result.value {
let seq = signed_value_data.value_data().seq();
let value_data = if let Some(last_signed_value_data) = last_subkey_result.value {
if last_signed_value_data.value_data().data() == &data
&& last_signed_value_data.value_data().writer() == &writer.key
{
// Data and writer is the name, nothing is changing, just return the same ValueData
return Ok(Some(last_signed_value_data.into_value_data()));
}
let seq = last_signed_value_data.value_data().seq();
ValueData::new_with_seq(seq + 1, data, writer.key)
} else {
ValueData::new(data, writer.key)

View File

@ -9,7 +9,7 @@ use hashlink::LruCache;
pub struct RecordStore<D>
where
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
@ -41,7 +41,7 @@ pub struct SubkeyResult {
impl<D> RecordStore<D>
where
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
@ -363,6 +363,20 @@ where
out
}
pub(super) fn peek_record<R, F>(&self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&Record<D>) -> R,
{
// Get record from index
let mut out = None;
let rtk = RecordTableKey { key };
if let Some(record) = self.record_index.peek(&rtk) {
// Callback
out = Some(f(record));
}
out
}
pub(super) fn with_record_mut<R, F>(&mut self, key: TypedKey, f: F) -> Option<R>
where
F: FnOnce(&mut Record<D>) -> R,
@ -454,6 +468,69 @@ where
}));
}
pub(crate) async fn peek_subkey(
&self,
key: TypedKey,
subkey: ValueSubkey,
want_descriptor: bool,
) -> VeilidAPIResult<Option<SubkeyResult>> {
// record from index
let Some((subkey_count, has_subkey, opt_descriptor)) = self.peek_record(key, |record| {
(record.subkey_count(), record.stored_subkeys().contains(subkey), if want_descriptor {
Some(record.descriptor().clone())
} else {
None
})
}) else {
// Record not available
return Ok(None);
};
// Check if the subkey is in range
if subkey as usize >= subkey_count {
apibail_invalid_argument!("subkey out of range", "subkey", subkey);
}
// See if we have this subkey stored
if !has_subkey {
// If not, return no value but maybe with descriptor
return Ok(Some(SubkeyResult {
value: None,
descriptor: opt_descriptor,
}));
}
// Get subkey table
let Some(subkey_table) = self.subkey_table.clone() else {
apibail_internal!("record store not initialized");
};
// If subkey exists in subkey cache, use that
let stk = SubkeyTableKey { key, subkey };
if let Some(record_data) = self.subkey_cache.peek(&stk) {
let out = record_data.signed_value_data().clone();
return Ok(Some(SubkeyResult {
value: Some(out),
descriptor: opt_descriptor,
}));
}
// If not in cache, try to pull from table store if it is in our stored subkey set
let Some(record_data) = subkey_table
.load_rkyv::<RecordData>(0, &stk.bytes())
.await
.map_err(VeilidAPIError::internal)? else {
apibail_internal!("failed to peek subkey that was stored");
};
let out = record_data.signed_value_data().clone();
return Ok(Some(SubkeyResult {
value: Some(out),
descriptor: opt_descriptor,
}));
}
pub async fn set_subkey(
&mut self,
key: TypedKey,
@ -599,4 +676,23 @@ where
out
}
pub(super) fn debug_record_info(&self, key: TypedKey) -> String {
self.peek_record(key, |r| format!("{:#?}", r))
.unwrap_or("Not found".to_owned())
}
pub(super) async fn debug_record_subkey_info(
&self,
key: TypedKey,
subkey: ValueSubkey,
) -> String {
match self.peek_subkey(key, subkey, true).await {
Ok(Some(v)) => {
format!("{:#?}", v)
}
Ok(None) => "Subkey not available".to_owned(),
Err(e) => format!("{}", e),
}
}
}

View File

@ -98,7 +98,7 @@ impl StorageManager {
if new_seq > prior_seq {
// If the sequence number is greater, keep it
ctx.value = value;
// One node has show us this value so far
// One node has shown us this value so far
ctx.value_count = 1;
} else {
// If the sequence number is older, or an equal sequence number,
@ -166,13 +166,24 @@ impl StorageManager {
pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option<SignedValueDescriptor>) -> VeilidAPIResult<NetworkResult<Option<SignedValueData>>> {
let mut inner = self.lock().await?;
// See if the subkey we are modifying has a last known local value
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
// See if this is a remote or local value
let (is_local, last_subkey_result) = {
// See if the subkey we are modifying has a last known local value
let last_subkey_result = inner.handle_get_local_value(key, subkey, true).await?;
// If this is local, it must have a descriptor already
if last_subkey_result.descriptor.is_some() {
(true, last_subkey_result)
} else {
// See if the subkey we are modifying has a last known remote value
let last_subkey_result = inner.handle_get_remote_value(key, subkey, true).await?;
(false, last_subkey_result)
}
};
// Make sure this value would actually be newer
if let Some(last_value) = &last_subkey_result.value {
if value.value_data().seq() < last_value.value_data().seq() {
// inbound value is older than the one we have, just return the one we have
if value.value_data().seq() <= last_value.value_data().seq() {
// inbound value is older or equal sequence number than the one we have, just return the one we have
return Ok(NetworkResult::value(Some(last_value.clone())));
}
}
@ -210,7 +221,12 @@ impl StorageManager {
}
// Do the set and return no new value
match inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await {
let res = if is_local {
inner.handle_set_local_value(key, subkey, value).await
} else {
inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await
};
match res {
Ok(()) => {},
Err(VeilidAPIError::Internal { message }) => {
apibail_internal!(message);

View File

@ -209,7 +209,57 @@ impl StorageManagerInner {
Ok((dht_key, owner))
}
pub fn open_existing_record(
async fn move_remote_record_to_local(&mut self, key: TypedKey, safety_selection: SafetySelection) -> VeilidAPIResult<Option<(PublicKey, DHTSchema)>>
{
// Get local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
// Get remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
let rcb = |r: &Record<RemoteRecordDetail>| {
// Return record details
r.clone()
};
let Some(remote_record) = remote_record_store.with_record(key, rcb) else {
// No local or remote record found, return None
return Ok(None);
};
// Make local record
let cur_ts = get_aligned_timestamp();
let local_record = Record::new(cur_ts, remote_record.descriptor().clone(), LocalRecordDetail {
safety_selection
})?;
local_record_store.new_record(key, local_record).await?;
// Move copy subkey data from remote to local store
for subkey in remote_record.stored_subkeys().iter() {
let Some(subkey_result) = remote_record_store.get_subkey(key, subkey, false).await? else {
// Subkey was missing
warn!("Subkey was missing: {} #{}",key, subkey);
continue;
};
let Some(subkey_data) = subkey_result.value else {
// Subkey was missing
warn!("Subkey data was missing: {} #{}",key, subkey);
continue;
};
local_record_store.set_subkey(key, subkey, subkey_data).await?;
}
// Delete remote record from store
remote_record_store.delete_record(key).await?;
// Return record information as transferred to local record
Ok(Some((remote_record.owner().clone(), remote_record.schema())))
}
pub async fn open_existing_record(
&mut self,
key: TypedKey,
writer: Option<KeyPair>,
@ -235,8 +285,17 @@ impl StorageManagerInner {
// Return record details
(r.owner().clone(), r.schema())
};
let Some((owner, schema)) = local_record_store.with_record_mut(key, cb) else {
return Ok(None);
let (owner, schema) = match local_record_store.with_record_mut(key, cb){
Some(v) => v,
None => {
// If we don't have a local record yet, check to see if we have a remote record
// if so, migrate it to a local record
let Some(v) = self.move_remote_record_to_local(key, safety_selection).await? else {
// No remote record either
return Ok(None);
};
v
}
};
// Had local record
@ -424,7 +483,7 @@ impl StorageManagerInner {
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{

View File

@ -6,7 +6,7 @@ use super::*;
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct Record<D>
where
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{
@ -20,7 +20,7 @@ where
impl<D> Record<D>
where
D: Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
D: fmt::Debug + Clone + RkyvArchive + RkyvSerialize<DefaultVeilidRkyvSerializer>,
for<'t> <D as RkyvArchive>::Archived: CheckBytes<RkyvDefaultValidator<'t>>,
<D as RkyvArchive>::Archived: RkyvDeserialize<D, VeilidSharedDeserializeMap>,
{

View File

@ -5,7 +5,6 @@ use super::*;
#[derive(
Clone,
Debug,
PartialOrd,
PartialEq,
Eq,
@ -79,3 +78,13 @@ impl SignedValueDescriptor {
self.schema_data.cmp(&other.schema_data)
}
}
impl fmt::Debug for SignedValueDescriptor {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("SignedValueDescriptor")
.field("owner", &self.owner)
.field("schema_data", &format!("{:?}", &self.schema_data))
.field("signature", &self.signature)
.finish()
}
}

View File

@ -197,8 +197,8 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.reverse_connection_receipt_time_ms" => Ok(Box::new(5_000u32)),
"network.hole_punch_receipt_time_ms" => Ok(Box::new(5_000u32)),
"network.network_key_password" => Ok(Box::new(Option::<String>::None)),
"network.routing_table.node_id" => Ok(Box::new(TypedKeySet::new())),
"network.routing_table.node_id_secret" => Ok(Box::new(TypedSecretSet::new())),
"network.routing_table.node_id" => Ok(Box::new(TypedKeyGroup::new())),
"network.routing_table.node_id_secret" => Ok(Box::new(TypedSecretGroup::new())),
"network.routing_table.bootstrap" => Ok(Box::new(Vec::<String>::new())),
"network.routing_table.limit_over_attached" => Ok(Box::new(64u32)),
"network.routing_table.limit_fully_attached" => Ok(Box::new(32u32)),

View File

@ -31,6 +31,19 @@ fn get_string(text: &str) -> Option<String> {
Some(text.to_owned())
}
fn get_data(text: &str) -> Option<Vec<u8>> {
if text.starts_with("#") {
hex::decode(&text[1..]).ok()
} else if text.starts_with("\"") || text.starts_with("'") {
json::parse(text)
.ok()?
.as_str()
.map(|x| x.to_owned().as_bytes().to_vec())
} else {
Some(text.as_bytes().to_vec())
}
}
fn get_subkeys(text: &str) -> Option<ValueSubkeyRangeSet> {
if let Some(n) = get_number(text) {
Some(ValueSubkeyRangeSet::single(n.try_into().ok()?))
@ -88,44 +101,50 @@ fn get_route_id(
};
}
fn get_safety_selection(text: &str, routing_table: RoutingTable) -> Option<SafetySelection> {
let rss = routing_table.route_spec_store();
let default_route_hop_count =
routing_table.with_config(|c| c.network.rpc.default_route_hop_count as usize);
fn get_dht_schema(text: &str) -> Option<DHTSchema> {
deserialize_json::<DHTSchema>(text).ok()
}
if text.len() != 0 && &text[0..1] == "-" {
// Unsafe
let text = &text[1..];
let seq = get_sequencing(text).unwrap_or_default();
Some(SafetySelection::Unsafe(seq))
} else {
// Safe
let mut preferred_route = None;
let mut hop_count = default_route_hop_count;
let mut stability = Stability::default();
let mut sequencing = Sequencing::default();
for x in text.split(",") {
let x = x.trim();
if let Some(pr) = get_route_id(rss.clone(), true, false)(x) {
preferred_route = Some(pr)
}
if let Some(n) = get_number(x) {
hop_count = n;
}
if let Some(s) = get_stability(x) {
stability = s;
}
if let Some(s) = get_sequencing(x) {
sequencing = s;
fn get_safety_selection(routing_table: RoutingTable) -> impl Fn(&str) -> Option<SafetySelection> {
move |text| {
let rss = routing_table.route_spec_store();
let default_route_hop_count =
routing_table.with_config(|c| c.network.rpc.default_route_hop_count as usize);
if text.len() != 0 && &text[0..1] == "-" {
// Unsafe
let text = &text[1..];
let seq = get_sequencing(text).unwrap_or_default();
Some(SafetySelection::Unsafe(seq))
} else {
// Safe
let mut preferred_route = None;
let mut hop_count = default_route_hop_count;
let mut stability = Stability::default();
let mut sequencing = Sequencing::default();
for x in text.split(",") {
let x = x.trim();
if let Some(pr) = get_route_id(rss.clone(), true, false)(x) {
preferred_route = Some(pr)
}
if let Some(n) = get_number(x) {
hop_count = n;
}
if let Some(s) = get_stability(x) {
stability = s;
}
if let Some(s) = get_sequencing(x) {
sequencing = s;
}
}
let ss = SafetySpec {
preferred_route,
hop_count,
stability,
sequencing,
};
Some(SafetySelection::Safe(ss))
}
let ss = SafetySpec {
preferred_route,
hop_count,
stability,
sequencing,
};
Some(SafetySelection::Safe(ss))
}
}
@ -150,7 +169,7 @@ fn get_destination(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<D
move |text| {
// Safety selection
let (text, ss) = if let Some((first, second)) = text.split_once('+') {
let ss = get_safety_selection(second, routing_table.clone())?;
let ss = get_safety_selection(routing_table.clone())(second)?;
(first, Some(ss))
} else {
(text, None)
@ -234,6 +253,44 @@ fn get_typed_key(text: &str) -> Option<TypedKey> {
fn get_public_key(text: &str) -> Option<PublicKey> {
PublicKey::from_str(text).ok()
}
fn get_keypair(text: &str) -> Option<KeyPair> {
KeyPair::from_str(text).ok()
}
fn get_crypto_system_version(crypto: Crypto) -> impl FnOnce(&str) -> Option<CryptoSystemVersion> {
move |text| {
let kindstr = get_string(text)?;
let kind = CryptoKind::from_str(&kindstr).ok()?;
crypto.get(kind)
}
}
fn get_dht_key(
routing_table: RoutingTable,
) -> impl FnOnce(&str) -> Option<(TypedKey, Option<SafetySelection>)> {
move |text| {
// Safety selection
let (text, ss) = if let Some((first, second)) = text.split_once('+') {
let ss = get_safety_selection(routing_table.clone())(second)?;
(first, Some(ss))
} else {
(text, None)
};
if text.len() == 0 {
return None;
}
let key = if let Some(key) = get_public_key(text) {
TypedKey::new(best_crypto_kind(), key)
} else if let Some(key) = get_typed_key(text) {
key
} else {
return None;
};
Some((key, ss))
}
}
fn get_node_ref(routing_table: RoutingTable) -> impl FnOnce(&str) -> Option<NodeRef> {
move |text| {
@ -354,6 +411,40 @@ fn get_debug_argument_at<T, G: FnOnce(&str) -> Option<T>>(
Ok(val)
}
pub fn print_data(data: &[u8], truncate_len: Option<usize>) -> String {
// check is message body is ascii printable
let mut printable = true;
for c in data {
if *c < 32 || *c > 126 {
printable = false;
break;
}
}
let (data, truncated) = if truncate_len.is_some() && data.len() > truncate_len.unwrap() {
(&data[0..64], true)
} else {
(&data[..], false)
};
let strdata = if printable {
format!("{}", String::from_utf8_lossy(&data).to_string())
} else {
let sw = shell_words::quote(&String::from_utf8_lossy(&data).to_string()).to_string();
let h = hex::encode(data);
if h.len() < sw.len() {
h
} else {
sw
}
};
if truncated {
format!("{}...", strdata)
} else {
strdata
}
}
impl VeilidAPI {
async fn debug_buckets(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
@ -383,6 +474,24 @@ impl VeilidAPI {
Ok(routing_table.debug_info_txtrecord().await)
}
async fn debug_keypair(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let crypto = self.crypto()?;
let vcrypto = get_debug_argument_at(
&args,
0,
"debug_keypair",
"kind",
get_crypto_system_version(crypto.clone()),
)
.unwrap_or_else(|_| crypto.best());
// Generate a keypair
let out = TypedKeyPair::new(vcrypto.kind(), vcrypto.generate_keypair()).to_string();
Ok(out)
}
async fn debug_entries(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
@ -922,20 +1031,244 @@ impl VeilidAPI {
};
return Ok(out);
}
async fn debug_record_create(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let netman = self.network_manager()?;
let routing_table = netman.routing_table();
let crypto = self.crypto()?;
let csv = get_debug_argument_at(
&args,
1,
"debug_record_create",
"kind",
get_crypto_system_version(crypto.clone()),
)
.unwrap_or_else(|_| crypto.best());
let schema = get_debug_argument_at(
&args,
2,
"debug_record_create",
"dht_schema",
get_dht_schema,
)
.unwrap_or_else(|_| DHTSchema::dflt(1));
let ss = get_debug_argument_at(
&args,
3,
"debug_record_create",
"safety_selection",
get_safety_selection(routing_table),
)
.ok();
// Get routing context with optional privacy
let rc = self.routing_context();
let rc = if let Some(ss) = ss {
let rcp = match rc.with_custom_privacy(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
};
rcp
} else {
rc
};
// Do a record get
let record = match rc.create_dht_record(csv.kind(), schema).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
match rc.close_dht_record(*record.key()).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)),
Ok(v) => v,
};
debug!("DHT Record Created:\n{:#?}", record);
return Ok(format!("{:?}", record));
}
async fn debug_record_get(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let netman = self.network_manager()?;
let routing_table = netman.routing_table();
let (key, ss) = get_debug_argument_at(
&args,
1,
"debug_record_get",
"key",
get_dht_key(routing_table),
)?;
let subkey = get_debug_argument_at(&args, 2, "debug_record_get", "subkey", get_number)?;
let force_refresh = if args.len() >= 4 {
Some(get_debug_argument_at(
&args,
3,
"debug_record_get",
"force_refresh",
get_string,
)?)
} else {
None
};
let force_refresh = if let Some(force_refresh) = force_refresh {
if &force_refresh == "force" {
true
} else {
return Ok(format!("Unknown force: {}", force_refresh));
}
} else {
false
};
// Get routing context with optional privacy
let rc = self.routing_context();
let rc = if let Some(ss) = ss {
let rcp = match rc.with_custom_privacy(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
};
rcp
} else {
rc
};
// Do a record get
let _record = match rc.open_dht_record(key, None).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
let value = match rc
.get_dht_value(key, subkey as ValueSubkey, force_refresh)
.await
{
Err(e) => {
match rc.close_dht_record(key).await {
Err(e) => {
return Ok(format!(
"Can't get DHT value and can't close DHT record: {}",
e
))
}
Ok(v) => v,
};
return Ok(format!("Can't get DHT value: {}", e));
}
Ok(v) => v,
};
let out = if let Some(value) = value {
format!("{:?}", value)
} else {
"No value data returned".to_owned()
};
match rc.close_dht_record(key).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)),
Ok(v) => v,
};
return Ok(out);
}
async fn debug_record_set(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let netman = self.network_manager()?;
let routing_table = netman.routing_table();
let (key, ss) = get_debug_argument_at(
&args,
1,
"debug_record_set",
"key",
get_dht_key(routing_table),
)?;
let subkey = get_debug_argument_at(&args, 2, "debug_record_set", "subkey", get_number)?;
let writer = get_debug_argument_at(&args, 3, "debug_record_set", "writer", get_keypair)?;
let data = get_debug_argument_at(&args, 4, "debug_record_set", "data", get_data)?;
// Get routing context with optional privacy
let rc = self.routing_context();
let rc = if let Some(ss) = ss {
let rcp = match rc.with_custom_privacy(ss) {
Err(e) => return Ok(format!("Can't use safety selection: {}", e)),
Ok(v) => v,
};
rcp
} else {
rc
};
// Do a record get
let _record = match rc.open_dht_record(key, Some(writer)).await {
Err(e) => return Ok(format!("Can't open DHT record: {}", e)),
Ok(v) => v,
};
let value = match rc.set_dht_value(key, subkey as ValueSubkey, data).await {
Err(e) => {
match rc.close_dht_record(key).await {
Err(e) => {
return Ok(format!(
"Can't set DHT value and can't close DHT record: {}",
e
))
}
Ok(v) => v,
};
return Ok(format!("Can't set DHT value: {}", e));
}
Ok(v) => v,
};
let out = if let Some(value) = value {
format!("{:?}", value)
} else {
"No value data returned".to_owned()
};
match rc.close_dht_record(key).await {
Err(e) => return Ok(format!("Can't close DHT record: {}", e)),
Ok(v) => v,
};
return Ok(out);
}
async fn debug_record_delete(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let key = get_debug_argument_at(&args, 1, "debug_record_delete", "key", get_typed_key)?;
// Do a record delete
let rc = self.routing_context();
match rc.delete_dht_record(key).await {
Err(e) => return Ok(format!("Can't delete DHT record: {}", e)),
Ok(v) => v,
};
Ok(format!("DHT record deleted"))
}
async fn debug_record_info(&self, args: Vec<String>) -> VeilidAPIResult<String> {
let storage_manager = self.storage_manager()?;
let key = get_debug_argument_at(&args, 1, "debug_record_get", "key", get_typed_key)?;
let subkeys =
get_debug_argument_at(&args, 2, "debug_record_subkeys", "subkeys", get_string)?;
let key = get_debug_argument_at(&args, 1, "debug_record_info", "key", get_typed_key)?;
// let rc = self.routing_context();
let subkey =
get_debug_argument_at(&args, 2, "debug_record_info", "subkey", get_number).ok();
return Ok("TODO".to_owned());
let out = if let Some(subkey) = subkey {
let li = storage_manager
.debug_local_record_subkey_info(key, subkey as ValueSubkey)
.await;
let ri = storage_manager
.debug_remote_record_subkey_info(key, subkey as ValueSubkey)
.await;
format!(
"Local Subkey Info:\n{}\n\nRemote Subkey Info:\n{}\n",
li, ri
)
} else {
let li = storage_manager.debug_local_record_info(key).await;
let ri = storage_manager.debug_remote_record_info(key).await;
format!("Local Info:\n{}\n\nRemote Info:\n{}\n", li, ri)
};
return Ok(out);
}
async fn debug_record(&self, args: String) -> VeilidAPIResult<String> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let args: Vec<String> =
shell_words::split(&args).map_err(|e| VeilidAPIError::parse_error(e, args))?;
let command = get_debug_argument_at(&args, 0, "debug_record", "command", get_string)?;
@ -943,55 +1276,77 @@ impl VeilidAPI {
self.debug_record_list(args).await
} else if command == "purge" {
self.debug_record_purge(args).await
} else if command == "create" {
self.debug_record_create(args).await
} else if command == "get" {
self.debug_record_get(args).await
} else if command == "set" {
self.debug_record_set(args).await
} else if command == "delete" {
self.debug_record_delete(args).await
} else if command == "info" {
self.debug_record_info(args).await
} else {
Ok(">>> Unknown command\n".to_owned())
}
}
pub async fn debug_help(&self, _args: String) -> VeilidAPIResult<String> {
Ok(r#">>> Debug commands:
help
buckets [dead|reliable]
dialinfo
entries [dead|reliable]
entry <node>
nodeinfo
config [key [new value]]
purge <buckets|connections|routes>
attach
detach
restart network
ping <destination>
contact <node>[<modifiers>]
route allocate [ord|*ord] [rel] [<count>] [in|out]
release <route>
publish <route> [full]
unpublish <route>
print <route>
list
import <blob>
test <route>
record list <local|remote>
purge <local|remote> [bytes]
get <key> <subkeys>
<destination> is:
* direct: <node>[+<safety>][<modifiers>]
* relay: <relay>@<target>[+<safety>][<modifiers>]
* private: #<id>[+<safety>]
<safety> is:
* unsafe: -[ord|*ord]
* safe: [route][,ord|*ord][,rel][,<count>]
<modifiers> is: [/<protocoltype>][/<addresstype>][/<routingdomain>]
<protocoltype> is: udp|tcp|ws|wss
<addresstype> is: ipv4|ipv6
<routingdomain> is: public|local
<subkeys> is:
* a number: 2
* a comma-separated inclusive range list: 1..=3,5..=8
"#
Ok(r#"buckets [dead|reliable]
dialinfo
entries [dead|reliable]
entry <node>
nodeinfo
config [configkey [new value]]
txtrecord
keypair
purge <buckets|connections|routes>
attach
detach
restart network
contact <node>[<modifiers>]
ping <destination>
route allocate [ord|*ord] [rel] [<count>] [in|out]
release <route>
publish <route> [full]
unpublish <route>
print <route>
list
import <blob>
test <route>
record list <local|remote>
purge <local|remote> [bytes]
create <cryptokind> <dhtschema> <safety>
set <key>[+<safety>] <subkey> <writer> <data>
get <key>[+<safety>] <subkey> [force]
delete <key>
info <key> [subkey]
--------------------------------------------------------------------
<key> is: VLD0:GsgXCRPrzSK6oBNgxhNpm-rTYFd02R0ySx6j9vbQBG4
* also <node>, <relay>, <target>, <route>
<configkey> is: dot path like network.protocol.udp.enabled
<destination> is:
* direct: <node>[+<safety>][<modifiers>]
* relay: <relay>@<target>[+<safety>][<modifiers>]
* private: #<id>[+<safety>]
<safety> is:
* unsafe: -[ord|*ord]
* safe: [route][,ord|*ord][,rel][,<count>]
<modifiers> is: [/<protocoltype>][/<addresstype>][/<routingdomain>]
<protocoltype> is: udp|tcp|ws|wss
<addresstype> is: ipv4|ipv6
<routingdomain> is: public|local
<cryptokind> is: VLD0
<dhtschema> is: a json dht schema, default is '{"kind":"DFLT","o_cnt":1}'
<subkey> is: a number: 2
<subkeys> is:
* a number: 2
* a comma-separated inclusive range list: 1..=3,5..=8
<data> is:
* a single-word string: foobar
* a shell-quoted string: "foo\nbar\n"
* a '#' followed by hex data: #12AB34CD...
"#
.to_owned())
}
@ -1013,6 +1368,8 @@ impl VeilidAPI {
self.debug_dialinfo(rest).await
} else if arg == "txtrecord" {
self.debug_txtrecord(rest).await
} else if arg == "keypair" {
self.debug_keypair(rest).await
} else if arg == "entries" {
self.debug_entries(rest).await
} else if arg == "entry" {
@ -1038,7 +1395,7 @@ impl VeilidAPI {
} else if arg == "record" {
self.debug_record(rest).await
} else {
Err(VeilidAPIError::generic("Unknown debug command"))
Err(VeilidAPIError::generic("Unknown server debug command"))
}
};
res

View File

@ -200,7 +200,7 @@ pub enum ResponseOp {
VerifySignatures {
#[serde(flatten)]
#[schemars(with = "ApiResult<Vec<String>>")]
result: ApiResultWithVecString<TypedKeySet>,
result: ApiResultWithVecString<TypedKeyGroup>,
},
GenerateSignatures {
#[serde(flatten)]

View File

@ -243,12 +243,12 @@ impl JsonRequestProcessor {
.map(|new_rc| self.add_routing_context(new_rc)),
),
},
RoutingContextRequestOp::WithCustomPrivacy { stability } => {
RoutingContextRequestOp::WithCustomPrivacy { safety_selection } => {
RoutingContextResponseOp::WithCustomPrivacy {
result: to_json_api_result(
routing_context
.clone()
.with_custom_privacy(stability)
.with_custom_privacy(safety_selection)
.map(|new_rc| self.add_routing_context(new_rc)),
),
}

View File

@ -20,7 +20,7 @@ pub enum RoutingContextRequestOp {
Release,
WithPrivacy,
WithCustomPrivacy {
stability: Stability,
safety_selection: SafetySelection,
},
WithSequencing {
sequencing: Sequencing,

View File

@ -46,24 +46,22 @@ impl RoutingContext {
}
pub fn with_privacy(self) -> VeilidAPIResult<Self> {
self.with_custom_privacy(Stability::default())
}
pub fn with_custom_privacy(self, stability: Stability) -> VeilidAPIResult<Self> {
let config = self.api.config()?;
let c = config.get();
self.with_custom_privacy(SafetySelection::Safe(SafetySpec {
preferred_route: None,
hop_count: c.network.rpc.default_route_hop_count as usize,
stability: Stability::default(),
sequencing: Sequencing::default(),
}))
}
pub fn with_custom_privacy(self, safety_selection: SafetySelection) -> VeilidAPIResult<Self> {
Ok(Self {
api: self.api.clone(),
inner: Arc::new(Mutex::new(RoutingContextInner {})),
unlocked_inner: Arc::new(RoutingContextUnlockedInner {
safety_selection: SafetySelection::Safe(SafetySpec {
preferred_route: None,
hop_count: c.network.rpc.default_route_hop_count as usize,
stability,
sequencing: self.sequencing(),
}),
}),
unlocked_inner: Arc::new(RoutingContextUnlockedInner { safety_selection }),
})
}

View File

@ -112,8 +112,8 @@ pub fn fix_veilidconfiginner() -> VeilidConfigInner {
hole_punch_receipt_time_ms: 9000,
network_key_password: None,
routing_table: VeilidConfigRoutingTable {
node_id: TypedKeySet::new(),
node_id_secret: TypedSecretSet::new(),
node_id: TypedKeyGroup::new(),
node_id_secret: TypedSecretGroup::new(),
bootstrap: vec!["boots".to_string()],
limit_over_attached: 1,
limit_fully_attached: 2,

View File

@ -46,6 +46,9 @@ impl DHTRecordDescriptor {
}
}
pub fn key(&self) -> &TypedKey {
&self.key
}
pub fn owner(&self) -> &PublicKey {
&self.owner
}

View File

@ -2,7 +2,6 @@ use super::*;
#[derive(
Clone,
Debug,
Default,
PartialOrd,
PartialEq,
@ -61,3 +60,13 @@ impl ValueData {
mem::size_of::<Self>() + self.data.len()
}
}
impl fmt::Debug for ValueData {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("ValueData")
.field("seq", &self.seq)
.field("data", &print_data(&self.data, None))
.field("writer", &self.writer)
.finish()
}
}

View File

@ -348,9 +348,9 @@ pub struct VeilidConfigRPC {
)]
pub struct VeilidConfigRoutingTable {
#[schemars(with = "Vec<String>")]
pub node_id: TypedKeySet,
pub node_id: TypedKeyGroup,
#[schemars(with = "Vec<String>")]
pub node_id_secret: TypedSecretSet,
pub node_id_secret: TypedSecretGroup,
pub bootstrap: Vec<String>,
pub limit_over_attached: u32,
pub limit_fully_attached: u32,
@ -785,7 +785,7 @@ impl VeilidConfig {
let mut safe_cfg = self.inner.read().clone();
// Remove secrets
safe_cfg.network.routing_table.node_id_secret = TypedSecretSet::new();
safe_cfg.network.routing_table.node_id_secret = TypedSecretGroup::new();
safe_cfg.protected_store.device_encryption_key_password = "".to_owned();
safe_cfg.protected_store.new_device_encryption_key_password = None;
@ -1075,8 +1075,8 @@ impl VeilidConfig {
crypto: Crypto,
table_store: TableStore,
) -> VeilidAPIResult<()> {
let mut out_node_id = TypedKeySet::new();
let mut out_node_id_secret = TypedSecretSet::new();
let mut out_node_id = TypedKeyGroup::new();
let mut out_node_id_secret = TypedSecretGroup::new();
for ck in VALID_CRYPTO_KINDS {
let vcrypto = crypto

View File

@ -28,7 +28,7 @@ abstract class DHTSchema {
default:
{
throw VeilidAPIExceptionInternal(
"Invalid VeilidAPIException type: ${json['kind']}");
"Invalid DHTSchema type: ${json['kind']}");
}
}
}
@ -196,6 +196,7 @@ class ValueData {
}
}
//////////////////////////////////////
/// Stability
enum Stability {
@ -228,6 +229,80 @@ enum Sequencing {
}
}
//////////////////////////////////////
/// SafetySelection
abstract class SafetySelection {
factory SafetySelection.fromJson(dynamic json) {
var m = json as Map<String, dynamic>;
if (m.containsKey("Unsafe")) {
return SafetySelectionUnsafe(
sequencing: Sequencing.fromJson(m["Unsafe"]));
} else if (m.containsKey("Safe")) {
return SafetySelectionSafe(safetySpec: SafetySpec.fromJson(m["Safe"]));
} else {
throw VeilidAPIExceptionInternal("Invalid SafetySelection");
}
}
Map<String, dynamic> toJson();
}
class SafetySelectionUnsafe implements SafetySelection {
final Sequencing sequencing;
//
SafetySelectionUnsafe({
required this.sequencing,
});
@override
Map<String, dynamic> toJson() {
return {'Unsafe': sequencing.toJson()};
}
}
class SafetySelectionSafe implements SafetySelection {
final SafetySpec safetySpec;
//
SafetySelectionSafe({
required this.safetySpec,
});
@override
Map<String, dynamic> toJson() {
return {'Safe': safetySpec.toJson()};
}
}
/// Options for safety routes (sender privacy)
class SafetySpec {
final String? preferredRoute;
final int hopCount;
final Stability stability;
final Sequencing sequencing;
//
SafetySpec({
this.preferredRoute,
required this.hopCount,
required this.stability,
required this.sequencing,
});
SafetySpec.fromJson(dynamic json)
: preferredRoute = json['preferred_route'],
hopCount = json['hop_count'],
stability = Stability.fromJson(json['stability']),
sequencing = Sequencing.fromJson(json['sequencing']);
Map<String, dynamic> toJson() {
return {
'preferred_route': preferredRoute,
'hop_count': hopCount,
'stability': stability.toJson(),
'sequencing': sequencing.toJson()
};
}
}
//////////////////////////////////////
/// RouteBlob
class RouteBlob {
@ -251,7 +326,7 @@ class RouteBlob {
abstract class VeilidRoutingContext {
// Modifiers
VeilidRoutingContext withPrivacy();
VeilidRoutingContext withCustomPrivacy(Stability stability);
VeilidRoutingContext withCustomPrivacy(SafetySelection safetySelection);
VeilidRoutingContext withSequencing(Sequencing sequencing);
// App call/message

View File

@ -595,9 +595,9 @@ class VeilidRoutingContextFFI implements VeilidRoutingContext {
}
@override
VeilidRoutingContextFFI withCustomPrivacy(Stability stability) {
VeilidRoutingContextFFI withCustomPrivacy(SafetySelection safetySelection) {
final newId = _ctx.ffi._routingContextWithCustomPrivacy(
_ctx.id, jsonEncode(stability).toNativeUtf8());
_ctx.id, jsonEncode(safetySelection).toNativeUtf8());
return VeilidRoutingContextFFI._(_Ctx(newId, _ctx.ffi));
}

View File

@ -45,11 +45,11 @@ class VeilidRoutingContextJS implements VeilidRoutingContext {
}
@override
VeilidRoutingContextJS withCustomPrivacy(Stability stability) {
VeilidRoutingContextJS withCustomPrivacy(SafetySelection safetySelection) {
final newId = js_util.callMethod(
wasm,
"routing_context_with_custom_privacy",
[_ctx.id, jsonEncode(stability)]);
[_ctx.id, jsonEncode(safetySelection)]);
return VeilidRoutingContextJS._(_Ctx(newId, _ctx.js));
}

View File

@ -410,15 +410,15 @@ pub extern "C" fn routing_context_with_privacy(id: u32) -> u32 {
}
#[no_mangle]
pub extern "C" fn routing_context_with_custom_privacy(id: u32, stability: FfiStr) -> u32 {
let stability: veilid_core::Stability =
veilid_core::deserialize_opt_json(stability.into_opt_string()).unwrap();
pub extern "C" fn routing_context_with_custom_privacy(id: u32, safety_selection: FfiStr) -> u32 {
let safety_selection: veilid_core::SafetySelection =
veilid_core::deserialize_opt_json(safety_selection.into_opt_string()).unwrap();
let rc = ROUTING_CONTEXTS.lock();
let Some(routing_context) = rc.get(&id) else {
return 0;
};
let Ok(routing_context) = routing_context.clone().with_custom_privacy(stability) else {
let Ok(routing_context) = routing_context.clone().with_custom_privacy(safety_selection) else {
return 0;
};
let new_id = add_routing_context(routing_context);

View File

@ -59,7 +59,7 @@ async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(2))
vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
assert vd != None
@ -70,6 +70,9 @@ async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
vd3 = await rc.get_dht_value(rec.key, 0, True)
assert vd3 != None
vd4 = await rc.get_dht_value(rec.key, 1, False)
assert vd4 == None
print("vd: {}", vd.__dict__)
print("vd2: {}", vd2.__dict__)
print("vd3: {}", vd3.__dict__)
@ -80,3 +83,124 @@ async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)
@pytest.mark.asyncio
async def test_open_writer_dht_value(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(2))
key = rec.key
owner = rec.owner
secret = rec.owner_secret
print(f"key:{key}")
cs = await api_connection.get_crypto_system(veilid.CryptoKind.CRYPTO_KIND_VLD0)
async with cs:
assert await cs.validate_key_pair(owner, secret)
other_keypair = await cs.generate_key_pair()
va = b"Qwertyuiop Asdfghjkl Zxcvbnm"
vb = b"1234567890"
vc = b"!@#$%^&*()"
# Test subkey writes
vdtemp = await rc.set_dht_value(key, 1, va)
assert vdtemp != None
assert vdtemp.data == va
assert vdtemp.seq == 0
assert vdtemp.writer == owner
vdtemp = await rc.get_dht_value(key, 1, False)
assert vdtemp.data == va
assert vdtemp.seq == 0
assert vdtemp.writer == owner
vdtemp = await rc.get_dht_value(key, 0, False)
assert vdtemp == None
vdtemp = await rc.set_dht_value(key, 0, vb)
assert vdtemp.data == vb
assert vdtemp.seq == 0
vdtemp = await rc.get_dht_value(key, 0, True)
assert vdtemp.data == vb
vdtemp = await rc.get_dht_value(key, 1, True)
assert vdtemp.data == va
# Equal value should not trigger sequence number update
vdtemp = await rc.set_dht_value(key, 1, va)
assert vdtemp != None
assert vdtemp.data == va
assert vdtemp.seq == 0
assert vdtemp.writer == owner
# Different value should trigger sequence number update
vdtemp = await rc.set_dht_value(key, 1, vb)
assert vdtemp != None
assert vdtemp.data == vb
assert vdtemp.seq == 1
assert vdtemp.writer == owner
# Now that we initialized some subkeys
# and verified they stored correctly
# Delete things locally and reopen and see if we can write
# with the same writer key
await rc.close_dht_record(key)
await rc.delete_dht_record(key)
rec = await rc.open_dht_record(key, veilid.KeyPair.from_parts(owner, secret))
assert rec != None
assert rec.key == key
assert rec.owner == owner
assert rec.owner_secret == secret
assert rec.schema.kind == veilid.DHTSchemaKind.DFLT
assert rec.schema.o_cnt == 2
# Verify subkey 1 can be set before it is get but newer is available online
vdtemp = await rc.set_dht_value(key, 1, vc)
assert vdtemp != None
assert vdtemp.data == vb
assert vdtemp.seq == 1
assert vdtemp.writer == owner
# Verify subkey 1 can be set a second time and it updates because seq is newer
vdtemp = await rc.set_dht_value(key, 1, vc)
assert vdtemp != None
assert vdtemp.data == vc
assert vdtemp.seq == 2
assert vdtemp.writer == owner
# Verify the network got the subkey update with a refresh check
vdtemp = await rc.get_dht_value(key, 1, True)
assert vdtemp != None
assert vdtemp.data == vc
assert vdtemp.seq == 2
assert vdtemp.writer == owner
# Delete things locally and reopen and see if we can write
# with a different writer key (should fail)
await rc.close_dht_record(key)
await rc.delete_dht_record(key)
rec = await rc.open_dht_record(key, other_keypair)
assert rec != None
assert rec.key == key
assert rec.owner == owner
assert rec.owner_secret == None
assert rec.schema.kind == veilid.DHTSchemaKind.DFLT
assert rec.schema.o_cnt == 2
# Verify subkey 1 can NOT be set because we have the wrong writer
with pytest.raises(veilid.VeilidAPIError):
vdtemp = await rc.set_dht_value(key, 1, va)
# Verify subkey 0 can NOT be set because we have the wrong writer
with pytest.raises(veilid.VeilidAPIError):
vdtemp = await rc.set_dht_value(key, 0, va)
# Clean up
await rc.close_dht_record(key)
await rc.delete_dht_record(key)

View File

@ -16,14 +16,28 @@ from .conftest import server_info
@pytest.mark.asyncio
async def test_routing_contexts(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
pass
rc = await api_connection.new_routing_context()
async with rc:
rcp = await rc.with_privacy(release=False)
async with rcp:
rcps = await rcp.with_sequencing(veilid.Sequencing.ENSURE_ORDERED, release=False)
async with rcps:
rcpscp = await rcps.with_custom_privacy(veilid.Stability.RELIABLE, release=False)
await rcpscp.release()
pass
rc = await (await api_connection.new_routing_context()).with_sequencing(veilid.Sequencing.ENSURE_ORDERED)
async with rc:
pass
rc = await (await api_connection.new_routing_context()).with_custom_privacy(
veilid.SafetySelection.safe(
veilid.SafetySpec(None, 2, veilid.Stability.RELIABLE, veilid.Sequencing.ENSURE_ORDERED)
))
await rc.release()
rc = await (await api_connection.new_routing_context()).with_custom_privacy(veilid.SafetySelection.unsafe(veilid.Sequencing.ENSURE_ORDERED))
await rc.release()
@pytest.mark.asyncio

View File

@ -27,7 +27,7 @@ class RoutingContext(ABC):
pass
@abstractmethod
async def with_custom_privacy(self, stability: types.Stability, release = True) -> Self:
async def with_custom_privacy(self, safety_selection: types.SafetySelection, release = True) -> Self:
pass
@abstractmethod

View File

@ -16,7 +16,7 @@ from .state import VeilidState, VeilidUpdate
from .types import (CryptoKey, CryptoKeyDistance, CryptoKind,
DHTRecordDescriptor, DHTSchema, HashDigest, KeyPair,
NewPrivateRouteResult, Nonce, OperationId, PublicKey,
RouteId, SecretKey, Sequencing, SharedSecret, Signature,
RouteId, SafetySelection, SecretKey, Sequencing, SharedSecret, Signature,
Stability, Timestamp, TypedKey, TypedKeyPair,
TypedSignature, ValueData, ValueSubkey, VeilidJSONEncoder,
VeilidVersion, urlsafe_b64decode_no_pad)
@ -459,14 +459,14 @@ class _JsonRoutingContext(RoutingContext):
await self.release()
return self.__class__(self.api, new_rc_id)
async def with_custom_privacy(self, stability: Stability, release = True) -> Self:
async def with_custom_privacy(self, safety_selection: SafetySelection, release = True) -> Self:
new_rc_id = raise_api_result(
await self.api.send_ndjson_request(
Operation.ROUTING_CONTEXT,
validate=validate_rc_op,
rc_id=self.rc_id,
rc_op=RoutingContextOperation.WITH_CUSTOM_PRIVACY,
stability=stability,
safety_selection=safety_selection,
)
)
if release:

View File

@ -224,7 +224,7 @@
"type": "object",
"required": [
"rc_op",
"stability"
"safety_selection"
],
"properties": {
"rc_op": {
@ -233,8 +233,8 @@
"WithCustomPrivacy"
]
},
"stability": {
"$ref": "#/definitions/Stability"
"safety_selection": {
"$ref": "#/definitions/SafetySelection"
}
}
},
@ -1566,6 +1566,77 @@
}
}
},
"SafetySelection": {
"description": "The choice of safety route to include in compiled routes",
"oneOf": [
{
"description": "Don't use a safety route, only specify the sequencing preference",
"type": "object",
"required": [
"Unsafe"
],
"properties": {
"Unsafe": {
"$ref": "#/definitions/Sequencing"
}
},
"additionalProperties": false
},
{
"description": "Use a safety route and parameters specified by a SafetySpec",
"type": "object",
"required": [
"Safe"
],
"properties": {
"Safe": {
"$ref": "#/definitions/SafetySpec"
}
},
"additionalProperties": false
}
]
},
"SafetySpec": {
"description": "Options for safety routes (sender privacy)",
"type": "object",
"required": [
"hop_count",
"sequencing",
"stability"
],
"properties": {
"hop_count": {
"description": "must be greater than 0",
"type": "integer",
"format": "uint",
"minimum": 0.0
},
"preferred_route": {
"description": "preferred safety route set id if it still exists",
"type": [
"string",
"null"
]
},
"sequencing": {
"description": "prefer connection-oriented sequenced protocols",
"allOf": [
{
"$ref": "#/definitions/Sequencing"
}
]
},
"stability": {
"description": "prefer reliability over speed",
"allOf": [
{
"$ref": "#/definitions/Stability"
}
]
}
}
},
"Sequencing": {
"type": "string",
"enum": [

View File

@ -67,6 +67,9 @@ class DHTSchemaKind(StrEnum):
DFLT = "DFLT"
SMPL = "SMPL"
class SafetySelectionKind(StrEnum):
UNSAFE = "Unsafe"
SAFE = "Safe"
####################################################################
@ -357,7 +360,7 @@ class ValueData:
def __lt__(self, other):
if other is None:
return true
return True
if self.data < other.data:
return True
if self.data > other.data:
@ -383,3 +386,61 @@ class ValueData:
def to_json(self) -> dict:
return self.__dict__
####################################################################
class SafetySpec:
preferred_route: Optional[RouteId]
hop_count: int
stability: Stability
sequencing: Sequencing
def __init__(self, preferred_route: Optional[RouteId], hop_count: int, stability: Stability, sequencing: Sequencing):
self.preferred_route = preferred_route
self.hop_count = hop_count
self.stability = stability
self.sequencing = sequencing
@classmethod
def from_json(cls, j: dict) -> Self:
return cls(RouteId(j["preferred_route"]) if "preferred_route" in j else None,
j["hop_count"],
Stability(j["stability"]),
Sequencing(j["sequencing"]))
def to_json(self) -> dict:
return self.__dict__
class SafetySelection:
kind: SafetySelectionKind
def __init__(self, kind: SafetySelectionKind, **kwargs):
self.kind = kind
for k, v in kwargs.items():
setattr(self, k, v)
@classmethod
def unsafe(cls, sequencing: Sequencing) -> Self:
return cls(SafetySelectionKind.UNSAFE, sequencing=sequencing)
@classmethod
def safe(cls, safety_spec: SafetySpec) -> Self:
return cls(SafetySelectionKind.SAFE, safety_spec=safety_spec)
@classmethod
def from_json(cls, j: dict) -> Self:
if "Safe" in j:
return cls.safe(SafetySpec.from_json(j["Safe"]))
elif "Unsafe" in j:
return cls.unsafe(Sequencing(j["Unsafe"]))
raise Exception("Invalid SafetySelection")
def to_json(self) -> dict:
if self.kind == SafetySelectionKind.UNSAFE:
return {"Unsafe": self.sequencing }
elif self.kind == SafetySelectionKind.SAFE:
return {"Safe": self.safety_spec.to_json() }
else:
raise Exception("Invalid SafetySelection")

View File

@ -4,7 +4,7 @@ use clap::{Arg, ArgMatches, Command};
use std::ffi::OsStr;
use std::path::Path;
use std::str::FromStr;
use veilid_core::{TypedKeySet, TypedSecretSet};
use veilid_core::{TypedKeyGroup, TypedSecretGroup};
fn do_clap_matches(default_config_path: &OsStr) -> Result<clap::ArgMatches, clap::Error> {
let matches = Command::new("veilid-server")
@ -277,12 +277,12 @@ pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> {
// Split or get secret
let tks =
TypedKeySet::from_str(v).wrap_err("failed to decode node id set from command line")?;
TypedKeyGroup::from_str(v).wrap_err("failed to decode node id set from command line")?;
let buffer = rpassword::prompt_password("Enter secret key set (will not echo): ")
.wrap_err("invalid secret key")?;
let buffer = buffer.trim().to_string();
let tss = TypedSecretSet::from_str(&buffer).wrap_err("failed to decode secret set")?;
let tss = TypedSecretGroup::from_str(&buffer).wrap_err("failed to decode secret set")?;
settingsrw.core.network.routing_table.node_id = Some(tks);
settingsrw.core.network.routing_table.node_id_secret = Some(tss);

View File

@ -38,8 +38,8 @@ fn main() -> EyreResult<()> {
if matches.occurrences_of("generate-key-pair") != 0 {
if let Some(ckstr) = matches.get_one::<String>("generate-key-pair") {
if ckstr == "" {
let mut tks = veilid_core::TypedKeySet::new();
let mut tss = veilid_core::TypedSecretSet::new();
let mut tks = veilid_core::TypedKeyGroup::new();
let mut tss = veilid_core::TypedSecretGroup::new();
for ck in veilid_core::VALID_CRYPTO_KINDS {
let tkp = veilid_core::Crypto::generate_keypair(ck)
.wrap_err("invalid crypto kind")?;

View File

@ -562,8 +562,8 @@ pub struct Dht {
#[derive(Debug, Deserialize, Serialize)]
pub struct RoutingTable {
pub node_id: Option<veilid_core::TypedKeySet>,
pub node_id_secret: Option<veilid_core::TypedSecretSet>,
pub node_id: Option<veilid_core::TypedKeyGroup>,
pub node_id_secret: Option<veilid_core::TypedSecretGroup>,
pub bootstrap: Vec<String>,
pub limit_over_attached: u32,
pub limit_fully_attached: u32,

View File

@ -112,7 +112,7 @@ pub fn debug_duration(dur: u64) -> String {
let msecs = dur / MSEC;
format!(
"{}{}{}{}.{:03}",
"{}{}{}{}.{:03}s",
if days != 0 {
format!("{}d", days)
} else {
@ -128,11 +128,7 @@ pub fn debug_duration(dur: u64) -> String {
} else {
"".to_owned()
},
if secs != 0 {
format!("{}s", secs)
} else {
"".to_owned()
},
secs,
msecs
)
}