wasm cleanup

This commit is contained in:
John Smith 2022-07-12 08:02:22 -04:00
parent b73511142a
commit b9acd532db
19 changed files with 248 additions and 335 deletions

20
Cargo.lock generated
View File

@ -378,6 +378,7 @@ dependencies = [
"blanket", "blanket",
"futures-core", "futures-core",
"futures-task", "futures-task",
"futures-timer",
"futures-util", "futures-util",
"pin-project 1.0.11", "pin-project 1.0.11",
"rustc_version", "rustc_version",
@ -1787,6 +1788,16 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
dependencies = [
"gloo-timers",
"send_wrapper 0.4.0",
]
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.21" version = "0.3.21"
@ -3869,6 +3880,12 @@ version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2333e6df6d6598f2b1974829f853c2b4c5f4a6e503c10af918081aa6f8564e1" checksum = "a2333e6df6d6598f2b1974829f853c2b4c5f4a6e503c10af918081aa6f8564e1"
[[package]]
name = "send_wrapper"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0"
[[package]] [[package]]
name = "send_wrapper" name = "send_wrapper"
version = "0.5.0" version = "0.5.0"
@ -3880,6 +3897,9 @@ name = "send_wrapper"
version = "0.6.0" version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
dependencies = [
"futures-core",
]
[[package]] [[package]]
name = "serde" name = "serde"

View File

@ -104,9 +104,9 @@ serde_cbor = { version = "^0", default-features = false, features = ["alloc"] }
serde_json = { version = "^1", default-features = false, features = ["alloc"] } serde_json = { version = "^1", default-features = false, features = ["alloc"] }
getrandom = { version = "^0", features = ["js"] } getrandom = { version = "^0", features = ["js"] }
ws_stream_wasm = "^0" ws_stream_wasm = "^0"
async_executors = { version = "^0", default-features = false, features = [ "bindgen" ]} async_executors = { version = "^0", default-features = false, features = [ "bindgen", "timer" ]}
async-lock = "^2" async-lock = "^2"
send_wrapper = "^0" send_wrapper = { version = "^0", features = ["futures"] }
wasm-logger = "^0" wasm-logger = "^0"
tracing-wasm = "^0" tracing-wasm = "^0"

View File

@ -5,18 +5,8 @@ use core::fmt::Write;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use tracing_subscriber::*; use tracing_subscriber::*;
cfg_if! { struct ApiLoggerInner {
if #[cfg(target_arch = "wasm32")] {
use send_wrapper::*;
struct ApiLoggerInner {
update_callback: SendWrapper<UpdateCallback>,
}
} else {
struct ApiLoggerInner {
update_callback: UpdateCallback, update_callback: UpdateCallback,
}
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -28,17 +18,7 @@ static API_LOGGER: OnceCell<ApiTracingLayer> = OnceCell::new();
impl ApiTracingLayer { impl ApiTracingLayer {
fn new_inner(update_callback: UpdateCallback) -> ApiLoggerInner { fn new_inner(update_callback: UpdateCallback) -> ApiLoggerInner {
cfg_if! { ApiLoggerInner { update_callback }
if #[cfg(target_arch = "wasm32")] {
ApiLoggerInner {
update_callback: SendWrapper::new(update_callback),
}
} else {
ApiLoggerInner {
update_callback,
}
}
}
} }
#[instrument(level = "debug", skip(update_callback))] #[instrument(level = "debug", skip(update_callback))]

View File

@ -1,21 +1,9 @@
use crate::xx::*; use crate::xx::*;
pub use rust_fsm::*; pub use rust_fsm::*;
cfg_if! { pub type StateChangeCallback<T> = Arc<
if #[cfg(target_arch = "wasm32")] { dyn Fn(<T as StateMachineImpl>::State, <T as StateMachineImpl>::State) + Send + Sync + 'static,
pub type StateChangeCallback<T> = Arc< >;
dyn Fn(<T as StateMachineImpl>::State, <T as StateMachineImpl>::State)
+ 'static,
>;
} else {
pub type StateChangeCallback<T> = Arc<
dyn Fn(<T as StateMachineImpl>::State, <T as StateMachineImpl>::State)
+ Send
+ Sync
+ 'static,
>;
}
}
struct CallbackStateMachineInner<T> struct CallbackStateMachineInner<T>
where where

View File

@ -5,13 +5,7 @@ use crate::veilid_api::*;
use crate::veilid_config::*; use crate::veilid_config::*;
use crate::xx::*; use crate::xx::*;
// cfg_if! {
// if #[cfg(target_arch = "wasm32")] {
// pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate)>;
// } else {
pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>; pub type UpdateCallback = Arc<dyn Fn(VeilidUpdate) + Send + Sync>;
// }
// }
struct ServicesContext { struct ServicesContext {
pub config: VeilidConfig, pub config: VeilidConfig,

View File

@ -11,10 +11,10 @@ pub fn get_timestamp() -> u64 {
} }
} }
pub fn get_timestamp_string() -> String { // pub fn get_timestamp_string() -> String {
let dt = chrono::Utc::now(); // let dt = chrono::Utc::now();
dt.time().format("%H:%M:%S.3f").to_string() // dt.time().format("%H:%M:%S.3f").to_string()
} // }
pub fn random_bytes(dest: &mut [u8]) -> EyreResult<()> { pub fn random_bytes(dest: &mut [u8]) -> EyreResult<()> {
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
@ -83,26 +83,26 @@ where
} }
} }
pub fn spawn_with_local_set<Out>( // pub fn spawn_with_local_set<Out>(
future: impl Future<Output = Out> + Send + 'static, // future: impl Future<Output = Out> + Send + 'static,
) -> MustJoinHandle<Out> // ) -> MustJoinHandle<Out>
where // where
Out: Send + 'static, // Out: Send + 'static,
{ // {
cfg_if! { // cfg_if! {
if #[cfg(feature="rt-async-std")] { // if #[cfg(feature="rt-async-std")] {
spawn(future) // spawn(future)
} else if #[cfg(feature="rt-tokio")] { // } else if #[cfg(feature="rt-tokio")] {
MustJoinHandle::new(tokio::task::spawn_blocking(move || { // MustJoinHandle::new(tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Handle::current(); // let rt = tokio::runtime::Handle::current();
rt.block_on(async { // rt.block_on(async {
let local = tokio::task::LocalSet::new(); // let local = tokio::task::LocalSet::new();
local.run_until(future).await // local.run_until(future).await
}) // })
})) // }))
} // }
} // }
} // }
pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static) pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static)
where where

