fix wasm websocket

This commit is contained in:
John Smith 2022-01-05 19:15:45 -05:00
parent ea2300c32b
commit 2eb598e68c
2 changed files with 16 additions and 20 deletions

View File

@ -8,13 +8,12 @@ use futures_util::{StreamExt, SinkExt};
struct WebsocketNetworkConnectionInner { struct WebsocketNetworkConnectionInner {
ws_meta: WsMeta, ws_meta: WsMeta,
ws_stream: WsStream, ws_stream: CloneStream<WsStream>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct WebsocketNetworkConnection { pub struct WebsocketNetworkConnection {
tls: bool, inner: Arc<WebsocketNetworkConnectionInner>,
inner: Arc<AsyncMutex<WebsocketNetworkConnectionInner>>,
} }
impl fmt::Debug for WebsocketNetworkConnection { impl fmt::Debug for WebsocketNetworkConnection {
@ -24,34 +23,30 @@ impl fmt::Debug for WebsocketNetworkConnection {
} }
impl WebsocketNetworkConnection { impl WebsocketNetworkConnection {
pub fn new(tls: bool, ws_meta: WsMeta, ws_stream: WsStream) -> Self { pub fn new(ws_meta: WsMeta, ws_stream: WsStream) -> Self {
Self { Self {
tls, inner: Arc::new(WebsocketNetworkConnectionInner {
inner: Arc::new(AsyncMutex::new(WebsocketNetworkConnectionInner {
ws_meta, ws_meta,
ws_stream, ws_stream: CloneStream::new(ws_stream),
})), }),
} }
} }
pub async fn close(&self) -> Result<(), String> { pub async fn close(&self) -> Result<(), String> {
let inner = self.inner.lock().await; self.inner.ws_meta.close().await.map_err(map_to_string).map(drop)
inner.ws_meta.close().await.map_err(map_to_string).map(drop)
} }
pub async fn send(&self, message: Vec<u8>) -> Result<(), String> { pub async fn send(&self, message: Vec<u8>) -> Result<(), String> {
if message.len() > MAX_MESSAGE_SIZE { if message.len() > MAX_MESSAGE_SIZE {
return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error)); return Err("sending too large WS message".to_owned()).map_err(logthru_net!(error));
} }
let mut inner = self.inner.lock().await; self.inner.ws_stream.clone()
inner.ws_stream
.send(WsMessage::Binary(message)).await .send(WsMessage::Binary(message)).await
.map_err(|_| "failed to send to websocket".to_owned()) .map_err(|_| "failed to send to websocket".to_owned())
.map_err(logthru_net!(error)) .map_err(logthru_net!(error))
} }
pub async fn recv(&self) -> Result<Vec<u8>, String> { pub async fn recv(&self) -> Result<Vec<u8>, String> {
let mut inner = self.inner.lock().await; let out = match self.inner.ws_stream.clone().next().await {
let out = match inner.ws_stream.next().await {
Some(WsMessage::Binary(v)) => v, Some(WsMessage::Binary(v)) => v,
Some(_) => { Some(_) => {
return Err("Unexpected WS message type".to_owned()) return Err("Unexpected WS message type".to_owned())
@ -104,7 +99,7 @@ impl WebsocketProtocolHandler {
Ok(NetworkConnection::from_protocol(ConnectionDescriptor { Ok(NetworkConnection::from_protocol(ConnectionDescriptor {
local: None, local: None,
remote: dial_info.to_peer_address(), remote: dial_info.to_peer_address(),
},ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(tls, wsmeta, wsio)))) },ProtocolNetworkConnection::Ws(WebsocketNetworkConnection::new(wsmeta, wsio))))
} }
pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> Result<(), String> { pub async fn send_unbound_message(dial_info: DialInfo, data: Vec<u8>) -> Result<(), String> {

View File

@ -1,11 +1,12 @@
use crate::xx::*; use crate::xx::*;
use core::time::Duration;
static MESSAGE: &[u8; 62] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
use async_std::io; use async_std::io;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::task; use async_std::task;
use std::time::Duration;
static MESSAGE: &[u8; 62] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
async fn make_tcp_loopback() -> Result<(TcpStream, TcpStream), io::Error> { async fn make_tcp_loopback() -> Result<(TcpStream, TcpStream), io::Error> {
let listener = TcpListener::bind("127.0.0.1:0").await?; let listener = TcpListener::bind("127.0.0.1:0").await?;
@ -35,13 +36,13 @@ async fn make_async_peek_stream_loopback() -> (AsyncPeekStream, AsyncPeekStream)
(aps_a, aps_c) (aps_a, aps_c)
} }
async fn make_tcpstream_loopback() -> (TcpStream, TcpStream) { async fn make_stream_loopback() -> (TcpStream, TcpStream) {
make_tcp_loopback().await.unwrap() make_tcp_loopback().await.unwrap()
} }
pub async fn test_nothing() { pub async fn test_nothing() {
info!("test_nothing"); info!("test_nothing");
let (mut a, mut c) = make_tcpstream_loopback().await; let (mut a, mut c) = make_stream_loopback().await;
let outbuf = MESSAGE.to_vec(); let outbuf = MESSAGE.to_vec();
a.write_all(&outbuf).await.unwrap(); a.write_all(&outbuf).await.unwrap();