mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-01-25 14:07:40 -05:00
xfer
This commit is contained in:
parent
60c4648530
commit
ed0049dc22
@ -2,6 +2,7 @@ use crate::xx::*;
|
||||
use crate::*;
|
||||
use data_encoding::BASE64URL_NOPAD;
|
||||
use keyring_manager::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
|
||||
pub struct ProtectedStoreInner {
|
||||
@ -31,15 +32,18 @@ impl ProtectedStore {
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn delete_all(&self) -> EyreResult<()> {
|
||||
// Delete all known keys
|
||||
if self.remove_user_secret_string("node_id").await? {
|
||||
if self.remove_user_secret("node_id").await? {
|
||||
debug!("deleted protected_store key 'node_id'");
|
||||
}
|
||||
if self.remove_user_secret_string("node_id_secret").await? {
|
||||
if self.remove_user_secret("node_id_secret").await? {
|
||||
debug!("deleted protected_store key 'node_id_secret'");
|
||||
}
|
||||
if self.remove_user_secret_string("_test_key").await? {
|
||||
if self.remove_user_secret("_test_key").await? {
|
||||
debug!("deleted protected_store key '_test_key'");
|
||||
}
|
||||
if self.remove_user_secret("RouteSpecStore").await? {
|
||||
debug!("deleted protected_store key 'RouteSpecStore'");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -139,19 +143,30 @@ impl ProtectedStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret, err)]
|
||||
pub async fn remove_user_secret_string(&self, key: &str) -> EyreResult<bool> {
|
||||
let inner = self.inner.lock();
|
||||
match inner
|
||||
.keyring_manager
|
||||
.as_ref()
|
||||
.ok_or_else(|| eyre!("Protected store not initialized"))?
|
||||
.with_keyring(&self.service_name(), key, |kr| kr.delete_value())
|
||||
#[instrument(level = "trace", skip(self, value), ret, err)]
|
||||
pub async fn save_user_secret_cbor<T>(&self, key: &str, value: &T) -> EyreResult<bool>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(KeyringError::NoPasswordFound) => Ok(false),
|
||||
Err(e) => Err(eyre!("Failed to remove user secret: {}", e)),
|
||||
let v = serde_cbor::to_vec(value).wrap_err("couldn't store as CBOR")?;
|
||||
self.save_user_secret(&key, &v).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn load_user_secret_cbor<T>(&self, key: &str) -> EyreResult<Option<T>>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
let out = self.load_user_secret(key).await?;
|
||||
let b = match out {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let obj = serde_cbor::from_slice::<T>(&b).wrap_err("failed to deserialize")?;
|
||||
Ok(Some(obj))
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self, value), ret, err)]
|
||||
@ -195,6 +210,16 @@ impl ProtectedStore {
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret, err)]
|
||||
pub async fn remove_user_secret(&self, key: &str) -> EyreResult<bool> {
|
||||
self.remove_user_secret_string(key).await
|
||||
let inner = self.inner.lock();
|
||||
match inner
|
||||
.keyring_manager
|
||||
.as_ref()
|
||||
.ok_or_else(|| eyre!("Protected store not initialized"))?
|
||||
.with_keyring(&self.service_name(), key, |kr| kr.delete_value())
|
||||
{
|
||||
Ok(_) => Ok(true),
|
||||
Err(KeyringError::NoPasswordFound) => Ok(false),
|
||||
Err(e) => Err(eyre!("Failed to remove user secret: {}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,38 +17,40 @@ extern "C" {
|
||||
fn keytar_deletePassword(service: &str, account: &str) -> Result<Promise, JsValue>;
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ProtectedStore {
|
||||
config: VeilidConfig,
|
||||
}
|
||||
|
||||
impl ProtectedStore {
|
||||
|
||||
pub fn new(config: VeilidConfig) -> Self {
|
||||
Self {
|
||||
config,
|
||||
}
|
||||
Self { config }
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn delete_all(&self) -> EyreResult<()> {
|
||||
// Delete all known keys
|
||||
if self.remove_user_secret_string("node_id").await? {
|
||||
if self.remove_user_secret("node_id").await? {
|
||||
debug!("deleted protected_store key 'node_id'");
|
||||
}
|
||||
if self.remove_user_secret_string("node_id_secret").await? {
|
||||
if self.remove_user_secret("node_id_secret").await? {
|
||||
debug!("deleted protected_store key 'node_id_secret'");
|
||||
}
|
||||
if self.remove_user_secret_string("_test_key").await? {
|
||||
if self.remove_user_secret("_test_key").await? {
|
||||
debug!("deleted protected_store key '_test_key'");
|
||||
}
|
||||
if self.remove_user_secret("RouteSpecStore").await? {
|
||||
debug!("deleted protected_store key 'RouteSpecStore'");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self), err)]
|
||||
pub async fn init(&self) -> EyreResult<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip(self))]
|
||||
pub async fn terminate(&self) {}
|
||||
|
||||
fn keyring_name(&self) -> String {
|
||||
@ -69,6 +71,7 @@ impl ProtectedStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self, value), ret, err)]
|
||||
pub async fn save_user_secret_string(&self, key: &str, value: &str) -> EyreResult<bool> {
|
||||
if is_nodejs() {
|
||||
let prev = match JsFuture::from(
|
||||
@ -134,6 +137,7 @@ impl ProtectedStore {
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn load_user_secret_string(&self, key: &str) -> EyreResult<Option<String>> {
|
||||
if is_nodejs() {
|
||||
let prev = match JsFuture::from(
|
||||
@ -181,7 +185,73 @@ impl ProtectedStore {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_user_secret_string(&self, key: &str) -> EyreResult<bool> {
|
||||
#[instrument(level = "trace", skip(self, value), ret, err)]
|
||||
pub async fn save_user_secret_cbor<T>(&self, key: &str, value: &T) -> EyreResult<bool>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
let v = serde_cbor::to_vec(value).wrap_err("couldn't store as CBOR")?;
|
||||
self.save_user_secret(&key, &v).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn load_user_secret_cbor<T>(&self, key: &str) -> EyreResult<Option<T>>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
{
|
||||
let out = self.load_user_secret(key).await?;
|
||||
let b = match out {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
let obj = serde_cbor::from_slice::<T>(&b).wrap_err("failed to deserialize")?;
|
||||
Ok(Some(obj))
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self, value), ret, err)]
|
||||
pub async fn save_user_secret(&self, key: &str, value: &[u8]) -> EyreResult<bool> {
|
||||
let mut s = BASE64URL_NOPAD.encode(value);
|
||||
s.push('!');
|
||||
|
||||
self.save_user_secret_string(key, s.as_str()).await
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
pub async fn load_user_secret(&self, key: &str) -> EyreResult<Option<Vec<u8>>> {
|
||||
let mut s = match self.load_user_secret_string(key).await? {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
if s.pop() != Some('!') {
|
||||
bail!("User secret is not a buffer");
|
||||
}
|
||||
|
||||
let mut bytes = Vec::<u8>::new();
|
||||
let res = BASE64URL_NOPAD.decode_len(s.len());
|
||||
match res {
|
||||
Ok(l) => {
|
||||
bytes.resize(l, 0u8);
|
||||
}
|
||||
Err(_) => {
|
||||
bail!("Failed to decode");
|
||||
}
|
||||
}
|
||||
|
||||
let res = BASE64URL_NOPAD.decode_mut(s.as_bytes(), &mut bytes);
|
||||
match res {
|
||||
Ok(_) => Ok(Some(bytes)),
|
||||
Err(_) => bail!("Failed to decode"),
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "trace", skip(self), ret, err)]
|
||||
pub async fn remove_user_secret(&self, key: &str) -> EyreResult<bool> {
|
||||
if is_nodejs() {
|
||||
match JsFuture::from(
|
||||
keytar_deletePassword(self.keyring_name().as_str(), key)
|
||||
@ -231,45 +301,4 @@ impl ProtectedStore {
|
||||
unimplemented!();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn save_user_secret(&self, key: &str, value: &[u8]) -> EyreResult<bool> {
|
||||
let mut s = BASE64URL_NOPAD.encode(value);
|
||||
s.push('!');
|
||||
|
||||
self.save_user_secret_string(key, s.as_str()).await
|
||||
}
|
||||
|
||||
pub async fn load_user_secret(&self, key: &str) -> EyreResult<Option<Vec<u8>>> {
|
||||
let mut s = match self.load_user_secret_string(key).await? {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
if s.pop() != Some('!') {
|
||||
bail!("User secret is not a buffer");
|
||||
}
|
||||
|
||||
let mut bytes = Vec::<u8>::new();
|
||||
let res = BASE64URL_NOPAD.decode_len(s.len());
|
||||
match res {
|
||||
Ok(l) => {
|
||||
bytes.resize(l, 0u8);
|
||||
}
|
||||
Err(_) => {
|
||||
bail!("Failed to decode");
|
||||
}
|
||||
}
|
||||
|
||||
let res = BASE64URL_NOPAD.decode_mut(s.as_bytes(), &mut bytes);
|
||||
match res {
|
||||
Ok(_) => Ok(Some(bytes)),
|
||||
Err(_) => bail!("Failed to decode"),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_user_secret(&self, key: &str) -> EyreResult<bool> {
|
||||
self.remove_user_secret_string(key).await
|
||||
}
|
||||
}
|
@ -1747,7 +1747,7 @@ impl NetworkManager {
|
||||
|
||||
// Ignore these reports if we are currently detecting public dial info
|
||||
let net = self.net();
|
||||
if net.doing_public_dial_info_check() {
|
||||
if net.needs_public_dial_info_check() {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -63,8 +63,6 @@ struct NetworkInner {
|
||||
enable_ipv6_local: bool,
|
||||
/// set if we need to calculate our public dial info again
|
||||
needs_public_dial_info_check: bool,
|
||||
/// set during the actual execution of the public dial info check to ensure we don't do it more than once
|
||||
doing_public_dial_info_check: bool,
|
||||
/// the punishment closure to enax
|
||||
public_dial_info_check_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
||||
/// udp socket record for bound-first sockets, which are used to guarantee a port is available before
|
||||
@ -116,7 +114,6 @@ impl Network {
|
||||
network_started: false,
|
||||
network_needs_restart: false,
|
||||
needs_public_dial_info_check: false,
|
||||
doing_public_dial_info_check: false,
|
||||
public_dial_info_check_punishment: None,
|
||||
protocol_config: Default::default(),
|
||||
static_public_dialinfo: ProtocolTypeSet::empty(),
|
||||
@ -871,16 +868,11 @@ impl Network {
|
||||
inner.public_dial_info_check_punishment = punishment;
|
||||
}
|
||||
|
||||
fn needs_public_dial_info_check(&self) -> bool {
|
||||
pub fn needs_public_dial_info_check(&self) -> bool {
|
||||
let inner = self.inner.lock();
|
||||
inner.needs_public_dial_info_check
|
||||
}
|
||||
|
||||
pub fn doing_public_dial_info_check(&self) -> bool {
|
||||
let inner = self.inner.lock();
|
||||
inner.doing_public_dial_info_check
|
||||
}
|
||||
|
||||
//////////////////////////////////////////
|
||||
|
||||
#[instrument(level = "trace", skip(self), err)]
|
||||
|
@ -883,15 +883,11 @@ impl Network {
|
||||
l: u64,
|
||||
t: u64,
|
||||
) -> EyreResult<()> {
|
||||
// Note that we are doing the public dial info check
|
||||
// We don't have to check this for concurrency, since this routine is run in a TickTask/SingleFuture
|
||||
self.inner.lock().doing_public_dial_info_check = true;
|
||||
|
||||
// Do the public dial info check
|
||||
let out = self.do_public_dial_info_check(stop_token, l, t).await;
|
||||
|
||||
// Done with public dial info check
|
||||
self.inner.lock().doing_public_dial_info_check = false;
|
||||
self.inner.lock().needs_public_dial_info_check = false;
|
||||
|
||||
out
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ impl Network {
|
||||
NetworkUnlockedInner {
|
||||
network_manager,
|
||||
routing_table,
|
||||
connection_manager
|
||||
connection_manager,
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,7 +58,11 @@ impl Network {
|
||||
Self {
|
||||
config: network_manager.config(),
|
||||
inner: Arc::new(Mutex::new(Self::new_inner())),
|
||||
unlocked_inner: Arc::new(Self::new_unlocked_inner(network_manager, routing_table, connection_manager))
|
||||
unlocked_inner: Arc::new(Self::new_unlocked_inner(
|
||||
network_manager,
|
||||
routing_table,
|
||||
connection_manager,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
@ -320,11 +324,14 @@ impl Network {
|
||||
|
||||
//////////////////////////////////////////
|
||||
|
||||
pub fn set_needs_public_dial_info_check(&self, _punishment: Option<Box<dyn FnOnce() + Send + 'static>>) {
|
||||
pub fn set_needs_public_dial_info_check(
|
||||
&self,
|
||||
_punishment: Option<Box<dyn FnOnce() + Send + 'static>>,
|
||||
) {
|
||||
//
|
||||
}
|
||||
|
||||
pub fn doing_public_dial_info_check(&self) -> bool {
|
||||
pub fn needs_public_dial_info_check(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
|
@ -155,7 +155,7 @@ fn with_route_permutations(
|
||||
// initial permutation
|
||||
let mut permutation: Vec<usize> = Vec::with_capacity(hop_count);
|
||||
for n in 0..hop_count {
|
||||
permutation[n] = start + n;
|
||||
permutation.push(start + n);
|
||||
}
|
||||
// if we have one hop or two, then there's only one permutation
|
||||
if hop_count == 1 || hop_count == 2 {
|
||||
@ -229,22 +229,17 @@ impl RouteSpecStore {
|
||||
|
||||
// Load secrets from pstore
|
||||
let pstore = routing_table.network_manager().protected_store();
|
||||
let mut dead_keys = Vec::new();
|
||||
for (k, v) in &mut content.details {
|
||||
if let Some(secret_key) = pstore
|
||||
.load_user_secret(&format!("RouteSpecStore_{}", k.encode()))
|
||||
let out: Vec<(DHTKey, DHTKeySecret)> = pstore
|
||||
.load_user_secret_cbor("RouteSpecStore")
|
||||
.await?
|
||||
{
|
||||
match secret_key.try_into() {
|
||||
Ok(s) => {
|
||||
v.secret_key = DHTKeySecret::new(s);
|
||||
}
|
||||
Err(_) => {
|
||||
dead_keys.push(*k);
|
||||
}
|
||||
}
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut dead_keys = Vec::new();
|
||||
for (k, v) in out {
|
||||
if let Some(rsd) = content.details.get_mut(&k) {
|
||||
rsd.secret_key = v;
|
||||
} else {
|
||||
dead_keys.push(*k);
|
||||
dead_keys.push(k);
|
||||
}
|
||||
}
|
||||
for k in dead_keys {
|
||||
@ -293,18 +288,14 @@ impl RouteSpecStore {
|
||||
.routing_table
|
||||
.network_manager()
|
||||
.protected_store();
|
||||
|
||||
let mut out: Vec<(DHTKey, DHTKeySecret)> = Vec::with_capacity(content.details.len());
|
||||
for (k, v) in &content.details {
|
||||
if pstore
|
||||
.save_user_secret(
|
||||
&format!("RouteSpecStore_{}", k.encode()),
|
||||
&v.secret_key.bytes,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
panic!("route spec should not already have secret key saved");
|
||||
}
|
||||
out.push((*k, v.secret_key));
|
||||
}
|
||||
|
||||
let _ = pstore.save_user_secret_cbor("RouteSpecStore", &out).await?; // ignore if this previously existed or not
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -346,6 +337,14 @@ impl RouteSpecStore {
|
||||
inner.content.details.get_mut(public_key)
|
||||
}
|
||||
|
||||
/// Purge the route spec store
|
||||
pub async fn purge(&self) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
inner.content = Default::default();
|
||||
inner.cache = Default::default();
|
||||
self.save().await
|
||||
}
|
||||
|
||||
/// Create a new route
|
||||
/// Prefers nodes that are not currently in use by another route
|
||||
/// The route is not yet tested for its reachability
|
||||
@ -675,7 +674,7 @@ impl RouteSpecStore {
|
||||
)))
|
||||
}
|
||||
|
||||
pub fn release_route(&self, public_key: DHTKey) {
|
||||
pub fn release_route(&self, public_key: DHTKey) -> EyreResult<()> {
|
||||
let mut inner = self.inner.lock();
|
||||
if let Some(detail) = inner.content.details.remove(&public_key) {
|
||||
// Remove from hop cache
|
||||
@ -710,8 +709,9 @@ impl RouteSpecStore {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
panic!("can't release route that was never allocated");
|
||||
bail!("can't release route that was never allocated");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Find first matching unpublished route that fits into the selection criteria
|
||||
@ -739,6 +739,22 @@ impl RouteSpecStore {
|
||||
None
|
||||
}
|
||||
|
||||
/// List all routes
|
||||
pub fn list_routes(&self) -> Vec<DHTKey> {
|
||||
let inner = self.inner.lock();
|
||||
let mut out = Vec::with_capacity(inner.content.details.len());
|
||||
for detail in &inner.content.details {
|
||||
out.push(*detail.0);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// Get the debug description of a route
|
||||
pub fn debug_route(&self, key: &DHTKey) -> Option<String> {
|
||||
let inner = &*self.inner.lock();
|
||||
Self::detail(inner, key).map(|rsd| format!("{:#?}", rsd))
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
|
||||
/// Compiles a safety route to the private route, with caching
|
||||
@ -973,17 +989,18 @@ impl RouteSpecStore {
|
||||
/// Assemble private route for publication
|
||||
pub fn assemble_private_route(
|
||||
&self,
|
||||
rti: &RoutingTableInner,
|
||||
routing_table: RoutingTable,
|
||||
key: &DHTKey,
|
||||
optimize: Option<bool>,
|
||||
) -> EyreResult<PrivateRoute> {
|
||||
let inner = &*self.inner.lock();
|
||||
let routing_table = self.unlocked_inner.routing_table.clone();
|
||||
let rti = &*routing_table.inner.read();
|
||||
|
||||
let rsd = Self::detail(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
let rsd = Self::detail(inner, key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
|
||||
// See if we can optimize this compilation yet
|
||||
// We don't want to include full nodeinfo if we don't have to
|
||||
let optimize = rsd.reachable;
|
||||
let optimize = optimize.unwrap_or(rsd.reachable);
|
||||
|
||||
// Make innermost route hop to our own node
|
||||
let mut route_hop = RouteHop {
|
||||
@ -1053,79 +1070,79 @@ impl RouteSpecStore {
|
||||
/// Mark route as published
|
||||
/// When first deserialized, routes must be re-published in order to ensure they remain
|
||||
/// in the RouteSpecStore.
|
||||
pub fn mark_route_published(&mut self, key: &DHTKey) -> EyreResult<()> {
|
||||
pub fn mark_route_published(&self, key: &DHTKey, published: bool) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
Self::detail_mut(inner, &key)
|
||||
Self::detail_mut(inner, key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.published = true;
|
||||
.published = published;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark route as reachable
|
||||
/// When first deserialized, routes must be re-tested for reachability
|
||||
/// This can be used to determine if routes need to be sent with full peerinfo or can just use a node id
|
||||
pub fn mark_route_reachable(&mut self, key: &DHTKey) -> EyreResult<()> {
|
||||
pub fn mark_route_reachable(&self, key: &DHTKey, reachable: bool) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
Self::detail_mut(inner, &key)
|
||||
Self::detail_mut(inner, key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.published = true;
|
||||
.reachable = reachable;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark route as checked
|
||||
pub fn touch_route_checked(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
|
||||
pub fn touch_route_checked(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
Self::detail_mut(inner, &key)
|
||||
Self::detail_mut(inner, key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.last_checked_ts = Some(cur_ts);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark route as used
|
||||
pub fn touch_route_used(&mut self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
|
||||
pub fn touch_route_used(&self, key: &DHTKey, cur_ts: u64) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
Self::detail_mut(inner, &key)
|
||||
Self::detail_mut(inner, key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.last_used_ts = Some(cur_ts);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Record latency on the route
|
||||
pub fn record_latency(&mut self, key: &DHTKey, latency: u64) -> EyreResult<()> {
|
||||
pub fn record_latency(&self, key: &DHTKey, latency: u64) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
|
||||
let rsd = Self::detail_mut(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
let rsd = Self::detail_mut(inner, key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
rsd.latency_stats = rsd.latency_stats_accounting.record_latency(latency);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get the calculated latency stats
|
||||
pub fn latency_stats(&mut self, key: &DHTKey) -> EyreResult<LatencyStats> {
|
||||
pub fn latency_stats(&self, key: &DHTKey) -> EyreResult<LatencyStats> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
Ok(Self::detail_mut(inner, &key)
|
||||
Ok(Self::detail_mut(inner, key)
|
||||
.ok_or_else(|| eyre!("route does not exist"))?
|
||||
.latency_stats
|
||||
.clone())
|
||||
}
|
||||
|
||||
/// Add download transfers to route
|
||||
pub fn add_down(&mut self, key: &DHTKey, bytes: u64) -> EyreResult<()> {
|
||||
pub fn add_down(&self, key: &DHTKey, bytes: u64) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
let rsd = Self::detail_mut(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
let rsd = Self::detail_mut(inner, key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
rsd.transfer_stats_accounting.add_down(bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add upload transfers to route
|
||||
pub fn add_up(&mut self, key: &DHTKey, bytes: u64) -> EyreResult<()> {
|
||||
pub fn add_up(&self, key: &DHTKey, bytes: u64) -> EyreResult<()> {
|
||||
let inner = &mut *self.inner.lock();
|
||||
let rsd = Self::detail_mut(inner, &key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
let rsd = Self::detail_mut(inner, key).ok_or_else(|| eyre!("route does not exist"))?;
|
||||
rsd.transfer_stats_accounting.add_up(bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Process transfer statistics to get averages
|
||||
pub fn roll_transfers(&mut self, last_ts: u64, cur_ts: u64) {
|
||||
pub fn roll_transfers(&self, last_ts: u64, cur_ts: u64) {
|
||||
let inner = &mut *self.inner.lock();
|
||||
for rsd in inner.content.details.values_mut() {
|
||||
rsd.transfer_stats_accounting.roll_transfers(
|
||||
@ -1135,4 +1152,22 @@ impl RouteSpecStore {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert private route to binary blob
|
||||
pub fn private_route_to_blob(private_route: &PrivateRoute) -> EyreResult<Vec<u8>> {
|
||||
let mut pr_message = ::capnp::message::Builder::new_default();
|
||||
let mut pr_builder = pr_message.init_root::<veilid_capnp::private_route::Builder>();
|
||||
encode_private_route(&private_route, &mut pr_builder)
|
||||
.wrap_err("failed to encode private route")?;
|
||||
builder_to_vec(pr_message).wrap_err("failed to convert builder to vec")
|
||||
}
|
||||
|
||||
/// Convert binary blob to private route
|
||||
pub fn blob_to_private_route(blob: Vec<u8>) -> EyreResult<PrivateRoute> {
|
||||
let reader = ::capnp::message::Reader::new(RPCMessageData::new(blob), Default::default());
|
||||
let pr_reader = reader
|
||||
.get_root::<veilid_capnp::private_route::Reader>()
|
||||
.wrap_err("failed to make reader for private_route")?;
|
||||
decode_private_route(&pr_reader).wrap_err("failed to decode private route")
|
||||
}
|
||||
}
|
||||
|
@ -90,10 +90,16 @@ struct RPCMessageHeader {
|
||||
impl RPCMessageHeader {}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RPCMessageData {
|
||||
pub struct RPCMessageData {
|
||||
contents: Vec<u8>, // rpc messages must be a canonicalized single segment
|
||||
}
|
||||
|
||||
impl RPCMessageData {
|
||||
pub fn new(contents: Vec<u8>) -> Self {
|
||||
Self { contents }
|
||||
}
|
||||
}
|
||||
|
||||
impl ReaderSegments for RPCMessageData {
|
||||
fn get_segment(&self, idx: u32) -> Option<&[u8]> {
|
||||
if idx > 0 {
|
||||
|
@ -16,12 +16,41 @@ fn get_bucket_entry_state(text: &str) -> Option<BucketEntryState> {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn get_string(text: &str) -> Option<String> {
|
||||
Some(text.to_owned())
|
||||
}
|
||||
|
||||
fn get_route_id(rss: RouteSpecStore) -> impl Fn(&str) -> Option<DHTKey> {
|
||||
return move |text: &str| {
|
||||
match DHTKey::try_decode(text).ok() {
|
||||
Some(key) => {
|
||||
let routes = rss.list_routes();
|
||||
if routes.contains(&key) {
|
||||
return Some(key);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let routes = rss.list_routes();
|
||||
for r in routes {
|
||||
let rkey = r.encode();
|
||||
if rkey.starts_with(text) {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
fn get_number(text: &str) -> Option<usize> {
|
||||
usize::from_str(text).ok()
|
||||
}
|
||||
fn get_dht_key(text: &str) -> Option<DHTKey> {
|
||||
DHTKey::try_decode(text).ok()
|
||||
}
|
||||
|
||||
fn get_protocol_type(text: &str) -> Option<ProtocolType> {
|
||||
let lctext = text.to_ascii_lowercase();
|
||||
if lctext == "udp" {
|
||||
@ -36,6 +65,41 @@ fn get_protocol_type(text: &str) -> Option<ProtocolType> {
|
||||
None
|
||||
}
|
||||
}
|
||||
fn get_sequencing(text: &str) -> Option<Sequencing> {
|
||||
let seqtext = text.to_ascii_lowercase();
|
||||
if seqtext == "np" {
|
||||
Some(Sequencing::NoPreference)
|
||||
} else if seqtext == "ord" {
|
||||
Some(Sequencing::PreferOrdered)
|
||||
} else if seqtext == "*ord" {
|
||||
Some(Sequencing::EnsureOrdered)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
fn get_stability(text: &str) -> Option<Stability> {
|
||||
let sttext = text.to_ascii_lowercase();
|
||||
if sttext == "ll" {
|
||||
Some(Stability::LowLatency)
|
||||
} else if sttext == "rel" {
|
||||
Some(Stability::Reliable)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
fn get_direction_set(text: &str) -> Option<DirectionSet> {
|
||||
let dstext = text.to_ascii_lowercase();
|
||||
if dstext == "in" {
|
||||
Some(Direction::Inbound.into())
|
||||
} else if dstext == "out" {
|
||||
Some(Direction::Outbound.into())
|
||||
} else if dstext == "inout" {
|
||||
Some(DirectionSet::all())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
fn get_address_type(text: &str) -> Option<AddressType> {
|
||||
let lctext = text.to_ascii_lowercase();
|
||||
if lctext == "ipv4" {
|
||||
@ -251,6 +315,13 @@ impl VeilidAPI {
|
||||
.purge_last_connections();
|
||||
|
||||
Ok("Connections purged".to_owned())
|
||||
} else if args[0] == "routes" {
|
||||
// Purge route spec store
|
||||
let rss = self.network_manager()?.routing_table().route_spec_store();
|
||||
match rss.purge().await {
|
||||
Ok(_) => Ok("Routes purged".to_owned()),
|
||||
Err(e) => Ok(format!("Routes purged but failed to save: {}", e)),
|
||||
}
|
||||
} else {
|
||||
Err(VeilidAPIError::InvalidArgument {
|
||||
context: "debug_purge".to_owned(),
|
||||
@ -420,6 +491,191 @@ impl VeilidAPI {
|
||||
Ok(format!("{:#?}", out))
|
||||
}
|
||||
|
||||
async fn debug_route_allocate(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
// [ord|*ord] [rel] [<count>] [in|out]
|
||||
|
||||
let netman = self.network_manager()?;
|
||||
let routing_table = netman.routing_table();
|
||||
let rss = routing_table.route_spec_store();
|
||||
let config = self.config().unwrap();
|
||||
let default_route_hop_count = {
|
||||
let c = config.get();
|
||||
c.network.rpc.default_route_hop_count as usize
|
||||
};
|
||||
|
||||
let mut ai = 1;
|
||||
let mut sequencing = Sequencing::NoPreference;
|
||||
let mut stability = Stability::LowLatency;
|
||||
let mut hop_count = default_route_hop_count;
|
||||
let mut directions = DirectionSet::all();
|
||||
|
||||
while ai < args.len() {
|
||||
if let Ok(seq) =
|
||||
get_debug_argument_at(&args, ai, "debug_route", "sequencing", get_sequencing)
|
||||
{
|
||||
sequencing = seq;
|
||||
} else if let Ok(sta) =
|
||||
get_debug_argument_at(&args, ai, "debug_route", "stability", get_stability)
|
||||
{
|
||||
stability = sta;
|
||||
} else if let Ok(hc) =
|
||||
get_debug_argument_at(&args, ai, "debug_route", "hop_count", get_number)
|
||||
{
|
||||
hop_count = hc;
|
||||
} else if let Ok(ds) =
|
||||
get_debug_argument_at(&args, ai, "debug_route", "direction_set", get_direction_set)
|
||||
{
|
||||
directions = ds;
|
||||
} else {
|
||||
return Ok(format!("Invalid argument specified: {}", args[ai]));
|
||||
}
|
||||
ai += 1;
|
||||
}
|
||||
|
||||
// Allocate route
|
||||
let out = match rss.allocate_route(stability, sequencing, hop_count, directions) {
|
||||
Ok(Some(v)) => format!("{}", v.encode()),
|
||||
Ok(None) => format!("<unavailable>"),
|
||||
Err(e) => {
|
||||
format!("Route allocation failed: {}", e)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
async fn debug_route_release(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
// <route id>
|
||||
let netman = self.network_manager()?;
|
||||
let routing_table = netman.routing_table();
|
||||
let rss = routing_table.route_spec_store();
|
||||
|
||||
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?;
|
||||
|
||||
// Release route
|
||||
let out = match rss.release_route(route_id) {
|
||||
Ok(()) => format!("Released"),
|
||||
Err(e) => {
|
||||
format!("Route release failed: {}", e)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
async fn debug_route_publish(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
// <route id> [full]
|
||||
let netman = self.network_manager()?;
|
||||
let routing_table = netman.routing_table();
|
||||
let rss = routing_table.route_spec_store();
|
||||
|
||||
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?;
|
||||
let full = {
|
||||
if args.len() > 2 {
|
||||
let full_val = get_debug_argument_at(&args, 2, "debug_route", "full", get_string)?
|
||||
.to_ascii_lowercase();
|
||||
if full_val == "full" {
|
||||
true
|
||||
} else {
|
||||
return Err(VeilidAPIError::invalid_argument(
|
||||
"debug_route",
|
||||
"full",
|
||||
full_val,
|
||||
));
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
// Publish route
|
||||
let out = match rss.assemble_private_route(&route_id, Some(!full)) {
|
||||
Ok(private_route) => {
|
||||
if let Err(e) = rss.mark_route_published(&route_id, true) {
|
||||
return Ok(format!("Couldn't mark route published: {}", e));
|
||||
}
|
||||
// Convert to blob
|
||||
let blob_data = RouteSpecStore::private_route_to_blob(&private_route)
|
||||
.map_err(VeilidAPIError::internal)?;
|
||||
data_encoding::BASE64URL_NOPAD.encode(&blob_data)
|
||||
}
|
||||
Err(e) => {
|
||||
format!("Couldn't assemble private route: {}", e)
|
||||
}
|
||||
};
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
async fn debug_route_unpublish(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
// <route id>
|
||||
let netman = self.network_manager()?;
|
||||
let routing_table = netman.routing_table();
|
||||
let rss = routing_table.route_spec_store();
|
||||
|
||||
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?;
|
||||
|
||||
// Unpublish route
|
||||
let out = if let Err(e) = rss.mark_route_published(&route_id, false) {
|
||||
return Ok(format!("Couldn't mark route unpublished: {}", e));
|
||||
} else {
|
||||
"Route unpublished".to_owned()
|
||||
};
|
||||
Ok(out)
|
||||
}
|
||||
async fn debug_route_print(&self, args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
// <route id>
|
||||
let netman = self.network_manager()?;
|
||||
let routing_table = netman.routing_table();
|
||||
let rss = routing_table.route_spec_store();
|
||||
|
||||
let route_id = get_debug_argument_at(&args, 1, "debug_route", "route_id", get_dht_key)?;
|
||||
|
||||
match rss.debug_route(&route_id) {
|
||||
Some(s) => Ok(s),
|
||||
None => Ok("Route does not exist".to_owned()),
|
||||
}
|
||||
}
|
||||
async fn debug_route_list(&self, _args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
//
|
||||
let netman = self.network_manager()?;
|
||||
let routing_table = netman.routing_table();
|
||||
let rss = routing_table.route_spec_store();
|
||||
|
||||
let routes = rss.list_routes();
|
||||
let mut out = format!("Routes: (count = {}):\n", routes.len());
|
||||
for r in routes {
|
||||
out.push_str(&format!("{}\n", r.encode()));
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
async fn debug_route_import(&self, _args: Vec<String>) -> Result<String, VeilidAPIError> {
|
||||
// <blob>
|
||||
let out = format!("");
|
||||
return Ok(out);
|
||||
}
|
||||
|
||||
async fn debug_route(&self, args: String) -> Result<String, VeilidAPIError> {
|
||||
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
|
||||
|
||||
let command = get_debug_argument_at(&args, 0, "debug_route", "command", get_string)?;
|
||||
|
||||
if command == "allocate" {
|
||||
self.debug_route_allocate(args).await
|
||||
} else if command == "release" {
|
||||
self.debug_route_release(args).await
|
||||
} else if command == "publish" {
|
||||
self.debug_route_publish(args).await
|
||||
} else if command == "unpublish" {
|
||||
self.debug_route_unpublish(args).await
|
||||
} else if command == "print" {
|
||||
self.debug_route_print(args).await
|
||||
} else if command == "list" {
|
||||
self.debug_route_list(args).await
|
||||
} else if command == "import" {
|
||||
self.debug_route_import(args).await
|
||||
} else {
|
||||
Ok(">>> Unknown command\n".to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn debug_help(&self, _args: String) -> Result<String, VeilidAPIError> {
|
||||
Ok(r#">>> Debug commands:
|
||||
help
|
||||
@ -435,6 +691,13 @@ impl VeilidAPI {
|
||||
restart network
|
||||
ping <node_id> [protocol_type][address_type][routing_domain]
|
||||
contact <node_id> [protocol_type [address_type]]
|
||||
route allocate [ord|*ord] [rel] [<count>] [in|out]
|
||||
route release <route id>
|
||||
route publish <route id> [full]
|
||||
route unpublish <route id>
|
||||
route print <route id>
|
||||
route list
|
||||
route import <blob>
|
||||
"#
|
||||
.to_owned())
|
||||
}
|
||||
@ -476,6 +739,8 @@ impl VeilidAPI {
|
||||
self.debug_config(rest).await
|
||||
} else if arg == "restart" {
|
||||
self.debug_restart(rest).await
|
||||
} else if arg == "route" {
|
||||
self.debug_route(rest).await
|
||||
} else {
|
||||
Ok(">>> Unknown command\n".to_owned())
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user