[skip ci] refactor cleanup

This commit is contained in:
Christien Rioux 2025-02-03 14:30:54 -06:00
parent 24f5755d76
commit 1a45dcc769
15 changed files with 131 additions and 186 deletions

View File

@ -1,54 +1,25 @@
use crate::*; use crate::{network_manager::StartupDisposition, *};
use crypto::Crypto; use routing_table::RoutingTableHealth;
use network_manager::*;
use routing_table::*;
use storage_manager::*;
#[derive(Debug)]
struct AttachmentManagerInner { struct AttachmentManagerInner {
last_attachment_state: AttachmentState, last_attachment_state: AttachmentState,
last_routing_table_health: Option<RoutingTableHealth>, last_routing_table_health: Option<RoutingTableHealth>,
maintain_peers: bool, maintain_peers: bool,
started_ts: Timestamp, started_ts: Timestamp,
attach_ts: Option<Timestamp>, attach_ts: Option<Timestamp>,
update_callback: Option<UpdateCallback>,
attachment_maintainer_jh: Option<MustJoinHandle<()>>, attachment_maintainer_jh: Option<MustJoinHandle<()>>,
} }
struct AttachmentManagerUnlockedInner { #[derive(Debug)]
_event_bus: EventBus, pub struct AttachmentManager {
config: VeilidConfig, registry: VeilidComponentRegistry,
network_manager: NetworkManager, inner: Mutex<AttachmentManagerInner>,
} }
#[derive(Clone)] impl_veilid_component!(AttachmentManager);
pub struct AttachmentManager {
inner: Arc<Mutex<AttachmentManagerInner>>,
unlocked_inner: Arc<AttachmentManagerUnlockedInner>,
}
impl AttachmentManager { impl AttachmentManager {
fn new_unlocked_inner(
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto,
) -> AttachmentManagerUnlockedInner {
AttachmentManagerUnlockedInner {
_event_bus: event_bus.clone(),
config: config.clone(),
network_manager: NetworkManager::new(
event_bus,
config,
storage_manager,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
crypto,
),
}
}
fn new_inner() -> AttachmentManagerInner { fn new_inner() -> AttachmentManagerInner {
AttachmentManagerInner { AttachmentManagerInner {
last_attachment_state: AttachmentState::Detached, last_attachment_state: AttachmentState::Detached,
@ -56,52 +27,29 @@ impl AttachmentManager {
maintain_peers: false, maintain_peers: false,
started_ts: Timestamp::now(), started_ts: Timestamp::now(),
attach_ts: None, attach_ts: None,
update_callback: None,
attachment_maintainer_jh: None, attachment_maintainer_jh: None,
} }
} }
pub fn new( pub fn new(registry: VeilidComponentRegistry) -> Self {
event_bus: EventBus,
config: VeilidConfig,
storage_manager: StorageManager,
table_store: TableStore,
#[cfg(feature = "unstable-blockstore")] block_store: BlockStore,
crypto: Crypto,
) -> Self {
Self { Self {
inner: Arc::new(Mutex::new(Self::new_inner())), registry,
unlocked_inner: Arc::new(Self::new_unlocked_inner( inner: Mutex::new(Self::new_inner()),
event_bus,
config,
storage_manager,
table_store,
#[cfg(feature = "unstable-blockstore")]
block_store,
crypto,
)),
} }
} }
pub fn config(&self) -> VeilidConfig { pub fn is_attached(&self) -> bool {
self.unlocked_inner.config.clone() let s = self.inner.lock().last_attachment_state;
!matches!(s, AttachmentState::Detached | AttachmentState::Detaching)
} }
pub fn network_manager(&self) -> NetworkManager { pub fn is_detached(&self) -> bool {
self.unlocked_inner.network_manager.clone() let s = self.inner.lock().last_attachment_state;
matches!(s, AttachmentState::Detached)
} }
// pub fn is_attached(&self) -> bool { pub fn get_attach_timestamp(&self) -> Option<Timestamp> {
// let s = self.inner.lock().last_attachment_state; self.inner.lock().attach_ts
// !matches!(s, AttachmentState::Detached | AttachmentState::Detaching) }
// }
// pub fn is_detached(&self) -> bool {
// let s = self.inner.lock().last_attachment_state;
// matches!(s, AttachmentState::Detached)
// }
// pub fn get_attach_timestamp(&self) -> Option<Timestamp> {
// self.inner.lock().attach_ts
// }
fn translate_routing_table_health( fn translate_routing_table_health(
health: &RoutingTableHealth, health: &RoutingTableHealth,
@ -155,11 +103,6 @@ impl AttachmentManager {
inner.last_attachment_state = inner.last_attachment_state =
AttachmentManager::translate_routing_table_health(&health, routing_table_config); AttachmentManager::translate_routing_table_health(&health, routing_table_config);
// If we don't have an update callback yet for some reason, just return now
let Some(update_callback) = inner.update_callback.clone() else {
return;
};
// Send update if one of: // Send update if one of:
// * the attachment state has changed // * the attachment state has changed
// * routing domain readiness has changed // * routing domain readiness has changed
@ -172,7 +115,7 @@ impl AttachmentManager {
}) })
.unwrap_or(true); .unwrap_or(true);
if send_update { if send_update {
Some((update_callback, Self::get_veilid_state_inner(&inner))) Some(Self::get_veilid_state_inner(&inner))
} else { } else {
None None
} }
@ -180,15 +123,14 @@ impl AttachmentManager {
// Send the update outside of the lock // Send the update outside of the lock
if let Some(update) = opt_update { if let Some(update) = opt_update {
(update.0)(VeilidUpdate::Attachment(update.1)); (self.update_callback())(VeilidUpdate::Attachment(update));
} }
} }
fn update_attaching_detaching_state(&self, state: AttachmentState) { fn update_attaching_detaching_state(&self, state: AttachmentState) {
let uptime; let uptime;
let attached_uptime; let attached_uptime;
{
let update_callback = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
// Clear routing table health so when we start measuring it we start from scratch // Clear routing table health so when we start measuring it we start from scratch
@ -211,25 +153,20 @@ impl AttachmentManager {
let now = Timestamp::now(); let now = Timestamp::now();
uptime = now - inner.started_ts; uptime = now - inner.started_ts;
attached_uptime = inner.attach_ts.map(|ts| now - ts); attached_uptime = inner.attach_ts.map(|ts| now - ts);
// Get callback
inner.update_callback.clone()
}; };
// Send update // Send update
if let Some(update_callback) = update_callback { (self.update_callback())(VeilidUpdate::Attachment(Box::new(VeilidStateAttachment {
update_callback(VeilidUpdate::Attachment(Box::new(VeilidStateAttachment { state,
state, public_internet_ready: false,
public_internet_ready: false, local_network_ready: false,
local_network_ready: false, uptime,
uptime, attached_uptime,
attached_uptime, })))
})))
}
} }
#[instrument(parent = None, level = "debug", skip_all)] #[instrument(parent = None, level = "debug", skip_all)]
async fn attachment_maintainer(self) { async fn attachment_maintainer(&self) {
log_net!(debug "attachment starting"); log_net!(debug "attachment starting");
self.update_attaching_detaching_state(AttachmentState::Attaching); self.update_attaching_detaching_state(AttachmentState::Attaching);
@ -313,25 +250,24 @@ impl AttachmentManager {
} }
#[instrument(level = "debug", skip_all, err)] #[instrument(level = "debug", skip_all, err)]
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> { pub async fn init_async(&self) -> EyreResult<()> {
{ Ok(())
let mut inner = self.inner.lock(); }
inner.update_callback = Some(update_callback.clone());
}
self.network_manager().init(update_callback).await?;
#[instrument(level = "debug", skip_all, err)]
pub async fn post_init_async(&self) -> EyreResult<()> {
Ok(()) Ok(())
} }
#[instrument(level = "debug", skip_all)] #[instrument(level = "debug", skip_all)]
pub async fn terminate(&self) { pub async fn pre_terminate_async(&self) {
// Ensure we detached // Ensure we detached
self.detach().await; self.detach().await;
self.network_manager().terminate().await;
self.inner.lock().update_callback = None;
} }
#[instrument(level = "debug", skip_all)]
pub async fn terminate_async(&self) {}
#[instrument(level = "trace", skip_all)] #[instrument(level = "trace", skip_all)]
pub async fn attach(&self) -> bool { pub async fn attach(&self) -> bool {
// Create long-running connection maintenance routine // Create long-running connection maintenance routine
@ -340,10 +276,11 @@ impl AttachmentManager {
return false; return false;
} }
inner.maintain_peers = true; inner.maintain_peers = true;
inner.attachment_maintainer_jh = Some(spawn( let registry = self.registry();
"attachment maintainer", inner.attachment_maintainer_jh = Some(spawn("attachment maintainer", async move {
self.clone().attachment_maintainer(), let this = registry.attachment_manager();
)); this.attachment_maintainer().await;
}));
true true
} }

View File

@ -43,7 +43,7 @@ struct DiscoveryContextInner {
external_info: Vec<ExternalInfo>, external_info: Vec<ExternalInfo>,
} }
struct DiscoveryContextUnlockedInner { pub(super) struct DiscoveryContextUnlockedInner {
config: DiscoveryContextConfig, config: DiscoveryContextConfig,
// per-protocol // per-protocol

View File

@ -113,7 +113,7 @@ struct NetworkInner {
network_state: Option<NetworkState>, network_state: Option<NetworkState>,
} }
struct NetworkUnlockedInner { pub(super) struct NetworkUnlockedInner {
// Startup lock // Startup lock
startup_lock: StartupLock, startup_lock: StartupLock,

View File

@ -1411,10 +1411,10 @@ pub fn make_closest_noderef_sort<'a>(
} }
} }
pub fn make_closest_node_id_sort<'a>( pub fn make_closest_node_id_sort(
crypto: &'a Crypto, crypto: &Crypto,
node_id: TypedKey, node_id: TypedKey,
) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering + 'a { ) -> impl Fn(&CryptoKey, &CryptoKey) -> core::cmp::Ordering + '_ {
let kind = node_id.kind; let kind = node_id.kind;
// Get cryptoversion to check distance with // Get cryptoversion to check distance with
let vcrypto = crypto.get(kind).unwrap(); let vcrypto = crypto.get(kind).unwrap();

View File

@ -328,7 +328,7 @@ impl StorageManager {
// If more partial results show up, don't send an update until we're done // If more partial results show up, don't send an update until we're done
return true; return true;
} }
// If we processed the final result, possibly send an update // If we processed the final result, possibly send an update
// if the sequence number changed since our first partial update // if the sequence number changed since our first partial update
// Send with a max count as this is not attached to any watch // Send with a max count as this is not attached to any watch
if last_seq != value_data.seq() { if last_seq != value_data.seq() {
@ -361,7 +361,7 @@ impl StorageManager {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
Self::process_fanout_results_inner( Self::process_fanout_results_inner(
&mut *inner, &mut inner,
key, key,
core::iter::once((subkey, &result.fanout_result)), core::iter::once((subkey, &result.fanout_result)),
false, false,
@ -372,7 +372,7 @@ impl StorageManager {
// If we got a new value back then write it to the opened record // If we got a new value back then write it to the opened record
if Some(get_result_value.value_data().seq()) != opt_last_seq { if Some(get_result_value.value_data().seq()) != opt_last_seq {
Self::handle_set_local_value_inner( Self::handle_set_local_value_inner(
&mut *inner, &mut inner,
key, key,
subkey, subkey,
get_result_value.clone(), get_result_value.clone(),
@ -397,7 +397,7 @@ impl StorageManager {
let (_is_local, last_get_result) = { let (_is_local, last_get_result) = {
// See if the subkey we are getting has a last known local value // See if the subkey we are getting has a last known local value
let mut last_get_result = let mut last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?;
// If this is local, it must have a descriptor already // If this is local, it must have a descriptor already
if last_get_result.opt_descriptor.is_some() { if last_get_result.opt_descriptor.is_some() {
if !want_descriptor { if !want_descriptor {
@ -407,7 +407,7 @@ impl StorageManager {
} else { } else {
// See if the subkey we are getting has a last known remote value // See if the subkey we are getting has a last known remote value
let last_get_result = let last_get_result =
Self::handle_get_remote_value_inner(&mut *inner, key, subkey, want_descriptor) Self::handle_get_remote_value_inner(&mut inner, key, subkey, want_descriptor)
.await?; .await?;
(false, last_get_result) (false, last_get_result)
} }

View File

@ -334,7 +334,7 @@ impl StorageManager {
let (_is_local, inspect_result) = { let (_is_local, inspect_result) = {
// See if the subkey we are getting has a last known local value // See if the subkey we are getting has a last known local value
let mut local_inspect_result = let mut local_inspect_result =
Self::handle_inspect_local_value_inner(&mut *inner, key, subkeys.clone(), true) Self::handle_inspect_local_value_inner(&mut inner, key, subkeys.clone(), true)
.await?; .await?;
// If this is local, it must have a descriptor already // If this is local, it must have a descriptor already
if local_inspect_result.opt_descriptor.is_some() { if local_inspect_result.opt_descriptor.is_some() {
@ -345,7 +345,7 @@ impl StorageManager {
} else { } else {
// See if the subkey we are getting has a last known remote value // See if the subkey we are getting has a last known remote value
let remote_inspect_result = Self::handle_inspect_remote_value_inner( let remote_inspect_result = Self::handle_inspect_remote_value_inner(
&mut *inner, &mut inner,
key, key,
subkeys, subkeys,
want_descriptor, want_descriptor,

View File

@ -231,7 +231,7 @@ impl StorageManager {
inner.metadata_db = Some(metadata_db); inner.metadata_db = Some(metadata_db);
inner.local_record_store = Some(local_record_store); inner.local_record_store = Some(local_record_store);
inner.remote_record_store = Some(remote_record_store); inner.remote_record_store = Some(remote_record_store);
Self::load_metadata(&mut *inner).await?; Self::load_metadata(&mut inner).await?;
} }
// Start deferred results processors // Start deferred results processors
@ -300,7 +300,7 @@ impl StorageManager {
} }
// Save metadata // Save metadata
if let Err(e) = Self::save_metadata(&mut *inner).await { if let Err(e) = Self::save_metadata(&mut inner).await {
log_stor!(error "termination metadata save failed: {}", e); log_stor!(error "termination metadata save failed: {}", e);
} }
@ -411,12 +411,12 @@ impl StorageManager {
// Create a new owned local record from scratch // Create a new owned local record from scratch
let (key, owner) = self let (key, owner) = self
.create_new_owned_local_record_inner(&mut *inner, kind, schema, owner, safety_selection) .create_new_owned_local_record_inner(&mut inner, kind, schema, owner, safety_selection)
.await?; .await?;
// Now that the record is made we should always succeed to open the existing record // Now that the record is made we should always succeed to open the existing record
// The initial writer is the owner of the record // The initial writer is the owner of the record
Self::open_existing_record_inner(&mut *inner, key, Some(owner), safety_selection) Self::open_existing_record_inner(&mut inner, key, Some(owner), safety_selection)
.await .await
.map(|r| r.unwrap()) .map(|r| r.unwrap())
} }
@ -433,7 +433,7 @@ impl StorageManager {
// See if we have a local record already or not // See if we have a local record already or not
if let Some(res) = if let Some(res) =
Self::open_existing_record_inner(&mut *inner, key, writer, safety_selection).await? Self::open_existing_record_inner(&mut inner, key, writer, safety_selection).await?
{ {
return Ok(res); return Ok(res);
} }
@ -478,14 +478,14 @@ impl StorageManager {
// via some parallel process // via some parallel process
if let Some(res) = if let Some(res) =
Self::open_existing_record_inner(&mut *inner, key, writer, safety_selection).await? Self::open_existing_record_inner(&mut inner, key, writer, safety_selection).await?
{ {
return Ok(res); return Ok(res);
} }
// Open the new record // Open the new record
Self::open_new_record_inner( Self::open_new_record_inner(
&mut *inner, &mut inner,
key, key,
writer, writer,
subkey, subkey,
@ -509,7 +509,7 @@ impl StorageManager {
// Attempt to close the record, returning the opened record if it wasn't already closed // Attempt to close the record, returning the opened record if it wasn't already closed
let opened_record = { let opened_record = {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
let Some(opened_record) = Self::close_record_inner(&mut *inner, key)? else { let Some(opened_record) = Self::close_record_inner(&mut inner, key)? else {
return Ok(()); return Ok(());
}; };
opened_record opened_record
@ -593,7 +593,7 @@ impl StorageManager {
// See if the requested subkey is our local record store // See if the requested subkey is our local record store
let last_get_result = let last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?;
// Return the existing value if we have one unless we are forcing a refresh // Return the existing value if we have one unless we are forcing a refresh
if !force_refresh { if !force_refresh {
@ -683,7 +683,7 @@ impl StorageManager {
// See if the subkey we are modifying has a last known local value // See if the subkey we are modifying has a last known local value
let last_get_result = let last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?;
// Get the descriptor and schema for the key // Get the descriptor and schema for the key
let Some(descriptor) = last_get_result.opt_descriptor else { let Some(descriptor) = last_get_result.opt_descriptor else {
@ -724,7 +724,7 @@ impl StorageManager {
// Write the value locally first // Write the value locally first
log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
Self::handle_set_local_value_inner( Self::handle_set_local_value_inner(
&mut *inner, &mut inner,
key, key,
subkey, subkey,
signed_value_data.clone(), signed_value_data.clone(),
@ -735,7 +735,7 @@ impl StorageManager {
if !self.dht_is_online() { if !self.dht_is_online() {
log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() );
// Add to offline writes to flush // Add to offline writes to flush
Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection); Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection);
return Ok(None); return Ok(None);
}; };
@ -759,7 +759,7 @@ impl StorageManager {
Err(e) => { Err(e) => {
// Failed to write, try again later // Failed to write, try again later
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection); Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection);
return Err(e); return Err(e);
} }
}; };
@ -1001,7 +1001,7 @@ impl StorageManager {
// See if the requested record is our local record store // See if the requested record is our local record store
let mut local_inspect_result = let mut local_inspect_result =
Self::handle_inspect_local_value_inner(&mut *inner, key, subkeys.clone(), true).await?; Self::handle_inspect_local_value_inner(&mut inner, key, subkeys.clone(), true).await?;
#[allow(clippy::unnecessary_cast)] #[allow(clippy::unnecessary_cast)]
{ {
@ -1089,7 +1089,7 @@ impl StorageManager {
.zip(result.fanout_results.iter()); .zip(result.fanout_results.iter());
Self::process_fanout_results_inner( Self::process_fanout_results_inner(
&mut *inner, &mut inner,
key, key,
results_iter, results_iter,
false, false,
@ -1527,7 +1527,7 @@ impl StorageManager {
}); });
} }
pub fn close_record_inner( fn close_record_inner(
inner: &mut StorageManagerInner, inner: &mut StorageManagerInner,
key: TypedKey, key: TypedKey,
) -> VeilidAPIResult<Option<OpenedRecord>> { ) -> VeilidAPIResult<Option<OpenedRecord>> {
@ -1542,7 +1542,7 @@ impl StorageManager {
} }
#[instrument(level = "trace", target = "stor", skip_all, err)] #[instrument(level = "trace", target = "stor", skip_all, err)]
pub(super) async fn handle_get_local_value_inner( async fn handle_get_local_value_inner(
inner: &mut StorageManagerInner, inner: &mut StorageManagerInner,
key: TypedKey, key: TypedKey,
subkey: ValueSubkey, subkey: ValueSubkey,

View File

@ -374,12 +374,12 @@ impl StorageManager {
let was_offline = self.check_fanout_set_offline(key, subkey, &result.fanout_result); let was_offline = self.check_fanout_set_offline(key, subkey, &result.fanout_result);
if was_offline { if was_offline {
// Failed to write, try again later // Failed to write, try again later
Self::add_offline_subkey_write_inner(&mut *inner, key, subkey, safety_selection); Self::add_offline_subkey_write_inner(&mut inner, key, subkey, safety_selection);
} }
// Keep the list of nodes that returned a value for later reference // Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner( Self::process_fanout_results_inner(
&mut *inner, &mut inner,
key, key,
core::iter::once((subkey, &result.fanout_result)), core::iter::once((subkey, &result.fanout_result)),
true, true,
@ -392,7 +392,7 @@ impl StorageManager {
// Record the newer value and send and update since it is different than what we just set // Record the newer value and send and update since it is different than what we just set
Self::handle_set_local_value_inner( Self::handle_set_local_value_inner(
&mut *inner, &mut inner,
key, key,
subkey, subkey,
result.signed_value_data.clone(), result.signed_value_data.clone(),
@ -425,14 +425,14 @@ impl StorageManager {
let (is_local, last_get_result) = { let (is_local, last_get_result) = {
// See if the subkey we are modifying has a last known local value // See if the subkey we are modifying has a last known local value
let last_get_result = let last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await?; Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await?;
// If this is local, it must have a descriptor already // If this is local, it must have a descriptor already
if last_get_result.opt_descriptor.is_some() { if last_get_result.opt_descriptor.is_some() {
(true, last_get_result) (true, last_get_result)
} else { } else {
// See if the subkey we are modifying has a last known remote value // See if the subkey we are modifying has a last known remote value
let last_get_result = let last_get_result =
Self::handle_get_remote_value_inner(&mut *inner, key, subkey, true).await?; Self::handle_get_remote_value_inner(&mut inner, key, subkey, true).await?;
(false, last_get_result) (false, last_get_result)
} }
}; };
@ -493,7 +493,7 @@ impl StorageManager {
// Do the set and return no new value // Do the set and return no new value
let res = if is_local { let res = if is_local {
Self::handle_set_local_value_inner( Self::handle_set_local_value_inner(
&mut *inner, &mut inner,
key, key,
subkey, subkey,
value, value,
@ -502,7 +502,7 @@ impl StorageManager {
.await .await
} else { } else {
Self::handle_set_remote_value_inner( Self::handle_set_remote_value_inner(
&mut *inner, &mut inner,
key, key,
subkey, subkey,
value, value,

View File

@ -47,7 +47,7 @@ impl StorageManager {
}; };
let get_result = { let get_result = {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
Self::handle_get_local_value_inner(&mut *inner, key, subkey, true).await Self::handle_get_local_value_inner(&mut inner, key, subkey, true).await
}; };
let Ok(get_result) = get_result else { let Ok(get_result) = get_result else {
log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey);
@ -84,7 +84,7 @@ impl StorageManager {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
Self::handle_set_local_value_inner( Self::handle_set_local_value_inner(
&mut *inner, &mut inner,
key, key,
subkey, subkey,
result.signed_value_data.clone(), result.signed_value_data.clone(),
@ -217,7 +217,7 @@ impl StorageManager {
// Keep the list of nodes that returned a value for later reference // Keep the list of nodes that returned a value for later reference
Self::process_fanout_results_inner( Self::process_fanout_results_inner(
&mut *inner, &mut inner,
result.key, result.key,
result.fanout_results.iter().map(|x| (x.0, &x.1)), result.fanout_results.iter().map(|x| (x.0, &x.1)),
true, true,

View File

@ -466,8 +466,7 @@ impl StorageManager {
}; };
let last_get_result = let last_get_result =
Self::handle_get_local_value_inner(&mut *inner, key, first_subkey, true) Self::handle_get_local_value_inner(&mut inner, key, first_subkey, true).await?;
.await?;
let descriptor = last_get_result.opt_descriptor.unwrap(); let descriptor = last_get_result.opt_descriptor.unwrap();
let schema = descriptor.schema()?; let schema = descriptor.schema()?;
@ -496,7 +495,7 @@ impl StorageManager {
} }
if is_value_seq_newer { if is_value_seq_newer {
Self::handle_set_local_value_inner( Self::handle_set_local_value_inner(
&mut *inner, &mut inner,
key, key,
first_subkey, first_subkey,
value.clone(), value.clone(),

View File

@ -298,16 +298,13 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn {
} }
pub fn get_config() -> VeilidConfig { pub fn get_config() -> VeilidConfig {
let vc = match VeilidConfig::new_from_callback(Arc::new(config_callback), Arc::new(update_callback)) {
match VeilidConfig::new_from_callback(Arc::new(config_callback), Arc::new(update_callback)) Ok(vc) => vc,
{ Err(e) => {
Ok(vc) => vc, error!("Error: {}", e);
Err(e) => { unreachable!();
error!("Error: {}", e); }
unreachable!(); }
}
};
vc
} }
pub async fn test_config() { pub async fn test_config() {

View File

@ -224,7 +224,7 @@ impl VeilidAPI {
// Is this a route id? // Is this a route id?
if let Ok(rrid) = RouteId::from_str(&s) { if let Ok(rrid) = RouteId::from_str(&s) {
let routing_table = self.routing_table()?; let routing_table = self.core_context()?.routing_table();
let rss = routing_table.route_spec_store(); let rss = routing_table.route_spec_store();
// Is this a valid remote route id? (can't target allocated routes) // Is this a valid remote route id? (can't target allocated routes)
@ -299,7 +299,8 @@ impl VeilidAPI {
sequencing, sequencing,
}; };
let rss = self.routing_table()?.route_spec_store(); let routing_table = self.core_context()?.routing_table();
let rss = routing_table.route_spec_store();
let route_id = let route_id =
rss.allocate_route(crypto_kinds, &safety_spec, DirectionSet::all(), &[], false)?; rss.allocate_route(crypto_kinds, &safety_spec, DirectionSet::all(), &[], false)?;
match rss.test_route(route_id).await? { match rss.test_route(route_id).await? {
@ -336,7 +337,8 @@ impl VeilidAPI {
pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> { pub fn import_remote_private_route(&self, blob: Vec<u8>) -> VeilidAPIResult<RouteId> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::import_remote_private_route(blob: {:?})", blob); "VeilidAPI::import_remote_private_route(blob: {:?})", blob);
let rss = self.routing_table()?.route_spec_store(); let routing_table = self.core_context()?.routing_table();
let rss = routing_table.route_spec_store();
rss.import_remote_private_route_blob(blob) rss.import_remote_private_route_blob(blob)
} }
@ -348,7 +350,8 @@ impl VeilidAPI {
pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> { pub fn release_private_route(&self, route_id: RouteId) -> VeilidAPIResult<()> {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::release_private_route(route_id: {:?})", route_id); "VeilidAPI::release_private_route(route_id: {:?})", route_id);
let rss = self.routing_table()?.route_spec_store(); let routing_table = self.core_context()?.routing_table();
let rss = routing_table.route_spec_store();
if !rss.release_route(route_id) { if !rss.release_route(route_id) {
apibail_invalid_argument!("release_private_route", "key", route_id); apibail_invalid_argument!("release_private_route", "key", route_id);
} }
@ -371,7 +374,7 @@ impl VeilidAPI {
event!(target: "veilid_api", Level::DEBUG, event!(target: "veilid_api", Level::DEBUG,
"VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message); "VeilidAPI::app_call_reply(call_id: {:?}, message: {:?})", call_id, message);
let rpc_processor = self.rpc_processor()?; let rpc_processor = self.core_context()?.rpc_processor();
rpc_processor rpc_processor
.app_call_reply(call_id, message) .app_call_reply(call_id, message)
.map_err(|e| e.into()) .map_err(|e| e.into())

View File

@ -190,7 +190,7 @@ impl JsonRequestProcessor {
} }
fn lookup_crypto_system(&self, id: u32, cs_id: u32) -> Result<CryptoKind, Response> { fn lookup_crypto_system(&self, id: u32, cs_id: u32) -> Result<CryptoKind, Response> {
let inner = self.inner.lock(); let inner = self.inner.lock();
let Some(crypto_system) = inner.crypto_kinds.get(&cs_id).cloned() else { let Some(crypto_kind) = inner.crypto_kinds.get(&cs_id).cloned() else {
return Err(Response { return Err(Response {
id, id,
op: ResponseOp::CryptoSystem(CryptoSystemResponse { op: ResponseOp::CryptoSystem(CryptoSystemResponse {
@ -199,7 +199,7 @@ impl JsonRequestProcessor {
}), }),
}); });
}; };
Ok(crypto_system) Ok(crypto_kind)
} }
fn release_crypto_system(&self, id: u32) -> i32 { fn release_crypto_system(&self, id: u32) -> i32 {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -467,10 +467,9 @@ impl JsonRequestProcessor {
#[instrument(level = "trace", target = "json_api", skip_all)] #[instrument(level = "trace", target = "json_api", skip_all)]
pub async fn process_crypto_system_request( pub async fn process_crypto_system_request(
&self, &self,
kind: CryptoKind, csv: &CryptoSystemGuard<'_>,
csr: CryptoSystemRequest, csr: CryptoSystemRequest,
) -> CryptoSystemResponse { ) -> CryptoSystemResponse {
xxx continue here
let cs_op = match csr.cs_op { let cs_op = match csr.cs_op {
CryptoSystemRequestOp::Release => { CryptoSystemRequestOp::Release => {
self.release_crypto_system(csr.cs_id); self.release_crypto_system(csr.cs_id);
@ -692,7 +691,7 @@ impl JsonRequestProcessor {
Err(e) => { Err(e) => {
return Response { return Response {
id, id,
op: ResponseOp::OpenTableDb { op: ResponseOp::DeleteTableDb {
result: to_json_api_result(Err(e)), result: to_json_api_result(Err(e)),
}, },
} }
@ -742,11 +741,31 @@ impl JsonRequestProcessor {
kind, kind,
) )
}) })
.map(|csv| self.add_crypto_system(csv)), .map(|csv| self.add_crypto_system(csv.kind())),
), ),
} }
} }
RequestOp::BestCryptoSystem => { RequestOp::BestCryptoSystem => {
let crypto = match self.api.crypto() {
Ok(v) => v,
Err(e) => {
return Response {
id,
op: ResponseOp::BestCryptoSystem {
result: to_json_api_result(Err(e)),
},
}
}
};
ResponseOp::BestCryptoSystem {
result: to_json_api_result(Ok(self.add_crypto_system(crypto.best().kind()))),
}
}
RequestOp::CryptoSystem(csr) => {
let crypto_kind = match self.lookup_crypto_system(id, csr.cs_id) {
Ok(v) => v,
Err(e) => return e,
};
let crypto = match self.api.crypto() { let crypto = match self.api.crypto() {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
@ -758,16 +777,9 @@ impl JsonRequestProcessor {
} }
} }
}; };
ResponseOp::BestCryptoSystem { let csv = crypto.get(crypto_kind).unwrap();
result: to_json_api_result(Ok(self.add_crypto_system(crypto.best()))),
} ResponseOp::CryptoSystem(self.process_crypto_system_request(&csv, csr).await)
}
RequestOp::CryptoSystem(csr) => {
let csv = match self.lookup_crypto_system(id, csr.cs_id) {
Ok(v) => v,
Err(e) => return e,
};
ResponseOp::CryptoSystem(self.process_crypto_system_request(csv, csr).await)
} }
RequestOp::VerifySignatures { RequestOp::VerifySignatures {
node_ids, node_ids,

View File

@ -23,11 +23,8 @@ pub use types::*;
use crate::*; use crate::*;
use attachment_manager::AttachmentManager;
use core_context::{api_shutdown, VeilidCoreContext}; use core_context::{api_shutdown, VeilidCoreContext};
use network_manager::NetworkManager; use routing_table::{DirectionSet, RouteSpecStore};
use routing_table::{DirectionSet, RouteSpecStore, RoutingTable};
use rpc_processor::*; use rpc_processor::*;
use storage_manager::StorageManager;
///////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////

View File

@ -1057,7 +1057,7 @@ impl VeilidConfig {
F: FnOnce(&VeilidConfigInner) -> R, F: FnOnce(&VeilidConfigInner) -> R,
{ {
let inner = self.inner.read(); let inner = self.inner.read();
f(&*inner) f(&inner)
} }
pub fn try_with_mut<F, R>(&self, f: F) -> VeilidAPIResult<R> pub fn try_with_mut<F, R>(&self, f: F) -> VeilidAPIResult<R>