mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-03-14 01:46:41 -04:00
cleanup api, fix hang
This commit is contained in:
parent
5a60ceaf97
commit
ebe16cd5da
14
.vscode/launch.json
vendored
14
.vscode/launch.json
vendored
@ -8,7 +8,7 @@
|
||||
"type": "lldb",
|
||||
"request": "attach",
|
||||
"name": "Attach to veilid-server",
|
||||
"program": "${workspaceFolder}/veilid-server/target/debug/veilid-server",
|
||||
"program": "${workspaceFolder}/target/debug/veilid-server",
|
||||
"pid": "${command:pickMyProcess}"
|
||||
},
|
||||
{
|
||||
@ -16,11 +16,11 @@
|
||||
"request": "launch",
|
||||
"name": "Launch veilid-cli",
|
||||
"args": ["--debug"],
|
||||
"program": "${workspaceFolder}/veilid-cli/target/debug/veilid-cli",
|
||||
"program": "${workspaceFolder}/target/debug/veilid-cli",
|
||||
"windows": {
|
||||
"program": "${workspaceFolder}/veilid-cli/target/debug/veilid-cli.exe"
|
||||
"program": "${workspaceFolder}/target/debug/veilid-cli.exe"
|
||||
},
|
||||
"cwd": "${workspaceFolder}",
|
||||
"cwd": "${workspaceFolder}/target/debug/",
|
||||
"sourceLanguages": ["rust"],
|
||||
"terminal": "console"
|
||||
},
|
||||
@ -39,9 +39,9 @@
|
||||
"type": "lldb",
|
||||
"request": "launch",
|
||||
"name": "Debug veilid-server",
|
||||
"program": "${workspaceFolder}/veilid-server/target/debug/veilid-server",
|
||||
"program": "${workspaceFolder}/target/debug/veilid-server",
|
||||
"args": ["--trace", "--attach=true"],
|
||||
"cwd": "${workspaceFolder}/veilid-server/target/debug/",
|
||||
"cwd": "${workspaceFolder}/target/debug/",
|
||||
"env": {
|
||||
"RUST_BACKTRACE": "1"
|
||||
},
|
||||
@ -65,7 +65,7 @@
|
||||
}
|
||||
},
|
||||
"args": ["${selectedText}"],
|
||||
"cwd": "${workspaceFolder}/veilid-core"
|
||||
"cwd": "${workspaceFolder}/target/debug/"
|
||||
},
|
||||
|
||||
{
|
||||
|
2
external/keyvaluedb
vendored
2
external/keyvaluedb
vendored
@ -1 +1 @@
|
||||
Subproject commit 80d4ddffdbf85b6cbcc3cecf96ee536807e48665
|
||||
Subproject commit 27f4defdca5f12b3ef6917cf4698181b3df0026e
|
@ -126,11 +126,7 @@ impl AttachmentManager {
|
||||
table_store: table_store.clone(),
|
||||
crypto: crypto.clone(),
|
||||
attachment_machine: CallbackStateMachine::new(),
|
||||
network_manager: NetworkManager::new(
|
||||
config,
|
||||
table_store,
|
||||
crypto,
|
||||
),
|
||||
network_manager: NetworkManager::new(config, table_store, crypto),
|
||||
maintain_peers: false,
|
||||
peer_count: 0,
|
||||
attach_timestamp: None,
|
||||
|
@ -22,11 +22,12 @@ pub async fn test_attach_detach() {
|
||||
.startup(setup_veilid_core())
|
||||
.await
|
||||
.expect("startup failed");
|
||||
api.attach().await;
|
||||
api.attach().await.unwrap();
|
||||
intf::sleep(5000).await;
|
||||
api.detach().await;
|
||||
api.detach().await.unwrap();
|
||||
api.wait_for_state(VeilidState::Attachment(AttachmentState::Detached))
|
||||
.await;
|
||||
.await
|
||||
.unwrap();
|
||||
api.shutdown().await;
|
||||
|
||||
info!("--- test auto detach ---");
|
||||
@ -34,7 +35,7 @@ pub async fn test_attach_detach() {
|
||||
.startup(setup_veilid_core())
|
||||
.await
|
||||
.expect("startup failed");
|
||||
api.attach().await;
|
||||
api.attach().await.unwrap();
|
||||
intf::sleep(5000).await;
|
||||
api.shutdown().await;
|
||||
|
||||
@ -43,7 +44,7 @@ pub async fn test_attach_detach() {
|
||||
.startup(setup_veilid_core())
|
||||
.await
|
||||
.expect("startup failed");
|
||||
api.detach().await;
|
||||
api.detach().await.unwrap();
|
||||
api.shutdown().await;
|
||||
}
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
pub use crate::rpc_processor::InfoAnswer;
|
||||
use crate::*;
|
||||
use attachment_manager::AttachmentManager;
|
||||
use core::fmt;
|
||||
use network_manager::NetworkManager;
|
||||
use rpc_processor::{RPCError, RPCProcessor};
|
||||
use xx::*;
|
||||
@ -876,119 +877,140 @@ impl RoutingContext {
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
struct VeilidAPIInner {
|
||||
config: VeilidConfig,
|
||||
attachment_manager: AttachmentManager,
|
||||
core: VeilidCore,
|
||||
network_manager: NetworkManager,
|
||||
is_shutdown: bool,
|
||||
core: Option<VeilidCore>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for VeilidAPIInner {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"VeilidAPIInner: {}",
|
||||
match self.core {
|
||||
Some(_) => "active",
|
||||
None => "shutdown",
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for VeilidAPIInner {
|
||||
fn drop(&mut self) {
|
||||
if !self.is_shutdown {
|
||||
intf::spawn_local(self.core.clone().internal_shutdown()).detach();
|
||||
if let Some(core) = self.core.take() {
|
||||
intf::spawn_local(core.internal_shutdown()).detach();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct VeilidAPI {
|
||||
inner: Arc<Mutex<VeilidAPIInner>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct VeilidAPIWeak {
|
||||
inner: Weak<Mutex<VeilidAPIInner>>,
|
||||
}
|
||||
|
||||
impl VeilidAPIWeak {
|
||||
pub fn upgrade(&self) -> Option<VeilidAPI> {
|
||||
self.inner.upgrade().map(|v| VeilidAPI { inner: v })
|
||||
}
|
||||
}
|
||||
|
||||
impl VeilidAPI {
|
||||
pub fn new(attachment_manager: AttachmentManager, core: VeilidCore) -> Self {
|
||||
pub(crate) fn new(core: VeilidCore) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(VeilidAPIInner {
|
||||
config: attachment_manager.config(),
|
||||
attachment_manager: attachment_manager.clone(),
|
||||
core,
|
||||
network_manager: attachment_manager.network_manager(),
|
||||
is_shutdown: false,
|
||||
})),
|
||||
inner: Arc::new(Mutex::new(VeilidAPIInner { core: Some(core) })),
|
||||
}
|
||||
}
|
||||
pub fn weak(&self) -> VeilidAPIWeak {
|
||||
VeilidAPIWeak {
|
||||
inner: Arc::downgrade(&self.inner),
|
||||
}
|
||||
}
|
||||
fn core(&self) -> Result<VeilidCore, VeilidAPIError> {
|
||||
Ok(self
|
||||
.inner
|
||||
.lock()
|
||||
.core
|
||||
.as_ref()
|
||||
.ok_or(VeilidAPIError::Shutdown)?
|
||||
.clone())
|
||||
}
|
||||
fn config(&self) -> Result<VeilidConfig, VeilidAPIError> {
|
||||
Ok(self.core()?.config())
|
||||
}
|
||||
fn attachment_manager(&self) -> Result<AttachmentManager, VeilidAPIError> {
|
||||
Ok(self.core()?.attachment_manager())
|
||||
}
|
||||
fn network_manager(&self) -> Result<NetworkManager, VeilidAPIError> {
|
||||
Ok(self.attachment_manager()?.network_manager())
|
||||
}
|
||||
fn rpc_processor(&self) -> Result<RPCProcessor, VeilidAPIError> {
|
||||
Ok(self.network_manager()?.rpc_processor())
|
||||
}
|
||||
|
||||
pub async fn shutdown(self) {
|
||||
let core = { self.inner.lock().core.take() };
|
||||
if let Some(core) = core {
|
||||
core.internal_shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn config(&self) -> VeilidConfig {
|
||||
self.inner.lock().config.clone()
|
||||
}
|
||||
|
||||
fn attachment_manager(&self) -> AttachmentManager {
|
||||
self.inner.lock().attachment_manager.clone()
|
||||
}
|
||||
|
||||
// fn network_manager(&self) -> NetworkManager {
|
||||
// self.inner.lock().network_manager.clone()
|
||||
// }
|
||||
|
||||
fn rpc_processor(&self) -> RPCProcessor {
|
||||
self.inner.lock().network_manager.rpc_processor()
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
let mut inner = self.inner.lock();
|
||||
if !inner.is_shutdown {
|
||||
inner.core.clone().internal_shutdown().await;
|
||||
inner.is_shutdown = true;
|
||||
}
|
||||
}
|
||||
pub fn is_shutdown(&self) -> bool {
|
||||
self.inner.lock().is_shutdown
|
||||
}
|
||||
|
||||
fn verify_not_shutdown(&self) -> Result<(), VeilidAPIError> {
|
||||
if self.is_shutdown() {
|
||||
return Err(VeilidAPIError::Shutdown);
|
||||
}
|
||||
Ok(())
|
||||
self.inner.lock().core.is_none()
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
// Attach/Detach
|
||||
|
||||
// issue state changed updates for updating clients
|
||||
pub async fn send_state_update(&self) {
|
||||
pub async fn send_state_update(&self) -> Result<(), VeilidAPIError> {
|
||||
trace!("VeilidCore::send_state_update");
|
||||
let attachment_manager = self.attachment_manager().clone();
|
||||
let attachment_manager = self.attachment_manager()?;
|
||||
attachment_manager.send_state_update().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// connect to the network
|
||||
pub async fn attach(&self) {
|
||||
pub async fn attach(&self) -> Result<(), VeilidAPIError> {
|
||||
trace!("VeilidCore::attach");
|
||||
let attachment_manager = self.attachment_manager().clone();
|
||||
let attachment_manager = self.attachment_manager()?;
|
||||
attachment_manager.request_attach().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// disconnect from the network
|
||||
pub async fn detach(&self) {
|
||||
pub async fn detach(&self) -> Result<(), VeilidAPIError> {
|
||||
trace!("VeilidCore::detach");
|
||||
let attachment_manager = self.attachment_manager().clone();
|
||||
let attachment_manager = self.attachment_manager()?;
|
||||
attachment_manager.request_detach().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// wait for state change
|
||||
// xxx: this should not use 'sleep', perhaps this function should be eliminated anyway
|
||||
pub async fn wait_for_state(&self, state: VeilidState) {
|
||||
// xxx: it should really only be used for test anyway, and there is probably a better way to do this regardless
|
||||
// xxx: that doesn't wait forever and can time out
|
||||
pub async fn wait_for_state(&self, state: VeilidState) -> Result<(), VeilidAPIError> {
|
||||
loop {
|
||||
intf::sleep(500).await;
|
||||
match state {
|
||||
VeilidState::Attachment(cs) => {
|
||||
if self.attachment_manager().get_state() == cs {
|
||||
if self.attachment_manager()?.get_state() == cs {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
// Direct Node Access (pretty much for testing only)
|
||||
|
||||
pub async fn info(&self, node_id: NodeId) -> Result<InfoAnswer, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
|
||||
let rpc = self.rpc_processor();
|
||||
let rpc = self.rpc_processor()?;
|
||||
let routing_table = rpc.routing_table();
|
||||
let node_ref = match routing_table.lookup_node_ref(node_id.key) {
|
||||
None => return Err(VeilidAPIError::NodeNotFound(node_id)),
|
||||
@ -1008,9 +1030,7 @@ impl VeilidAPI {
|
||||
redirect: bool,
|
||||
alternate_port: bool,
|
||||
) -> Result<bool, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
|
||||
let rpc = self.rpc_processor();
|
||||
let rpc = self.rpc_processor()?;
|
||||
let routing_table = rpc.routing_table();
|
||||
let node_ref = match routing_table.lookup_node_ref(node_id.key) {
|
||||
None => return Err(VeilidAPIError::NodeNotFound(node_id)),
|
||||
@ -1022,9 +1042,8 @@ impl VeilidAPI {
|
||||
}
|
||||
|
||||
pub async fn search_dht(&self, node_id: NodeId) -> Result<SearchDHTAnswer, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
let rpc_processor = self.rpc_processor();
|
||||
let config = self.config();
|
||||
let rpc_processor = self.rpc_processor()?;
|
||||
let config = self.config()?;
|
||||
let (count, fanout, timeout) = {
|
||||
let c = config.get();
|
||||
(
|
||||
@ -1051,10 +1070,8 @@ impl VeilidAPI {
|
||||
&self,
|
||||
node_id: NodeId,
|
||||
) -> Result<Vec<SearchDHTAnswer>, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
|
||||
let rpc_processor = self.rpc_processor();
|
||||
let config = self.config();
|
||||
let rpc_processor = self.rpc_processor()?;
|
||||
let config = self.config()?;
|
||||
let (count, fanout, timeout) = {
|
||||
let c = config.get();
|
||||
(
|
||||
@ -1151,7 +1168,6 @@ impl VeilidAPI {
|
||||
_endpoint_mode: TunnelMode,
|
||||
_depth: u8,
|
||||
) -> Result<PartialTunnel, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
panic!("unimplemented");
|
||||
}
|
||||
|
||||
@ -1161,12 +1177,10 @@ impl VeilidAPI {
|
||||
_depth: u8,
|
||||
_partial_tunnel: PartialTunnel,
|
||||
) -> Result<FullTunnel, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
panic!("unimplemented");
|
||||
}
|
||||
|
||||
pub async fn cancel_tunnel(&self, _tunnel_id: TunnelId) -> Result<bool, VeilidAPIError> {
|
||||
self.verify_not_shutdown()?;
|
||||
panic!("unimplemented");
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,7 @@ struct VeilidCoreInner {
|
||||
table_store: Option<TableStore>,
|
||||
crypto: Option<Crypto>,
|
||||
attachment_manager: Option<AttachmentManager>,
|
||||
api: VeilidAPIWeak,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@ -56,6 +57,7 @@ impl VeilidCore {
|
||||
table_store: None,
|
||||
crypto: None,
|
||||
attachment_manager: None,
|
||||
api: VeilidAPIWeak::default(),
|
||||
}
|
||||
}
|
||||
pub fn new() -> Self {
|
||||
@ -64,18 +66,9 @@ impl VeilidCore {
|
||||
}
|
||||
}
|
||||
|
||||
// pub(crate) fn config(&self) -> VeilidConfig {
|
||||
// self.inner.lock().config.as_ref().unwrap().clone()
|
||||
// }
|
||||
|
||||
// pub(crate) fn attachment_manager(&self) -> AttachmentManager {
|
||||
// self.inner
|
||||
// .lock()
|
||||
// .attachment_manager
|
||||
// .as_ref()
|
||||
// .unwrap()
|
||||
// .clone()
|
||||
// }
|
||||
pub(crate) fn config(&self) -> VeilidConfig {
|
||||
self.inner.lock().config.as_ref().unwrap().clone()
|
||||
}
|
||||
|
||||
pub(crate) fn table_store(&self) -> TableStore {
|
||||
self.inner.lock().table_store.as_ref().unwrap().clone()
|
||||
@ -85,8 +78,21 @@ impl VeilidCore {
|
||||
self.inner.lock().crypto.as_ref().unwrap().clone()
|
||||
}
|
||||
|
||||
pub(crate) fn attachment_manager(&self) -> AttachmentManager {
|
||||
self.inner
|
||||
.lock()
|
||||
.attachment_manager
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.clone()
|
||||
}
|
||||
|
||||
// internal startup
|
||||
async fn internal_startup(&self, setup: VeilidCoreSetup) -> Result<VeilidAPI, String> {
|
||||
async fn internal_startup(
|
||||
&self,
|
||||
inner: &mut VeilidCoreInner,
|
||||
setup: VeilidCoreSetup,
|
||||
) -> Result<VeilidAPI, String> {
|
||||
trace!("VeilidCore::internal_startup starting");
|
||||
|
||||
cfg_if! {
|
||||
@ -102,19 +108,19 @@ impl VeilidCore {
|
||||
trace!("VeilidCore::internal_startup init config");
|
||||
let mut config = VeilidConfig::new();
|
||||
config.init(setup.config_callback).await?;
|
||||
self.inner.lock().config = Some(config.clone());
|
||||
inner.config = Some(config.clone());
|
||||
|
||||
// Set up tablestore
|
||||
trace!("VeilidCore::internal_startup init tablestore");
|
||||
let table_store = TableStore::new(config.clone());
|
||||
table_store.init().await?;
|
||||
self.inner.lock().table_store = Some(table_store.clone());
|
||||
inner.table_store = Some(table_store.clone());
|
||||
|
||||
// Set up crypto
|
||||
trace!("VeilidCore::internal_startup init crypto");
|
||||
let crypto = Crypto::new(config.clone(), table_store.clone());
|
||||
crypto.init().await?;
|
||||
self.inner.lock().crypto = Some(crypto.clone());
|
||||
inner.crypto = Some(crypto.clone());
|
||||
|
||||
// Set up attachment manager
|
||||
trace!("VeilidCore::internal_startup init attachment manager");
|
||||
@ -131,20 +137,30 @@ impl VeilidCore {
|
||||
},
|
||||
))
|
||||
.await?;
|
||||
self.inner.lock().attachment_manager = Some(attachment_manager.clone());
|
||||
inner.attachment_manager = Some(attachment_manager.clone());
|
||||
|
||||
// Set up the API
|
||||
trace!("VeilidCore::internal_startup init API");
|
||||
let this = self.clone();
|
||||
let veilid_api = VeilidAPI::new(attachment_manager, this);
|
||||
let veilid_api = VeilidAPI::new(this);
|
||||
inner.api = veilid_api.weak();
|
||||
|
||||
trace!("VeilidCore::internal_startup complete");
|
||||
|
||||
Ok(veilid_api)
|
||||
}
|
||||
|
||||
// called once at the beginning to start the node
|
||||
pub async fn startup(&self, setup: VeilidCoreSetup) -> Result<VeilidAPI, String> {
|
||||
// See if we have an API started up already
|
||||
let mut inner = self.inner.lock();
|
||||
if inner.api.upgrade().is_some() {
|
||||
// If so, return an error because we shouldn't try to do this more than once
|
||||
return Err("Veilid API is started".to_owned());
|
||||
}
|
||||
|
||||
// Ensure we never end up partially initialized
|
||||
match self.internal_startup(setup).await {
|
||||
match self.internal_startup(&mut *inner, setup).await {
|
||||
Ok(v) => Ok(v),
|
||||
Err(e) => {
|
||||
self.clone().internal_shutdown().await;
|
||||
@ -158,6 +174,9 @@ impl VeilidCore {
|
||||
let mut inner = self.inner.lock();
|
||||
trace!("VeilidCore::internal_shutdown starting");
|
||||
|
||||
// Detach the API object
|
||||
inner.api = VeilidAPIWeak::default();
|
||||
|
||||
// Shut down up attachment manager
|
||||
if let Some(attachment_manager) = &inner.attachment_manager {
|
||||
attachment_manager.terminate().await;
|
||||
|
@ -107,8 +107,10 @@ impl veilid_server::Server for VeilidServerImpl {
|
||||
// Send state update
|
||||
let veilid_api = self.veilid_api.clone();
|
||||
Promise::from_future(async move {
|
||||
veilid_api.send_state_update().await;
|
||||
Ok(())
|
||||
veilid_api
|
||||
.send_state_update()
|
||||
.await
|
||||
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
|
||||
})
|
||||
}
|
||||
|
||||
@ -120,8 +122,10 @@ impl veilid_server::Server for VeilidServerImpl {
|
||||
trace!("VeilidServerImpl::attach");
|
||||
let veilid_api = self.veilid_api.clone();
|
||||
Promise::from_future(async move {
|
||||
veilid_api.attach().await;
|
||||
Ok(())
|
||||
veilid_api
|
||||
.attach()
|
||||
.await
|
||||
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
|
||||
})
|
||||
}
|
||||
fn detach(
|
||||
@ -132,8 +136,10 @@ impl veilid_server::Server for VeilidServerImpl {
|
||||
trace!("VeilidServerImpl::detach");
|
||||
let veilid_api = self.veilid_api.clone();
|
||||
Promise::from_future(async move {
|
||||
veilid_api.detach().await;
|
||||
Ok(())
|
||||
veilid_api
|
||||
.detach()
|
||||
.await
|
||||
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))
|
||||
})
|
||||
}
|
||||
fn shutdown(
|
||||
|
@ -89,8 +89,8 @@ lazy_static! {
|
||||
}
|
||||
|
||||
pub fn shutdown() {
|
||||
let mut shutdown_switch_locked = SHUTDOWN_SWITCH.lock();
|
||||
if let Some(shutdown_switch) = shutdown_switch_locked.take() {
|
||||
let shutdown_switch = SHUTDOWN_SWITCH.lock().take();
|
||||
if let Some(shutdown_switch) = shutdown_switch {
|
||||
shutdown_switch.resolve(());
|
||||
}
|
||||
}
|
||||
@ -279,17 +279,22 @@ pub async fn main() -> Result<(), String> {
|
||||
// Handle state changes on main thread for capnproto rpc
|
||||
let capi2 = capi.clone();
|
||||
let capi_jh = async_std::task::spawn_local(async move {
|
||||
trace!("state change processing started");
|
||||
while let Ok(change) = receiver.recv().await {
|
||||
if let Some(c) = capi2.borrow_mut().as_mut().cloned() {
|
||||
c.handle_state_change(change);
|
||||
}
|
||||
}
|
||||
trace!("state change processing stopped");
|
||||
});
|
||||
|
||||
// Auto-attach if desired
|
||||
if auto_attach {
|
||||
info!("Auto-attach to the Veilid network");
|
||||
veilid_api.attach().await;
|
||||
if let Err(e) = veilid_api.attach().await {
|
||||
error!("Auto-attaching to the Veilid network failed: {:?}", e);
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// Idle while waiting to exit
|
||||
|
Loading…
x
Reference in New Issue
Block a user