network debugging

This commit is contained in:
John Smith 2022-03-08 22:32:12 -05:00
parent 98799b4d3a
commit 64ea00f8cc
21 changed files with 891 additions and 345 deletions

7
Cargo.lock generated
View File

@ -1849,6 +1849,12 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
[[package]]
name = "keccak-hash"
version = "0.8.0"
@ -3898,6 +3904,7 @@ dependencies = [
"jni",
"jni-sys",
"js-sys",
"json",
"keyring-manager",
"keyvaluedb-sqlite",
"keyvaluedb-web",

View File

@ -21,7 +21,6 @@ log = "^0"
cfg-if = "^1"
anyhow = "^1"
thiserror = "^1"
hex = "^0"
generic-array = "^0"
secrecy = "^0"
@ -33,6 +32,7 @@ parking_lot = "^0"
lazy_static = "^1"
directories = "^4"
once_cell = "^1"
json = "^0"
ed25519-dalek = { version = "^1", default_features = false, features = ["alloc", "u64_backend"] }
x25519-dalek = { package = "x25519-dalek-ng", version = "^1", default_features = false, features = ["u64_backend"] }

View File

@ -8,7 +8,7 @@ use once_cell::sync::OnceCell;
struct ApiLoggerInner {
level: LevelFilter,
filter_ignore: Cow<'static, [Cow<'static, str>]>,
_join_handle: JoinHandle<()>,
join_handle: Option<JoinHandle<()>>,
tx: async_channel::Sender<(VeilidLogLevel, String)>,
}
@ -22,7 +22,7 @@ static API_LOGGER: OnceCell<ApiLogger> = OnceCell::new();
impl ApiLogger {
fn new_inner(level: LevelFilter, update_callback: UpdateCallback) -> ApiLoggerInner {
let (tx, rx) = async_channel::unbounded::<(VeilidLogLevel, String)>();
let _join_handle: JoinHandle<()> = spawn(async move {
let join_handle: Option<JoinHandle<()>> = Some(spawn(async move {
while let Ok(v) = rx.recv().await {
(update_callback)(VeilidUpdate::Log {
log_level: v.0,
@ -30,16 +30,16 @@ impl ApiLogger {
})
.await;
}
});
}));
ApiLoggerInner {
level,
filter_ignore: Default::default(),
_join_handle,
join_handle,
tx,
}
}
pub fn init(log_level: LevelFilter, update_callback: UpdateCallback) {
pub async fn init(log_level: LevelFilter, update_callback: UpdateCallback) {
set_max_level(log_level);
let api_logger = API_LOGGER.get_or_init(|| {
let api_logger = ApiLogger {
@ -48,15 +48,28 @@ impl ApiLogger {
set_boxed_logger(Box::new(api_logger.clone())).expect("failed to set api logger");
api_logger
});
let mut inner = api_logger.inner.lock();
*inner = Some(Self::new_inner(log_level, update_callback));
let apilogger_inner = Some(Self::new_inner(log_level, update_callback));
*api_logger.inner.lock() = apilogger_inner;
}
pub fn terminate() {
pub async fn terminate() {
if let Some(api_logger) = API_LOGGER.get() {
let mut inner = api_logger.inner.lock();
*inner = None;
let mut join_handle = None;
{
let mut inner = api_logger.inner.lock();
// Terminate channel
if let Some(inner) = (*inner).as_mut() {
inner.tx.close();
join_handle = inner.join_handle.take();
}
*inner = None;
}
if let Some(jh) = join_handle {
jh.await;
}
// Clear everything and we're done
set_max_level(LevelFilter::Off);
}
}

View File

@ -263,6 +263,7 @@ impl AttachmentManager {
&self,
state_change_callback: StateChangeCallback<Attachment>,
) -> Result<(), String> {
trace!("init");
let network_manager = {
let inner = self.inner.lock();
inner
@ -311,84 +312,39 @@ impl AttachmentManager {
}
}
async fn process_input(&self, input: &AttachmentInput) -> bool {
async fn process_input(&self, input: &AttachmentInput) -> Result<(), String> {
let attachment_machine = self.inner.lock().attachment_machine.clone();
let output = attachment_machine.consume(input).await;
match output {
Err(_) => {
error!("invalid input for state machine: {:?}", input);
false
}
Err(e) => Err(format!(
"invalid input '{:?}' for state machine in state '{:?}': {:?}",
input,
attachment_machine.state(),
e
)),
Ok(v) => {
if let Some(o) = v {
self.handle_output(&o).await;
}
true
Ok(())
}
}
}
pub async fn request_attach(&self) {
if !self.is_detached() {
trace!("attach request ignored");
return;
}
if self.process_input(&AttachmentInput::AttachRequested).await {
trace!("attach requested");
} else {
error!("attach request failed");
}
pub async fn request_attach(&self) -> Result<(), String> {
self.process_input(&AttachmentInput::AttachRequested)
.await
.map_err(|e| format!("Attach request failed: {}", e))
}
pub async fn request_detach(&self) {
if !self.is_attached() {
trace!("detach request ignored");
return;
}
if self.process_input(&AttachmentInput::DetachRequested).await {
trace!("detach requested");
} else {
error!("detach request failed");
}
pub async fn request_detach(&self) -> Result<(), String> {
self.process_input(&AttachmentInput::DetachRequested)
.await
.map_err(|e| format!("Attach request failed: {}", e))
}
pub fn get_state(&self) -> AttachmentState {
let attachment_machine = self.inner.lock().attachment_machine.clone();
attachment_machine.state()
}
// pub async fn wait_for_state(&self, state: AttachmentState, timeout_ms: Option<u32>) -> bool {
// let start_time = intf::get_timestamp();
// loop {
// let (current_state, eventual) = self
// .inner
// .lock()
// .attachment_machine
// .state_eventual_instance();
// if current_state == state {
// break;
// }
// if let Some(timeout_ms) = timeout_ms {
// let timeout_time = start_time + (timeout_ms as u64 * 1000);
// let cur_time = intf::get_timestamp();
// if timeout_time > cur_time {
// let timeout_dur_ms = ((timeout_time - cur_time) / 1000) as u32;
// if match intf::timeout(timeout_dur_ms, eventual).await {
// Ok(v) => v,
// Err(_) => return false,
// } == state
// {
// return true;
// }
// } else {
// return false;
// }
// } else if eventual.await == state {
// break;
// }
// }
// true
// }
}

View File

@ -69,10 +69,10 @@ where
// self.inner.lock().callback = None;
// }
pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture<T::State>) {
let inner = self.inner.lock();
(inner.state, inner.eventual.instance())
}
// pub fn state_eventual_instance(&self) -> (T::State, EventualValueCloneFuture<T::State>) {
// let inner = self.inner.lock();
// (inner.state, inner.eventual.instance())
// }
pub async fn consume(&self, input: &T::Input) -> Result<Option<T::Output>, ()> {
let current_state = self.inner.lock().state;

View File

@ -14,14 +14,175 @@ cfg_if! {
}
}
struct ServicesContext {
pub config: VeilidConfig,
pub update_callback: UpdateCallback,
pub protected_store: Option<ProtectedStore>,
pub table_store: Option<TableStore>,
pub block_store: Option<BlockStore>,
pub crypto: Option<Crypto>,
pub attachment_manager: Option<AttachmentManager>,
}
impl ServicesContext {
pub fn new_empty(config: VeilidConfig, update_callback: UpdateCallback) -> Self {
Self {
config,
update_callback,
protected_store: None,
table_store: None,
block_store: None,
crypto: None,
attachment_manager: None,
}
}
pub fn new_full(
config: VeilidConfig,
update_callback: UpdateCallback,
protected_store: ProtectedStore,
table_store: TableStore,
block_store: BlockStore,
crypto: Crypto,
attachment_manager: AttachmentManager,
) -> Self {
Self {
config,
update_callback,
protected_store: Some(protected_store),
table_store: Some(table_store),
block_store: Some(block_store),
crypto: Some(crypto),
attachment_manager: Some(attachment_manager),
}
}
pub async fn startup(&mut self) -> Result<(), VeilidAPIError> {
let api_log_level: VeilidConfigLogLevel = self.config.get().api_log_level;
if api_log_level != VeilidConfigLogLevel::Off {
ApiLogger::init(
api_log_level.to_level_filter(),
self.update_callback.clone(),
)
.await;
for ig in crate::DEFAULT_LOG_IGNORE_LIST {
ApiLogger::add_filter_ignore_str(ig);
}
info!("Veilid API logging initialized");
}
trace!("startup starting");
// Set up protected store
trace!("init protected store");
let protected_store = ProtectedStore::new(self.config.clone());
if let Err(e) = protected_store.init().await {
self.shutdown().await;
return Err(VeilidAPIError::Internal { message: e });
}
self.protected_store = Some(protected_store.clone());
// Init node id from config now that protected store is set up
if let Err(e) = self.config.init_node_id(protected_store.clone()).await {
self.shutdown().await;
return Err(VeilidAPIError::Internal { message: e });
}
// Set up tablestore
trace!("init table store");
let table_store = TableStore::new(self.config.clone());
if let Err(e) = table_store.init().await {
self.shutdown().await;
return Err(VeilidAPIError::Internal { message: e });
}
self.table_store = Some(table_store.clone());
// Set up crypto
trace!("init crypto");
let crypto = Crypto::new(self.config.clone(), table_store.clone());
if let Err(e) = crypto.init().await {
self.shutdown().await;
return Err(VeilidAPIError::Internal { message: e });
}
self.crypto = Some(crypto.clone());
// Set up block store
trace!("init block store");
let block_store = BlockStore::new(self.config.clone());
if let Err(e) = block_store.init().await {
self.shutdown().await;
return Err(VeilidAPIError::Internal { message: e });
}
self.block_store = Some(block_store.clone());
// Set up attachment manager
trace!("init attachment manager");
let update_callback_move = self.update_callback.clone();
let attachment_manager = AttachmentManager::new(self.config.clone(), table_store, crypto);
if let Err(e) = attachment_manager
.init(Arc::new(
move |_old_state: AttachmentState, new_state: AttachmentState| {
update_callback_move(VeilidUpdate::Attachment { state: new_state })
},
))
.await
{
self.shutdown().await;
return Err(VeilidAPIError::Internal { message: e });
}
self.attachment_manager = Some(attachment_manager);
trace!("startup complete");
Ok(())
}
pub async fn shutdown(&mut self) {
trace!("shutdown starting");
if let Some(attachment_manager) = &mut self.attachment_manager {
trace!("terminate attachment manager");
attachment_manager.terminate().await;
}
if let Some(block_store) = &mut self.block_store {
trace!("terminate block store");
block_store.terminate().await;
}
if let Some(crypto) = &mut self.crypto {
trace!("terminate crypto");
crypto.terminate().await;
}
if let Some(table_store) = &mut self.table_store {
trace!("terminate table store");
table_store.terminate().await;
}
if let Some(protected_store) = &mut self.protected_store {
trace!("terminate protected store");
protected_store.terminate().await;
}
trace!("shutdown complete");
// api logger terminate is idempotent
ApiLogger::terminate().await;
// send final shutdown update
(self.update_callback)(VeilidUpdate::Shutdown).await;
}
}
/////////////////////////////////////////////////////////////////////////////
///
pub struct VeilidCoreContext {
pub config: VeilidConfig,
pub update_callback: UpdateCallback,
// Services
pub protected_store: ProtectedStore,
pub table_store: TableStore,
pub block_store: BlockStore,
pub crypto: Crypto,
pub attachment_manager: AttachmentManager,
pub update_callback: UpdateCallback,
}
impl VeilidCoreContext {
@ -30,9 +191,9 @@ impl VeilidCoreContext {
config_callback: ConfigCallback,
) -> Result<VeilidCoreContext, VeilidAPIError> {
// Set up config from callback
trace!("VeilidCoreContext::new_with_config_callback init config");
trace!("setup config with callback");
let mut config = VeilidConfig::new();
if let Err(e) = config.init(config_callback).await {
if let Err(e) = config.setup(config_callback) {
return Err(VeilidAPIError::Internal { message: e });
}
@ -44,12 +205,11 @@ impl VeilidCoreContext {
config_json: String,
) -> Result<VeilidCoreContext, VeilidAPIError> {
// Set up config from callback
trace!("VeilidCoreContext::new_with_config_json init config");
trace!("setup config with json");
let mut config = VeilidConfig::new();
if let Err(e) = config.init_from_json(config_json).await {
if let Err(e) = config.setup_from_json(config_json) {
return Err(VeilidAPIError::Internal { message: e });
}
Self::new_common(update_callback, config).await
}
@ -67,114 +227,31 @@ impl VeilidCoreContext {
}
}
// Start up api logging
let api_log_level: VeilidConfigLogLevel = config.get().api_log_level;
if api_log_level != VeilidConfigLogLevel::Off {
ApiLogger::init(api_log_level.to_level_filter(), update_callback.clone());
for ig in crate::DEFAULT_LOG_IGNORE_LIST {
ApiLogger::add_filter_ignore_str(ig);
}
info!("Veilid API logging initialized");
}
// Set up protected store
trace!("VeilidCoreContext::new init protected store");
let protected_store = ProtectedStore::new(config.clone());
if let Err(e) = protected_store.init().await {
config.terminate().await;
ApiLogger::terminate();
return Err(VeilidAPIError::Internal { message: e });
}
// Init node id from config now that protected store is set up
if let Err(e) = config.init_node_id(protected_store.clone()).await {
protected_store.terminate().await;
config.terminate().await;
ApiLogger::terminate();
return Err(VeilidAPIError::Internal { message: e });
}
// Set up tablestore
trace!("VeilidCoreContext::new init table store");
let table_store = TableStore::new(config.clone());
if let Err(e) = table_store.init().await {
protected_store.terminate().await;
config.terminate().await;
ApiLogger::terminate();
return Err(VeilidAPIError::Internal { message: e });
}
// Set up crypto
trace!("VeilidCoreContext::new init crypto");
let crypto = Crypto::new(config.clone(), table_store.clone());
if let Err(e) = crypto.init().await {
table_store.terminate().await;
protected_store.terminate().await;
config.terminate().await;
ApiLogger::terminate();
return Err(VeilidAPIError::Internal { message: e });
}
// Set up block store
trace!("VeilidCoreContext::new init block store");
let block_store = BlockStore::new(config.clone());
if let Err(e) = block_store.init().await {
crypto.terminate().await;
table_store.terminate().await;
protected_store.terminate().await;
config.terminate().await;
ApiLogger::terminate();
return Err(VeilidAPIError::Internal { message: e });
}
// Set up attachment manager
trace!("VeilidCoreContext::new init attachment manager");
let update_callback_move = update_callback.clone();
let attachment_manager =
AttachmentManager::new(config.clone(), table_store.clone(), crypto.clone());
if let Err(e) = attachment_manager
.init(Arc::new(
move |_old_state: AttachmentState, new_state: AttachmentState| {
update_callback_move(VeilidUpdate::Attachment { state: new_state })
},
))
.await
{
block_store.terminate().await;
crypto.terminate().await;
table_store.terminate().await;
protected_store.terminate().await;
config.terminate().await;
ApiLogger::terminate();
return Err(VeilidAPIError::Internal { message: e });
}
let mut sc = ServicesContext::new_empty(config.clone(), update_callback);
sc.startup().await?;
Ok(VeilidCoreContext {
config,
protected_store,
table_store,
block_store,
crypto,
attachment_manager,
update_callback,
update_callback: sc.update_callback,
config: sc.config,
protected_store: sc.protected_store.unwrap(),
table_store: sc.table_store.unwrap(),
block_store: sc.block_store.unwrap(),
crypto: sc.crypto.unwrap(),
attachment_manager: sc.attachment_manager.unwrap(),
})
}
async fn shutdown(self) {
trace!("VeilidCoreContext::terminate_core_context starting");
self.attachment_manager.terminate().await;
self.block_store.terminate().await;
self.crypto.terminate().await;
self.table_store.terminate().await;
self.protected_store.terminate().await;
self.config.terminate().await;
// send final shutdown update
(self.update_callback)(VeilidUpdate::Shutdown).await;
trace!("VeilidCoreContext::shutdown complete");
ApiLogger::terminate();
let mut sc = ServicesContext::new_full(
self.config.clone(),
self.update_callback.clone(),
self.protected_store,
self.table_store,
self.block_store,
self.crypto,
self.attachment_manager,
);
sc.shutdown().await;
}
}
@ -225,7 +302,7 @@ pub async fn api_startup_json(
Ok(veilid_api)
}
pub async fn api_shutdown(context: VeilidCoreContext) {
pub(crate) async fn api_shutdown(context: VeilidCoreContext) {
let mut initialized_lock = INITIALIZED.lock().await;
context.shutdown().await;
*initialized_lock = false;

View File

@ -145,7 +145,6 @@ impl Crypto {
let db = table_store.open("crypto_caches", 1).await?;
db.store(0, b"dh_cache", &cache_bytes).await?;
Ok(())
}

View File

@ -52,10 +52,12 @@ struct NetworkInner {
wss_port: u16,
interfaces: NetworkInterfaces,
// udp
bound_first_udp: BTreeMap<u16, (socket2::Socket, socket2::Socket)>,
inbound_udp_protocol_handlers: BTreeMap<SocketAddr, RawUdpProtocolHandler>,
outbound_udpv4_protocol_handler: Option<RawUdpProtocolHandler>,
outbound_udpv6_protocol_handler: Option<RawUdpProtocolHandler>,
//tcp
bound_first_tcp: BTreeMap<u16, (socket2::Socket, socket2::Socket)>,
tls_acceptor: Option<TlsAcceptor>,
listener_states: BTreeMap<SocketAddr, Arc<RwLock<ListenerState>>>,
}
@ -91,9 +93,11 @@ impl Network {
ws_port: 0u16,
wss_port: 0u16,
interfaces: NetworkInterfaces::new(),
bound_first_udp: BTreeMap::new(),
inbound_udp_protocol_handlers: BTreeMap::new(),
outbound_udpv4_protocol_handler: None,
outbound_udpv6_protocol_handler: None,
bound_first_tcp: BTreeMap::new(),
tls_acceptor: None,
listener_states: BTreeMap::new(),
}
@ -420,6 +424,10 @@ impl Network {
if protocol_config.tcp_listen {
self.start_tcp_listeners().await?;
}
// release caches of available listener ports
// this releases the 'first bound' ports we use to guarantee
// that we have ports available to us
self.free_bound_first_ports();
info!("network started");
self.inner.lock().network_started = true;

View File

@ -89,15 +89,17 @@ impl ProtocolNetworkConnection {
pub fn new_unbound_shared_udp_socket(domain: Domain) -> Result<socket2::Socket, String> {
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| format!("Couldn't create UDP socket: {}", e))?;
if let Err(e) = socket.set_reuse_address(true) {
log_net!(error "Couldn't set reuse address: {}", e);
if domain == Domain::IPV6 {
socket
.set_only_v6(true)
.map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?;
}
socket
.set_reuse_address(true)
.map_err(|e| format!("Couldn't set reuse address: {}", e))?;
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
log_net!(error "Couldn't set reuse port: {}", e);
}
socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?;
}
}
Ok(socket)
@ -116,6 +118,36 @@ pub fn new_bound_shared_udp_socket(local_address: SocketAddr) -> Result<socket2:
Ok(socket)
}
pub fn new_bound_first_udp_socket(local_address: SocketAddr) -> Result<socket2::Socket, String> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))
.map_err(|e| format!("Couldn't create UDP socket: {}", e))?;
if domain == Domain::IPV6 {
socket
.set_only_v6(true)
.map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?;
}
// Bind the socket -first- before turning on 'reuse address' this way it will
// fail if the port is already taken
let socket2_addr = socket2::SockAddr::from(local_address);
socket
.bind(&socket2_addr)
.map_err(|e| format!("failed to bind UDP socket: {}", e))?;
// Set 'reuse address' so future binds to this port will succeed
socket
.set_reuse_address(true)
.map_err(|e| format!("Couldn't set reuse address: {}", e))?;
cfg_if! {
if #[cfg(unix)] {
socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?;
}
}
log_net!("created shared udp socket on {:?}", &local_address);
Ok(socket)
}
pub fn new_unbound_shared_tcp_socket(domain: Domain) -> Result<socket2::Socket, String> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(map_to_string)
@ -126,14 +158,17 @@ pub fn new_unbound_shared_tcp_socket(domain: Domain) -> Result<socket2::Socket,
if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e);
}
if let Err(e) = socket.set_reuse_address(true) {
log_net!(error "Couldn't set reuse address: {}", e);
if domain == Domain::IPV6 {
socket
.set_only_v6(true)
.map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?;
}
socket
.set_reuse_address(true)
.map_err(|e| format!("Couldn't set reuse address: {}", e))?;
cfg_if! {
if #[cfg(unix)] {
if let Err(e) = socket.set_reuse_port(true) {
log_net!(error "Couldn't set reuse port: {}", e);
}
socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?;
}
}
Ok(socket)
@ -151,3 +186,40 @@ pub fn new_bound_shared_tcp_socket(local_address: SocketAddr) -> Result<socket2:
Ok(socket)
}
pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> Result<socket2::Socket, String> {
let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(map_to_string)
.map_err(logthru_net!("failed to create TCP socket"))?;
if let Err(e) = socket.set_linger(None) {
log_net!(error "Couldn't set TCP linger: {}", e);
}
if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e);
}
if domain == Domain::IPV6 {
socket
.set_only_v6(true)
.map_err(|e| format!("Couldn't set IPV6_V6ONLY: {}", e))?;
}
// Bind the socket -first- before turning on 'reuse address' this way it will
// fail if the port is already taken
let socket2_addr = socket2::SockAddr::from(local_address);
socket
.bind(&socket2_addr)
.map_err(|e| format!("failed to bind TCP socket: {}", e))?;
// Set 'reuse address' so future binds to this port will succeed
socket
.set_reuse_address(true)
.map_err(|e| format!("Couldn't set reuse address: {}", e))?;
cfg_if! {
if #[cfg(unix)] {
socket.set_reuse_port(true).map_err(|e| format!("Couldn't set reuse port: {}", e))?;
}
}
Ok(socket)
}