View File

@ -1,11 +1,11 @@
use super::utils; use super::utils;
use crate::xx::*; use crate::xx::*;
use crate::*; use crate::*;
use async_executors::{Bindgen, LocalSpawnHandleExt /*, SpawnHandleExt*/}; use async_executors::{Bindgen, LocalSpawnHandleExt, SpawnHandleExt, Timer};
use futures_util::future::{select, Either}; use futures_util::future::{select, Either};
use js_sys::*; use js_sys::*;
use wasm_bindgen_futures::*; use wasm_bindgen_futures::*;
use web_sys::*; //use web_sys::*;
#[wasm_bindgen] #[wasm_bindgen]
extern "C" { extern "C" {
@ -26,17 +26,17 @@ pub fn get_timestamp() -> u64 {
} }
} }
pub fn get_timestamp_string() -> String { // pub fn get_timestamp_string() -> String {
let date = Date::new_0(); // let date = Date::new_0();
let hours = Date::get_utc_hours(&date); // let hours = Date::get_utc_hours(&date);
let minutes = Date::get_utc_minutes(&date); // let minutes = Date::get_utc_minutes(&date);
let seconds = Date::get_utc_seconds(&date); // let seconds = Date::get_utc_seconds(&date);
let milliseconds = Date::get_utc_milliseconds(&date); // let milliseconds = Date::get_utc_milliseconds(&date);
format!( // format!(
"{:02}:{:02}:{:02}.{}", // "{:02}:{:02}:{:02}.{}",
hours, minutes, seconds, milliseconds // hours, minutes, seconds, milliseconds
) // )
} // }
pub fn random_bytes(dest: &mut [u8]) -> EyreResult<()> { pub fn random_bytes(dest: &mut [u8]) -> EyreResult<()> {
let len = dest.len(); let len = dest.len();
@ -72,43 +72,21 @@ pub fn get_random_u64() -> u64 {
} }
pub async fn sleep(millis: u32) { pub async fn sleep(millis: u32) {
if utils::is_browser() { Bindgen.sleep(Duration::from_millis(millis.into())).await
let wait_millis = if millis > u32::MAX {
i32::MAX
} else {
millis as i32
};
let promise = Promise::new(&mut |yes, _| {
let win = window().unwrap();
win.set_timeout_with_callback_and_timeout_and_arguments_0(&yes, wait_millis)
.unwrap();
});
JsFuture::from(promise).await.unwrap();
} else if utils::is_nodejs() {
let promise = Promise::new(&mut |yes, _| {
nodejs_global_set_timeout_with_callback_and_timeout_and_arguments_0(&yes, millis)
.unwrap();
});
JsFuture::from(promise).await.unwrap();
} else {
panic!("WASM requires browser or nodejs environment");
}
} }
pub fn system_boxed<'a, Out>( pub fn system_boxed<'a, Out>(
future: impl Future<Output = Out> + 'a, future: impl Future<Output = Out> + Send + 'a,
) -> SystemPinBoxFutureLifetime<'a, Out> { ) -> SystemPinBoxFutureLifetime<'a, Out> {
Box::pin(future) Box::pin(future)
} }
pub fn spawn<Out>(future: impl Future<Output = Out> + 'static) -> MustJoinHandle<Out> pub fn spawn<Out>(future: impl Future<Output = Out> + Send + 'static) -> MustJoinHandle<Out>
where where
Out: Send + 'static, Out: Send + 'static,
{ {
MustJoinHandle::new(Bindgen MustJoinHandle::new(Bindgen
.spawn_handle_local(future) .spawn_handle(future)
.expect("wasm-bindgen-futures spawn should never error out")) .expect("wasm-bindgen-futures spawn should never error out"))
} }
@ -121,16 +99,16 @@ where
.expect("wasm-bindgen-futures spawn_local should never error out")) .expect("wasm-bindgen-futures spawn_local should never error out"))
} }
pub fn spawn_with_local_set<Out>( // pub fn spawn_with_local_set<Out>(
future: impl Future<Output = Out> + 'static, // future: impl Future<Output = Out> + Send + 'static,
) -> MustJoinHandle<Out> // ) -> MustJoinHandle<Out>
where // where
Out: Send + 'static, // Out: Send + 'static,
{ // {
spawn(future) // spawn(future)
} // }
pub fn spawn_detached<Out>(future: impl Future<Output = Out> + 'static) pub fn spawn_detached<Out>(future: impl Future<Output = Out> + Send + 'static)
where where
Out: Send + 'static, Out: Send + 'static,
{ {
@ -142,13 +120,13 @@ where
pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()> pub fn interval<F, FUT>(freq_ms: u32, callback: F) -> SystemPinBoxFuture<()>
where where
F: Fn() -> FUT + 'static, F: Fn() -> FUT + Send + Sync + 'static,
FUT: Future<Output = ()>, FUT: Future<Output = ()> + Send,
{ {
let e = Eventual::new(); let e = Eventual::new();
let ie = e.clone(); let ie = e.clone();
let jh = spawn_local(Box::pin(async move { let jh = spawn(Box::pin(async move {
while timeout(freq_ms, ie.instance_clone(())).await.is_err() { while timeout(freq_ms, ie.instance_clone(())).await.is_err() {
callback().await; callback().await;
} }

View File

@ -13,6 +13,7 @@ struct TableStoreInner {
pub struct TableStore { pub struct TableStore {
config: VeilidConfig, config: VeilidConfig,
inner: Arc<Mutex<TableStoreInner>>, inner: Arc<Mutex<TableStoreInner>>,
async_lock: Arc<AsyncMutex<()>>,
} }
impl TableStore { impl TableStore {
@ -25,14 +26,17 @@ impl TableStore {
Self { Self {
config, config,
inner: Arc::new(Mutex::new(Self::new_inner())), inner: Arc::new(Mutex::new(Self::new_inner())),
async_lock: Arc::new(AsyncMutex::new(())),
} }
} }
pub async fn init(&self) -> EyreResult<()> { pub async fn init(&self) -> EyreResult<()> {
let _async_guard = self.async_lock.lock().await;
Ok(()) Ok(())
} }
pub async fn terminate(&self) { pub async fn terminate(&self) {
let _async_guard = self.async_lock.lock().await;
assert!( assert!(
self.inner.lock().opened.len() == 0, self.inner.lock().opened.len() == 0,
"all open databases should have been closed" "all open databases should have been closed"
@ -66,8 +70,10 @@ impl TableStore {
} }
pub async fn open(&self, name: &str, column_count: u32) -> EyreResult<TableDB> { pub async fn open(&self, name: &str, column_count: u32) -> EyreResult<TableDB> {
let _async_guard = self.async_lock.lock().await;
let table_name = self.get_table_name(name)?; let table_name = self.get_table_name(name)?;
{
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
if let Some(table_db_weak_inner) = inner.opened.get(&table_name) { if let Some(table_db_weak_inner) = inner.opened.get(&table_name) {
match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) { match TableDB::try_new_from_weak_inner(table_db_weak_inner.clone()) {
@ -79,6 +85,7 @@ impl TableStore {
} }
}; };
} }
}
let db = Database::open(table_name.clone(), column_count) let db = Database::open(table_name.clone(), column_count)
.await .await
.wrap_err("failed to open tabledb")?; .wrap_err("failed to open tabledb")?;
@ -86,15 +93,20 @@ impl TableStore {
let table_db = TableDB::new(table_name.clone(), self.clone(), db); let table_db = TableDB::new(table_name.clone(), self.clone(), db);
{
let mut inner = self.inner.lock();
inner.opened.insert(table_name, table_db.weak_inner()); inner.opened.insert(table_name, table_db.weak_inner());
}
Ok(table_db) Ok(table_db)
} }
pub async fn delete(&self, name: &str) -> EyreResult<bool> { pub async fn delete(&self, name: &str) -> EyreResult<bool> {
let _async_guard = self.async_lock.lock().await;
trace!("TableStore::delete {}", name); trace!("TableStore::delete {}", name);
let table_name = self.get_table_name(name)?; let table_name = self.get_table_name(name)?;
{
let inner = self.inner.lock(); let inner = self.inner.lock();
if inner.opened.contains_key(&table_name) { if inner.opened.contains_key(&table_name) {
trace!( trace!(
@ -103,6 +115,7 @@ impl TableStore {
); );
bail!("Not deleting table that is still opened"); bail!("Not deleting table that is still opened");
} }
}
if utils::is_nodejs() { if utils::is_nodejs() {
unimplemented!(); unimplemented!();

View File

@ -44,39 +44,39 @@ pub fn is_browser() -> bool {
res res
} }
pub fn is_browser_https() -> bool { // pub fn is_browser_https() -> bool {
static CACHE: AtomicI8 = AtomicI8::new(-1); // static CACHE: AtomicI8 = AtomicI8::new(-1);
let cache = CACHE.load(Ordering::Relaxed); // let cache = CACHE.load(Ordering::Relaxed);
if cache != -1 { // if cache != -1 {
return cache != 0; // return cache != 0;
} // }
let res = js_sys::eval("window.location.protocol === 'https'") // let res = js_sys::eval("window.location.protocol === 'https'")
.map(|res| res.is_truthy()) // .map(|res| res.is_truthy())
.unwrap_or_default(); // .unwrap_or_default();
CACHE.store(res as i8, Ordering::Relaxed); // CACHE.store(res as i8, Ordering::Relaxed);
res // res
} // }
pub fn node_require(module: &str) -> JsValue { // pub fn node_require(module: &str) -> JsValue {
if !is_nodejs() { // if !is_nodejs() {
return JsValue::UNDEFINED; // return JsValue::UNDEFINED;
} // }
let mut home = env!("CARGO_MANIFEST_DIR"); // let mut home = env!("CARGO_MANIFEST_DIR");
if home.len() == 0 { // if home.len() == 0 {
home = "."; // home = ".";
} // }
match js_sys::eval(format!("require(\"{}/{}\")", home, module).as_str()) { // match js_sys::eval(format!("require(\"{}/{}\")", home, module).as_str()) {
Ok(v) => v, // Ok(v) => v,
Err(e) => { // Err(e) => {
panic!("node_require failed: {:?}", e); // panic!("node_require failed: {:?}", e);
} // }
} // }
} // }
#[derive(ThisError, Debug, Clone, Eq, PartialEq)] #[derive(ThisError, Debug, Clone, Eq, PartialEq)]
#[error("JsValue error")] #[error("JsValue error")]

View File

@ -333,6 +333,7 @@ impl ConnectionManager {
// Called by low-level network when any connection-oriented protocol connection appears // Called by low-level network when any connection-oriented protocol connection appears
// either from incoming connections. // either from incoming connections.
#[cfg_attr(target_os = "wasm32", allow(dead_code))]
pub(super) async fn on_accepted_protocol_network_connection( pub(super) async fn on_accepted_protocol_network_connection(
&self, &self,
conn: ProtocolNetworkConnection, conn: ProtocolNetworkConnection,

View File

@ -7,6 +7,7 @@ use std::io;
#[derive(Debug)] #[derive(Debug)]
pub enum ProtocolNetworkConnection { pub enum ProtocolNetworkConnection {
#[allow(dead_code)]
Dummy(DummyNetworkConnection), Dummy(DummyNetworkConnection),
Ws(ws::WebsocketNetworkConnection), Ws(ws::WebsocketNetworkConnection),
//WebRTC(wrtc::WebRTCNetworkConnection), //WebRTC(wrtc::WebRTCNetworkConnection),

View File

@ -2,9 +2,10 @@ use super::*;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use std::io; use std::io;
use ws_stream_wasm::*; use ws_stream_wasm::*;
use send_wrapper::*;
struct WebsocketNetworkConnectionInner { struct WebsocketNetworkConnectionInner {
ws_meta: WsMeta, _ws_meta: WsMeta,
ws_stream: CloneStream<WsStream>, ws_stream: CloneStream<WsStream>,
} }
@ -29,7 +30,7 @@ impl WebsocketNetworkConnection {
Self { Self {
descriptor, descriptor,
inner: Arc::new(WebsocketNetworkConnectionInner { inner: Arc::new(WebsocketNetworkConnectionInner {
ws_meta, _ws_meta: ws_meta,
ws_stream: CloneStream::new(ws_stream), ws_stream: CloneStream::new(ws_stream),
}), }),
} }
@ -59,7 +60,7 @@ impl WebsocketNetworkConnection {
#[instrument(level = "trace", err, skip(self), fields(ret.len))] #[instrument(level = "trace", err, skip(self), fields(ret.len))]
pub async fn recv(&self) -> io::Result<Vec<u8>> { pub async fn recv(&self) -> io::Result<Vec<u8>> {
let out = match self.inner.ws_stream.clone().next().await { let out = match SendWrapper::new(self.inner.ws_stream.clone().next()).await {
Some(WsMessage::Binary(v)) => v, Some(WsMessage::Binary(v)) => v,
Some(x) => { Some(x) => {
bail_io_error_other!("Unexpected WS message type"); bail_io_error_other!("Unexpected WS message type");
@ -100,7 +101,8 @@ impl WebsocketProtocolHandler {
bail_io_error_other!("invalid websocket url scheme"); bail_io_error_other!("invalid websocket url scheme");
} }
let (wsmeta, wsio) = WsMeta::connect(request, None).await.map_err(to_io)?; let fut = spawn_local(WsMeta::connect(request, None));
let (wsmeta, wsio) = fut.await.map_err(to_io)?;
// Make our connection descriptor // Make our connection descriptor
Ok(ProtocolNetworkConnection::Ws( Ok(ProtocolNetworkConnection::Ws(

View File

@ -15,9 +15,7 @@ pub enum ReceiptEvent {
Cancelled, Cancelled,
} }
cfg_if! { pub trait ReceiptCallback: Send + 'static {
if #[cfg(target_arch = "wasm32")] {
pub trait ReceiptCallback: 'static {
fn call( fn call(
&self, &self,
event: ReceiptEvent, event: ReceiptEvent,
@ -25,37 +23,12 @@ cfg_if! {
returns_so_far: u32, returns_so_far: u32,
expected_returns: u32, expected_returns: u32,
) -> SystemPinBoxFuture<()>; ) -> SystemPinBoxFuture<()>;
} }
impl<T, F> ReceiptCallback for T impl<F, T> ReceiptCallback for T
where where
T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + 'static,
F: Future<Output = ()> + 'static,
{
fn call(
&self,
event: ReceiptEvent,
receipt: Receipt,
returns_so_far: u32,
expected_returns: u32,
) -> SystemPinBoxFuture<()> {
Box::pin(self(event, receipt, returns_so_far, expected_returns))
}
}
} else {
pub trait ReceiptCallback: Send + 'static {
fn call(
&self,
event: ReceiptEvent,
receipt: Receipt,
returns_so_far: u32,
expected_returns: u32,
) -> SystemPinBoxFuture<()>;
}
impl<F, T> ReceiptCallback for T
where
T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + Send + 'static, T: Fn(ReceiptEvent, Receipt, u32, u32) -> F + Send + 'static,
F: Future<Output = ()> + Send + 'static F: Future<Output = ()> + Send + 'static,
{ {
fn call( fn call(
&self, &self,
event: ReceiptEvent, event: ReceiptEvent,
@ -65,8 +38,6 @@ cfg_if! {
) -> SystemPinBoxFuture<()> { ) -> SystemPinBoxFuture<()> {
Box::pin(self(event, receipt, returns_so_far, expected_returns)) Box::pin(self(event, receipt, returns_so_far, expected_returns))
} }
}
}
} }
type ReceiptCallbackType = Box<dyn ReceiptCallback>; type ReceiptCallbackType = Box<dyn ReceiptCallback>;

View File

@ -1593,15 +1593,9 @@ pub struct PeerStats {
pub status: Option<NodeStatus>, // Last known node status pub status: Option<NodeStatus>, // Last known node status
} }
cfg_if! { pub type ValueChangeCallback =
if #[cfg(target_arch = "wasm32")] {
pub type ValueChangeCallback =
Arc<dyn Fn(ValueKey, Vec<u8>) -> SystemPinBoxFuture<()> + 'static>;
} else {
pub type ValueChangeCallback =
Arc<dyn Fn(ValueKey, Vec<u8>) -> SystemPinBoxFuture<()> + Send + Sync + 'static>; Arc<dyn Fn(ValueKey, Vec<u8>) -> SystemPinBoxFuture<()> + Send + Sync + 'static>;
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]

View File

@ -3,16 +3,8 @@ use crate::*;
use serde::*; use serde::*;
//////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////
cfg_if! { pub type ConfigCallbackReturn = Result<Box<dyn core::any::Any + Send>, VeilidAPIError>;
if #[cfg(target_arch = "wasm32")] { pub type ConfigCallback = Arc<dyn Fn(String) -> ConfigCallbackReturn + Send + Sync>;
pub type ConfigCallbackReturn = Result<Box<dyn core::any::Any>, VeilidAPIError>;
pub type ConfigCallback = Arc<dyn Fn(String) -> ConfigCallbackReturn>;
} else {
pub type ConfigCallbackReturn = Result<Box<dyn core::any::Any + Send>, VeilidAPIError>;
pub type ConfigCallback = Arc<dyn Fn(String) -> ConfigCallbackReturn + Send + Sync>;
}
}
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct VeilidConfigHTTPS { pub struct VeilidConfigHTTPS {

View File

@ -69,8 +69,8 @@ cfg_if! {
pub use async_lock::Mutex as AsyncMutex; pub use async_lock::Mutex as AsyncMutex;
pub use async_lock::MutexGuard as AsyncMutexGuard; pub use async_lock::MutexGuard as AsyncMutexGuard;
pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr }; pub use no_std_net::{ SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs, IpAddr, Ipv4Addr, Ipv6Addr };
pub type SystemPinBoxFuture<T> = PinBox<dyn Future<Output = T> + 'static>; pub type SystemPinBoxFuture<T> = PinBox<dyn Future<Output = T> + Send + 'static>;
pub type SystemPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + 'a>; pub type SystemPinBoxFutureLifetime<'a, T> = PinBox<dyn Future<Output = T> + Send + 'a>;
pub use async_executors::JoinHandle as LowLevelJoinHandle; pub use async_executors::JoinHandle as LowLevelJoinHandle;
} else { } else {
pub use std::string::String; pub use std::string::String;

View File

@ -125,12 +125,10 @@ where
} }
// Possibly spawn the future possibly returning the value of the last execution // Possibly spawn the future possibly returning the value of the last execution
cfg_if! { pub async fn single_spawn_local(
if #[cfg(target_arch = "wasm32")] {
pub async fn single_spawn(
&self, &self,
future: impl Future<Output = T> + 'static, future: impl Future<Output = T> + 'static,
) -> Result<(Option<T>,bool), ()> { ) -> Result<(Option<T>, bool), ()> {
let mut out: Option<T> = None; let mut out: Option<T> = None;
// See if we have a result we can return // See if we have a result we can return
@ -165,15 +163,12 @@ where
// Return the prior result if we have one // Return the prior result if we have one
Ok((out, run)) Ok((out, run))
} }
}
}
} }
cfg_if! {
if #[cfg(not(target_arch = "wasm32"))] { impl<T> MustJoinSingleFuture<T>
impl<T> MustJoinSingleFuture<T> where
where
T: 'static + Send, T: 'static + Send,
{ {
pub async fn single_spawn( pub async fn single_spawn(
&self, &self,
future: impl Future<Output = T> + Send + 'static, future: impl Future<Output = T> + Send + 'static,
@ -207,6 +202,4 @@ cfg_if! {
// Return the prior result if we have one // Return the prior result if we have one
Ok((out, run)) Ok((out, run))
} }
}
}
} }

View File

@ -3,23 +3,13 @@ use crate::*;
use core::sync::atomic::{AtomicU64, Ordering}; use core::sync::atomic::{AtomicU64, Ordering};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
cfg_if! { type TickTaskRoutine<E> =
if #[cfg(target_arch = "wasm32")] {
type TickTaskRoutine<E> =
dyn Fn(StopToken, u64, u64) -> PinBoxFuture<Result<(), E>> + 'static;
} else {
type TickTaskRoutine<E> =
dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static; dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static;
}
}
/// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds. /// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds.
/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again. /// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again.
/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity. /// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity.
pub struct TickTask< pub struct TickTask<E: Send + 'static> {
#[cfg(target_arch = "wasm32")] E: 'static,
#[cfg(not(target_arch = "wasm32"))] E: Send + 'static,
> {
last_timestamp_us: AtomicU64, last_timestamp_us: AtomicU64,
tick_period_us: u64, tick_period_us: u64,
routine: OnceCell<Box<TickTaskRoutine<E>>>, routine: OnceCell<Box<TickTaskRoutine<E>>>,
@ -27,11 +17,7 @@ pub struct TickTask<
single_future: MustJoinSingleFuture<Result<(), E>>, single_future: MustJoinSingleFuture<Result<(), E>>,
} }
impl< impl<E: Send + 'static> TickTask<E> {
#[cfg(target_arch = "wasm32")] E: 'static,
#[cfg(not(target_arch = "wasm32"))] E: Send + 'static,
> TickTask<E>
{
pub fn new_us(tick_period_us: u64) -> Self { pub fn new_us(tick_period_us: u64) -> Self {
Self { Self {
last_timestamp_us: AtomicU64::new(0), last_timestamp_us: AtomicU64::new(0),

View File

@ -167,7 +167,7 @@ pub fn listen_address_to_socket_addrs(listen_address: &str) -> EyreResult<Vec<So
cfg_if! { cfg_if! {
if #[cfg(target_arch = "wasm32")] { if #[cfg(target_arch = "wasm32")] {
use core::str::FromStr; use core::str::FromStr;
vec![SocketAddr::from_str(listen_address).wrap_err("Unable to parse address")?] vec![SocketAddr::from_str(listen_address).map_err(|e| io_error_other!(e)).wrap_err("Unable to parse address")?]
} else { } else {
listen_address listen_address
.to_socket_addrs() .to_socket_addrs()