View File

@ -29,7 +29,6 @@ impl RawUdpProtocolHandler {
remote_addr
);
// Process envelope
let peer_addr = PeerAddress::new(
SocketAddress::from_socket_addr(remote_addr),
ProtocolType::UDP,

View File

@ -1,11 +1,148 @@
use super::*;
impl Network {
pub(super) async fn start_udp_listeners(&self) -> Result<(), String> {
// First, create outbound sockets and we'll listen on them too
self.create_udp_outbound_sockets().await?;
/////////////////////////////////////////////////////
// Support for binding first on ports to ensure nobody binds ahead of us
// or two copies of the app don't accidentally collide. This is tricky
// because we use 'reuseaddr/port' and we can accidentally bind in front of ourselves :P
// Now create udp inbound sockets for whatever interfaces we're listening on
fn bind_first_udp_port(&self, udp_port: u16) -> bool {
let mut inner = self.inner.lock();
if inner.bound_first_udp.contains_key(&udp_port) {
return true;
}
// If the address is specified, only use the specified port and fail otherwise
let mut bound_first_socket_v4 = None;
let mut bound_first_socket_v6 = None;
if let Ok(bfs4) =
new_bound_first_udp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), udp_port))
{
if let Ok(bfs6) = new_bound_first_udp_socket(SocketAddr::new(
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
udp_port,
)) {
bound_first_socket_v4 = Some(bfs4);
bound_first_socket_v6 = Some(bfs6);
}
}
if let (Some(bfs4), Some(bfs6)) = (bound_first_socket_v4, bound_first_socket_v6) {
inner.bound_first_udp.insert(udp_port, (bfs4, bfs6));
true
} else {
false
}
}
fn bind_first_tcp_port(&self, tcp_port: u16) -> bool {
let mut inner = self.inner.lock();
if inner.bound_first_tcp.contains_key(&tcp_port) {
return true;
}
// If the address is specified, only use the specified port and fail otherwise
let mut bound_first_socket_v4 = None;
let mut bound_first_socket_v6 = None;
if let Ok(bfs4) =
new_bound_first_tcp_socket(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), tcp_port))
{
if let Ok(bfs6) = new_bound_first_tcp_socket(SocketAddr::new(
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
tcp_port,
)) {
bound_first_socket_v4 = Some(bfs4);
bound_first_socket_v6 = Some(bfs6);
}
}
if let (Some(bfs4), Some(bfs6)) = (bound_first_socket_v4, bound_first_socket_v6) {
inner.bound_first_tcp.insert(tcp_port, (bfs4, bfs6));
true
} else {
false
}
}
pub(super) fn free_bound_first_ports(&self) {
let mut inner = self.inner.lock();
inner.bound_first_udp.clear();
inner.bound_first_tcp.clear();
}
/////////////////////////////////////////////////////
fn find_available_udp_port(&self) -> Result<u16, String> {
// If the address is empty, iterate ports until we find one we can use.
let mut udp_port = 5150u16;
loop {
if self.bind_first_udp_port(udp_port) {
break;
}
if udp_port == 65535 {
return Err("Could not find free udp port to listen on".to_owned());
}
udp_port += 1;
}
Ok(udp_port)
}
fn find_available_tcp_port(&self) -> Result<u16, String> {
// If the address is empty, iterate ports until we find one we can use.
let mut tcp_port = 5150u16;
loop {
if self.bind_first_tcp_port(tcp_port) {
break;
}
if tcp_port == 65535 {
return Err("Could not find free tcp port to listen on".to_owned());
}
tcp_port += 1;
}
Ok(tcp_port)
}
async fn allocate_udp_port(&self, listen_address: String) -> Result<u16, String> {
if listen_address.is_empty() {
// If listen address is empty, find us a port iteratively
self.find_available_udp_port()
} else if let Some(sa) = listen_address
.to_socket_addrs()
.await
.map_err(|e| format!("Unable to resolve address: {}\n{}", listen_address, e))?
.next()
{
// If the address is specified, only use the specified port and fail otherwise
if self.bind_first_udp_port(sa.port()) {
Ok(sa.port())
} else {
Err("Could not find free udp port to listen on".to_owned())
}
} else {
Err(format!("No valid listen address: {}", listen_address))
}
}
async fn allocate_tcp_port(&self, listen_address: String) -> Result<u16, String> {
if listen_address.is_empty() {
// If listen address is empty, find us a port iteratively
self.find_available_tcp_port()
} else if let Some(sa) = listen_address
.to_socket_addrs()
.await
.map_err(|e| format!("Unable to resolve address: {}\n{}", listen_address, e))?
.next()
{
// If the address is specified, only use the specified port and fail otherwise
if self.bind_first_tcp_port(sa.port()) {
Ok(sa.port())
} else {
Err("Could not find free tcp port to listen on".to_owned())
}
} else {
Err(format!("No valid listen address: {}", listen_address))
}
}
/////////////////////////////////////////////////////
pub(super) async fn start_udp_listeners(&self) -> Result<(), String> {
let routing_table = self.routing_table();
let (listen_address, public_address) = {
let c = self.config.get();
@ -14,15 +151,27 @@ impl Network {
c.network.protocol.udp.public_address.clone(),
)
};
// Pick out UDP port we're going to use everywhere
// Keep sockets around until the end of this function
// to keep anyone else from binding in front of us
let udp_port = self.allocate_udp_port(listen_address.clone()).await?;
// Save the bound udp port for use later on
self.inner.lock().udp_port = udp_port;
// First, create outbound sockets
// (unlike tcp where we create sockets for every connection)
// and we'll add protocol handlers for them too
self.create_udp_outbound_sockets().await?;
// Now create udp inbound sockets for whatever interfaces we're listening on
info!("UDP: starting listener at {:?}", listen_address);
let dial_infos = self
.create_udp_inbound_sockets(listen_address.clone())
.await?;
let mut static_public = false;
for di in &dial_infos {
// Pick out UDP port for outbound connections (they will all be the same)
self.inner.lock().udp_port = di.port();
// Register local dial info only here if we specify a public address
if public_address.is_none() && di.is_global() {
// Register global dial info if no public address is specified
@ -73,6 +222,15 @@ impl Network {
c.network.protocol.ws.path.clone(),
)
};
// Pick out TCP port we're going to use everywhere
// Keep sockets around until the end of this function
// to keep anyone else from binding in front of us
let ws_port = self.allocate_tcp_port(listen_address.clone()).await?;
// Save the bound ws port for use later on
self.inner.lock().ws_port = ws_port;
trace!("WS: starting listener at {:?}", listen_address);
let socket_addresses = self
.start_tcp_listener(
@ -85,9 +243,6 @@ impl Network {
let mut static_public = false;
for socket_address in socket_addresses {
// Pick out WS port for outbound connections (they will all be the same)
self.inner.lock().ws_port = socket_address.port();
if url.is_none() && socket_address.address().is_global() {
// Build global dial info request url
let global_url = format!("ws://{}/{}", socket_address, path);
@ -155,8 +310,17 @@ impl Network {
c.network.protocol.wss.url.clone(),
)
};
// Pick out TCP port we're going to use everywhere
// Keep sockets around until the end of this function
// to keep anyone else from binding in front of us
let wss_port = self.allocate_tcp_port(listen_address.clone()).await?;
// Save the bound wss port for use later on
self.inner.lock().wss_port = wss_port;
trace!("WSS: starting listener at {}", listen_address);
let socket_addresses = self
let _socket_addresses = self
.start_tcp_listener(
listen_address.clone(),
true,
@ -169,11 +333,6 @@ impl Network {
// If the hostname is specified, it is the public dialinfo via the URL. If no hostname
// is specified, then TLS won't validate, so no local dialinfo is possible.
// This is not the case with unencrypted websockets, which can be specified solely by an IP address
//
if let Some(socket_address) = socket_addresses.first() {
// Pick out WSS port for outbound connections (they will all be the same)
self.inner.lock().wss_port = socket_address.port();
}
// Add static public dialinfo if it's configured
if let Some(url) = url.as_ref() {
@ -217,6 +376,15 @@ impl Network {
c.network.protocol.tcp.public_address.clone(),
)
};
// Pick out TCP port we're going to use everywhere
// Keep sockets around until the end of this function
// to keep anyone else from binding in front of us
let tcp_port = self.allocate_tcp_port(listen_address.clone()).await?;
// Save the bound tcp port for use later on
self.inner.lock().tcp_port = tcp_port;
trace!("TCP: starting listener at {}", &listen_address);
let socket_addresses = self
.start_tcp_listener(
@ -229,9 +397,6 @@ impl Network {
let mut static_public = false;
for socket_address in socket_addresses {
// Pick out TCP port for outbound connections (they will all be the same)
self.inner.lock().tcp_port = socket_address.port();
let di = DialInfo::tcp(socket_address);
// Register local dial info only here if we specify a public address

View File

@ -289,6 +289,23 @@ impl RoutingTable {
*self.inner.lock() = Self::new_inner(self.network_manager());
}
// Attempt to empty the routing table
// should only be performed when there are no node_refs (detached)
pub fn purge(&self) {
let mut inner = self.inner.lock();
log_rtab!(
"Starting routing table purge. Table currently has {} nodes",
inner.bucket_entry_count
);
for bucket in &mut inner.buckets {
bucket.kick(0);
}
log_rtab!(
"Routing table purge complete. Routing table now has {} nodes",
inner.bucket_entry_count
);
}
// Attempt to settle buckets and remove entries down to the desired number
// which may not be possible due extant NodeRefs
fn kick_bucket(inner: &mut RoutingTableInner, idx: usize) {

View File

@ -195,7 +195,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
"network.connection_initial_timeout_ms" => Ok(Box::new(2_000u32)),
"network.node_id" => Ok(Box::new(dht::key::DHTKey::default())),
"network.node_id_secret" => Ok(Box::new(dht::key::DHTKeySecret::default())),
"network.bootstrap" => Ok(Box::new(vec![String::from("asdf"), String::from("qwer")])),
"network.bootstrap" => Ok(Box::new(Vec::<String>::new())),
"network.rpc.concurrency" => Ok(Box::new(2u32)),
"network.rpc.queue_size" => Ok(Box::new(128u32)),
"network.rpc.max_timestamp_behind_ms" => Ok(Box::new(Some(10_000u32))),
@ -265,7 +265,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
pub async fn test_config() {
let mut vc = VeilidConfig::new();
match vc.init(Arc::new(config_callback)).await {
match vc.setup(Arc::new(config_callback)) {
Ok(()) => (),
Err(e) => {
error!("Error: {}", e);
@ -299,10 +299,7 @@ pub async fn test_config() {
assert_eq!(inner.network.connection_initial_timeout_ms, 2_000u32);
assert!(!inner.network.node_id.valid);
assert!(!inner.network.node_id_secret.valid);
assert_eq!(
inner.network.bootstrap,
vec![String::from("asdf"), String::from("qwer")]
);
assert_eq!(inner.network.bootstrap, Vec::<String>::new());
assert_eq!(inner.network.rpc.concurrency, 2u32);
assert_eq!(inner.network.rpc.queue_size, 128u32);
assert_eq!(inner.network.rpc.timeout_ms, 10_000u32);

View File

@ -39,7 +39,7 @@ pub async fn test_attach_detach() {
let api = api_startup(update_callback, config_callback)
.await
.expect("startup failed");
api.detach().await.unwrap();
assert!(api.detach().await.is_err());
api.shutdown().await;
}

View File

@ -63,11 +63,12 @@ fn get_debug_argument_at<T, G: FnOnce(&str) -> Option<T>>(
}
impl VeilidAPI {
async fn debug_buckets(&self, debug_args: &[String]) -> Result<String, VeilidAPIError> {
async fn debug_buckets(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let mut min_state = BucketEntryState::Unreliable;
if debug_args.len() == 1 {
if args.len() == 1 {
min_state = get_debug_argument(
&debug_args[0],
&args[0],
"debug_buckets",
"min_state",
get_bucket_entry_state,
@ -79,26 +80,28 @@ impl VeilidAPI {
Ok(routing_table.debug_info_buckets(min_state))
}
async fn debug_dialinfo(&self, _debug_args: &[String]) -> Result<String, VeilidAPIError> {
async fn debug_dialinfo(&self, _args: String) -> Result<String, VeilidAPIError> {
// Dump routing table dialinfo
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_dialinfo())
}
async fn debug_entries(&self, debug_args: &[String]) -> Result<String, VeilidAPIError> {
async fn debug_entries(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let mut min_state = BucketEntryState::Unreliable;
let mut limit = 20;
for arg in debug_args {
if let Some(ms) = get_bucket_entry_state(arg) {
for arg in args {
if let Some(ms) = get_bucket_entry_state(&arg) {
min_state = ms;
} else if let Some(lim) = get_number(arg) {
} else if let Some(lim) = get_number(&arg) {
limit = lim;
} else {
return Err(VeilidAPIError::InvalidArgument {
context: "debug_entries".to_owned(),
argument: "unknown".to_owned(),
value: arg.clone(),
value: arg,
});
}
}
@ -109,8 +112,10 @@ impl VeilidAPI {
Ok(routing_table.debug_info_entries(limit, min_state))
}
async fn debug_entry(&self, debug_args: &[String]) -> Result<String, VeilidAPIError> {
let node_id = get_debug_argument_at(debug_args, 0, "debug_entry", "node_id", get_dht_key)?;
async fn debug_entry(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
let node_id = get_debug_argument_at(&args, 0, "debug_entry", "node_id", get_dht_key)?;
// Dump routing table entry
let rpc = self.rpc_processor()?;
@ -118,41 +123,149 @@ impl VeilidAPI {
Ok(routing_table.debug_info_entry(node_id))
}
async fn debug_nodeinfo(&self, _debug_args: &[String]) -> Result<String, VeilidAPIError> {
async fn debug_nodeinfo(&self, _args: String) -> Result<String, VeilidAPIError> {
// Dump routing table entry
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
Ok(routing_table.debug_info_nodeinfo())
}
pub async fn debug(&self, what: String) -> Result<String, VeilidAPIError> {
trace!("VeilidCore::debug");
let debug_args: Vec<String> = what
.split_ascii_whitespace()
.map(|s| s.to_owned())
.collect();
if debug_args.is_empty() {
return Ok(r#">>> Debug commands:
buckets [dead|reliable]
dialinfo
entries [dead|reliable] [limit]
entry [node_id]
nodeinfo
"#
.to_owned());
async fn debug_config(&self, args: String) -> Result<String, VeilidAPIError> {
let config = self.config()?;
let args = args.trim_start();
if args.is_empty() {
return config
.get_key_json("")
.map_err(|e| VeilidAPIError::Internal { message: e });
}
let (arg, rest) = args.split_once(' ').unwrap_or((args, ""));
let rest = rest.trim_start().to_owned();
// Must be detached
if matches!(
self.get_state().await?.attachment,
AttachmentState::Detached | AttachmentState::Detaching
) {
return Err(VeilidAPIError::Internal {
message: "Must be detached to change config".to_owned(),
});
}
// One argument is 'config get'
if rest.is_empty() {
return config
.get_key_json(arg)
.map_err(|e| VeilidAPIError::Internal { message: e });
}
config
.set_key_json(arg, &rest)
.map_err(|e| VeilidAPIError::Internal { message: e })?;
Ok("Config value set".to_owned())
}
async fn debug_purge(&self, args: String) -> Result<String, VeilidAPIError> {
let args: Vec<String> = args.split_whitespace().map(|s| s.to_owned()).collect();
if !args.is_empty() {
if args[0] == "buckets" {
// Must be detached
if matches!(
self.get_state().await?.attachment,
AttachmentState::Detached | AttachmentState::Detaching
) {
return Err(VeilidAPIError::Internal {
message: "Must be detached to purge".to_owned(),
});
}
self.network_manager()?.routing_table().purge();
Ok("Buckets purged".to_owned())
} else {
Err(VeilidAPIError::InvalidArgument {
context: "debug_purge".to_owned(),
argument: "parameter".to_owned(),
value: args[0].clone(),
})
}
} else {
Err(VeilidAPIError::MissingArgument {
context: "debug_purge".to_owned(),
argument: "parameter".to_owned(),
})
}
}
async fn debug_attach(&self, _args: String) -> Result<String, VeilidAPIError> {
if !matches!(
self.get_state().await?.attachment,
AttachmentState::Detached
) {
return Err(VeilidAPIError::Internal {
message: "Not detached".to_owned(),
});
};
self.attach().await?;
Ok("Attached".to_owned())
}
async fn debug_detach(&self, _args: String) -> Result<String, VeilidAPIError> {
if matches!(
self.get_state().await?.attachment,
AttachmentState::Detaching
) {
return Err(VeilidAPIError::Internal {
message: "Not attached".to_owned(),
});
};
self.detach().await?;
Ok("Detached".to_owned())
}
pub async fn debug_help(&self, _args: String) -> Result<String, VeilidAPIError> {
Ok(r#">>> Debug commands:
buckets [dead|reliable]
dialinfo
entries [dead|reliable] [limit]
entry [node_id]
nodeinfo
config [key [new value]]
purge buckets
attach
detach
"#
.to_owned())
}
pub async fn debug(&self, args: String) -> Result<String, VeilidAPIError> {
let args = args.trim_start();
if args.is_empty() {
// No arguments runs help command
return self.debug_help("".to_owned()).await;
}
let (arg, rest) = args.split_once(' ').unwrap_or((args, ""));
let rest = rest.trim_start().to_owned();
let mut out = String::new();
let arg = &debug_args[0];
if arg == "buckets" {
out += self.debug_buckets(&debug_args[1..]).await?.as_str();
out += self.debug_buckets(rest).await?.as_str();
} else if arg == "dialinfo" {
out += self.debug_dialinfo(&debug_args[1..]).await?.as_str();
out += self.debug_dialinfo(rest).await?.as_str();
} else if arg == "entries" {
out += self.debug_entries(&debug_args[1..]).await?.as_str();
out += self.debug_entries(rest).await?.as_str();
} else if arg == "entry" {
out += self.debug_entry(&debug_args[1..]).await?.as_str();
out += self.debug_entry(rest).await?.as_str();
} else if arg == "nodeinfo" {
out += self.debug_nodeinfo(&debug_args[1..]).await?.as_str();
out += self.debug_nodeinfo(rest).await?.as_str();
} else if arg == "purge" {
out += self.debug_purge(rest).await?.as_str();
} else if arg == "attach" {
out += self.debug_attach(rest).await?.as_str();
} else if arg == "detach" {
out += self.debug_detach(rest).await?.as_str();
} else if arg == "config" {
out += self.debug_config(rest).await?.as_str();
} else {
out += ">>> Unknown command\n";
}

View File

@ -1194,7 +1194,6 @@ impl VeilidAPI {
// get a full copy of the current state
pub async fn get_state(&self) -> Result<VeilidState, VeilidAPIError> {
trace!("VeilidCore::get_state");
let attachment_manager = self.attachment_manager()?;
Ok(VeilidState {
attachment: attachment_manager.get_state(),
@ -1203,18 +1202,20 @@ impl VeilidAPI {
// connect to the network
pub async fn attach(&self) -> Result<(), VeilidAPIError> {
trace!("VeilidCore::attach");
let attachment_manager = self.attachment_manager()?;
attachment_manager.request_attach().await;
Ok(())
attachment_manager
.request_attach()
.await
.map_err(|e| VeilidAPIError::Internal { message: e })
}
// disconnect from the network
pub async fn detach(&self) -> Result<(), VeilidAPIError> {
trace!("VeilidCore::detach");
let attachment_manager = self.attachment_manager()?;
attachment_manager.request_detach().await;
Ok(())
attachment_manager
.request_detach()
.await
.map_err(|e| VeilidAPIError::Internal { message: e })
}
// Change api logging level if it is enabled

View File

@ -235,19 +235,19 @@ impl VeilidConfig {
}
}
pub async fn init_from_json(&mut self, config: String) -> Result<(), String> {
pub fn setup_from_json(&mut self, config: String) -> Result<(), String> {
{
let mut inner = self.inner.write();
*inner = serde_json::from_str(&config).map_err(map_to_string)?;
}
// Validate settings
self.validate().await?;
self.validate()?;
Ok(())
}
pub async fn init(&mut self, cb: ConfigCallback) -> Result<(), String> {
pub fn setup(&mut self, cb: ConfigCallback) -> Result<(), String> {
macro_rules! get_config {
($key:expr) => {
let keyname = &stringify!($key)[6..];
@ -258,7 +258,6 @@ impl VeilidConfig {
})?;
};
}
{
let mut inner = self.inner.write();
get_config!(inner.program_name);
@ -345,20 +344,78 @@ impl VeilidConfig {
get_config!(inner.network.leases.max_client_relay_leases);
}
// Validate settings
self.validate().await?;
self.validate()?;
Ok(())
}
pub async fn terminate(&self) {
//
}
pub fn get(&self) -> RwLockReadGuard<VeilidConfigInner> {
self.inner.read()
}
async fn validate(&self) -> Result<(), String> {
pub fn get_mut(&self) -> RwLockWriteGuard<VeilidConfigInner> {
self.inner.write()
}
pub fn get_key_json(&self, key: &str) -> Result<String, String> {
let c = self.get();
// Split key into path parts
let keypath: Vec<&str> = key.split('.').collect();
// Generate json from whole config
let jc = serde_json::to_string(&*c).map_err(map_to_string)?;
let jvc = json::parse(&jc).map_err(map_to_string)?;
// Find requested subkey
let mut out = &jvc;
for k in keypath {
if !jvc.has_key(k) {
return Err(format!("invalid subkey '{}' in key '{}'", k, key));
}
out = &jvc[k];
}
Ok(out.to_string())
}
pub fn set_key_json(&self, key: &str, value: &str) -> Result<(), String> {
let mut c = self.get_mut();
// Split key into path parts
let keypath: Vec<&str> = key.split('.').collect();
// Convert value into jsonvalue
let newval = json::parse(value).map_err(map_to_string)?;
// Generate json from whole config
let jc = serde_json::to_string(&*c).map_err(map_to_string)?;
let mut jvc = json::parse(&jc).map_err(map_to_string)?;
// Find requested subkey
let newconfigstring = if let Some((objkeyname, objkeypath)) = keypath.split_last() {
// Replace subkey
let mut out = &mut jvc;
for k in objkeypath {
if !jvc.has_key(*k) {
return Err(format!("invalid subkey '{}' in key '{}'", *k, key));
}
out = &mut jvc[*k];
}
if !out.has_key(objkeyname) {
return Err(format!("invalid subkey '{}' in key '{}'", objkeyname, key));
}
out[*objkeyname] = newval;
jvc.to_string()
} else {
newval.to_string()
};
// Generate and validate new config
let mut newconfig = VeilidConfig::new();
newconfig.setup_from_json(newconfigstring)?;
// Replace whole config
*c = newconfig.get().clone();
Ok(())
}
fn validate(&self) -> Result<(), String> {
let inner = self.inner.read();
if inner.program_name.is_empty() {

View File

@ -88,21 +88,21 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
udp: VeilidConfigUDP(
enabled: !kIsWeb,
socketPoolSize: 0,
listenAddress: "[::]:5150",
listenAddress: "",
publicAddress: null,
),
tcp: VeilidConfigTCP(
connect: !kIsWeb,
listen: !kIsWeb,
maxConnections: 32,
listenAddress: "[::]:5150",
listenAddress: "",
publicAddress: null,
),
ws: VeilidConfigWS(
connect: true,
listen: !kIsWeb,
maxConnections: 16,
listenAddress: "[::]:5150",
listenAddress: "",
path: "ws",
url: null,
),
@ -110,7 +110,7 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
connect: true,
listen: false,
maxConnections: 16,
listenAddress: "[::]:5150",
listenAddress: "",
path: "ws",
url: null,
),

View File

@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:flutter/material.dart';
import 'package:flutter/services.dart';
@ -24,6 +25,7 @@ LogOptions getLogOptions(LogLevel? level) {
}
void setRootLogLevel(LogLevel? level) {
print("setRootLogLevel: $level");
Loggy('').level = getLogOptions(level);
}
@ -62,6 +64,7 @@ class MyApp extends StatefulWidget {
class _MyAppState extends State<MyApp> with UiLoggy {
String _veilidVersion = 'Unknown';
Stream<VeilidUpdate>? _updateStream;
Future<void>? _updateProcessor;
@override
void initState() {
@ -95,6 +98,39 @@ class _MyAppState extends State<MyApp> with UiLoggy {
});
}
Future<void> processUpdateLog(VeilidUpdateLog update) async {
switch (update.logLevel) {
case VeilidLogLevel.error:
loggy.error(update.message);
break;
case VeilidLogLevel.warn:
loggy.warning(update.message);
break;
case VeilidLogLevel.info:
loggy.info(update.message);
break;
case VeilidLogLevel.debug:
loggy.debug(update.message);
break;
case VeilidLogLevel.trace:
loggy.trace(update.message);
break;
}
}
Future<void> processUpdates() async {
var stream = _updateStream;
if (stream != null) {
await for (final update in stream) {
if (update is VeilidUpdateLog) {
await processUpdateLog(update);
} else {
loggy.trace("Update: " + update.toString());
}
}
}
}
@override
Widget build(BuildContext context) {
final ButtonStyle buttonStyle =
@ -116,16 +152,32 @@ class _MyAppState extends State<MyApp> with UiLoggy {
child: Row(children: [
ElevatedButton(
style: buttonStyle,
onPressed: () async {
//var await Veilid.instance.startupVeilidCore(await getDefaultVeilidConfig())
// setState(() {
// };
},
onPressed: _updateStream != null
? null
: () async {
var updateStream = Veilid.instance.startupVeilidCore(
await getDefaultVeilidConfig());
setState(() {
_updateStream = updateStream;
_updateProcessor = processUpdates();
});
},
child: const Text('Startup'),
),
ElevatedButton(
style: buttonStyle,
onPressed: () {},
onPressed: _updateStream == null
? null
: () async {
await Veilid.instance.shutdownVeilidCore();
if (_updateProcessor != null) {
await _updateProcessor;
}
setState(() {
_updateProcessor = null;
_updateStream = null;
});
},
child: const Text('Shutdown'),
),
])),

View File

@ -191,39 +191,47 @@ Future<void> processFutureVoid(Future<dynamic> future) {
}
Stream<T> processStreamJson<T>(
T Function(Map<String, dynamic>) jsonConstructor, Stream<dynamic> stream) {
return stream.map((value) {
final list = value as List<dynamic>;
switch (list[0] as int) {
case messageErr:
{
throw VeilidAPIExceptionInternal("Internal API Error: ${list[1]}");
}
case messageOkJson:
{
if (list[1] == null) {
throw VeilidAPIExceptionInternal("Null MESSAGE_OK_JSON value");
T Function(Map<String, dynamic>) jsonConstructor, ReceivePort port) async* {
try {
await for (var value in port) {
final list = value as List<dynamic>;
switch (list[0] as int) {
case messageStreamItemJson:
{
if (list[1] == null) {
throw VeilidAPIExceptionInternal(
"Null MESSAGE_STREAM_ITEM_JSON value");
}
var ret = jsonDecode(list[1] as String);
yield jsonConstructor(ret);
break;
}
var ret = jsonDecode(list[1] as String);
return jsonConstructor(ret);
}
case messageErrJson:
{
throw VeilidAPIException.fromJson(jsonDecode(list[1]));
}
default:
{
throw VeilidAPIExceptionInternal(
"Unexpected async return message type: ${list[0]}");
}
case messageStreamAbort:
{
port.close();
throw VeilidAPIExceptionInternal("Internal API Error: ${list[1]}");
}
case messageStreamAbortJson:
{
port.close();
throw VeilidAPIException.fromJson(jsonDecode(list[1]));
}
case messageStreamClose:
{
port.close();
break;
}
default:
{
throw VeilidAPIExceptionInternal(
"Unexpected async return message type: ${list[0]}");
}
}
}
}).handleError((e) {
} catch (e) {
// Wrap all other errors in VeilidAPIExceptionInternal
throw VeilidAPIExceptionInternal(e.toString());
}, test: (e) {
// Pass errors that are already VeilidAPIException through without wrapping
return e is! VeilidAPIException;
});
}
}
// FFI implementation of high level Veilid API
@ -287,7 +295,7 @@ class VeilidFFI implements Veilid {
final recvPort = ReceivePort("get_veilid_state");
final sendPort = recvPort.sendPort;
_getVeilidState(sendPort.nativePort);
return processFutureJson(VeilidState.fromJson, recvPort.single);
return processFutureJson(VeilidState.fromJson, recvPort.first);
}
@override
@ -297,7 +305,7 @@ class VeilidFFI implements Veilid {
final sendPort = recvPort.sendPort;
_changeApiLogLevel(sendPort.nativePort, nativeLogLevel);
malloc.free(nativeLogLevel);
return processFutureVoid(recvPort.single);
return processFutureVoid(recvPort.first);
}
@override
@ -305,7 +313,7 @@ class VeilidFFI implements Veilid {
final recvPort = ReceivePort("shutdown_veilid_core");
final sendPort = recvPort.sendPort;
_shutdownVeilidCore(sendPort.nativePort);
return processFutureVoid(recvPort.single);
return processFutureVoid(recvPort.first);
}
@override
@ -314,7 +322,7 @@ class VeilidFFI implements Veilid {
final recvPort = ReceivePort("debug");
final sendPort = recvPort.sendPort;
_debug(sendPort.nativePort, nativeCommand);
return processFuturePlain(recvPort.single);
return processFuturePlain(recvPort.first);
}
@override

View File

@ -51,36 +51,36 @@ impl DartIsolateWrapper {
});
}
pub fn result<T: IntoDart, E: Serialize>(&self, result: Result<T, E>) -> bool {
pub fn result<T: IntoDart, E: Serialize>(self, result: Result<T, E>) -> bool {
match result {
Ok(v) => self.ok(v),
Err(e) => self.err_json(e),
}
}
pub fn result_json<T: Serialize, E: Serialize>(&self, result: Result<T, E>) -> bool {
pub fn result_json<T: Serialize, E: Serialize>(self, result: Result<T, E>) -> bool {
match result {
Ok(v) => self.ok_json(v),
Err(e) => self.err_json(e),
}
}
pub fn ok<T: IntoDart>(&self, value: T) -> bool {
pub fn ok<T: IntoDart>(self, value: T) -> bool {
self.isolate
.post(vec![MESSAGE_OK.into_dart(), value.into_dart()])
}
pub fn ok_json<T: Serialize>(&self, value: T) -> bool {
pub fn ok_json<T: Serialize>(self, value: T) -> bool {
self.isolate.post(vec![
MESSAGE_OK_JSON.into_dart(),
serialize_json(value).into_dart(),
])
}
// pub fn err<E: IntoDart>(&self, error: E) -> bool {
// pub fn err<E: IntoDart>(self, error: E) -> bool {
// self.isolate
// .post(vec![MESSAGE_ERR.into_dart(), error.into_dart()])
// }
pub fn err_json<E: Serialize>(&self, error: E) -> bool {
pub fn err_json<E: Serialize>(self, error: E) -> bool {
self.isolate.post(vec![
MESSAGE_ERR_JSON.into_dart(),
serialize_json(error).into_dart(),
@ -88,21 +88,35 @@ impl DartIsolateWrapper {
}
}
struct DartIsolateStreamInner {
pub isolate: Option<Isolate>,
}
impl Drop for DartIsolateStreamInner {
fn drop(&mut self) {
if let Some(isolate) = self.isolate {
isolate.post(vec![MESSAGE_STREAM_CLOSE.into_dart()]);
}
}
}
#[derive(Clone)]
pub struct DartIsolateStream {
isolate: Arc<Mutex<Option<Isolate>>>,
inner: Arc<Mutex<DartIsolateStreamInner>>,
}
impl DartIsolateStream {
pub fn new(port: i64) -> Self {
DartIsolateStream {
isolate: Arc::new(Mutex::new(Some(Isolate::new(port)))),
inner: Arc::new(Mutex::new(DartIsolateStreamInner {
isolate: Some(Isolate::new(port)),
})),
}
}
// pub fn item<T: IntoDart>(&self, value: T) -> bool {
// let isolate = self.isolate.lock();
// if let Some(isolate) = &*isolate {
// let mut inner = self.inner.lock();
// if let Some(isolate) = inner.isolate.take() {
// isolate.post(vec![MESSAGE_STREAM_ITEM.into_dart(), value.into_dart()])
// } else {
// false
@ -110,8 +124,8 @@ impl DartIsolateStream {
// }
pub fn item_json<T: Serialize>(&self, value: T) -> bool {
let isolate = self.isolate.lock();
if let Some(isolate) = &*isolate {
let inner = self.inner.lock();
if let Some(isolate) = &inner.isolate {
isolate.post(vec![
MESSAGE_STREAM_ITEM_JSON.into_dart(),
serialize_json(value).into_dart(),
@ -122,8 +136,8 @@ impl DartIsolateStream {
}
// pub fn abort<E: IntoDart>(self, error: E) -> bool {
// let mut isolate = self.isolate.lock();
// if let Some(isolate) = isolate.take() {
// let mut inner = self.inner.lock();
// if let Some(isolate) = inner.isolate.take() {
// isolate.post(vec![MESSAGE_STREAM_ABORT.into_dart(), error.into_dart()])
// } else {
// false
@ -131,8 +145,8 @@ impl DartIsolateStream {
// }
pub fn abort_json<E: Serialize>(self, error: E) -> bool {
let mut isolate = self.isolate.lock();
if let Some(isolate) = isolate.take() {
let mut inner = self.inner.lock();
if let Some(isolate) = inner.isolate.take() {
isolate.post(vec![
MESSAGE_STREAM_ABORT_JSON.into_dart(),
serialize_json(error).into_dart(),
@ -143,20 +157,11 @@ impl DartIsolateStream {
}
pub fn close(self) -> bool {
let mut isolate = self.isolate.lock();
if let Some(isolate) = isolate.take() {
let mut inner = self.inner.lock();
if let Some(isolate) = inner.isolate.take() {
isolate.post(vec![MESSAGE_STREAM_CLOSE.into_dart()])
} else {
false
}
}
}
impl Drop for DartIsolateStream {
fn drop(&mut self) {
let mut isolate = self.isolate.lock();
if let Some(isolate) = isolate.take() {
isolate.post(vec![MESSAGE_STREAM_CLOSE.into_dart()]);
}
}
}