422: Websocket support r=da-kami a=da-kami

This allows us to request quotes via js-libp2p.

PoC used for testing: https://github.com/da-kami/js-libp2p-quote-poc

Thanks for all the input @thomaseizinger 😉

Co-authored-by: Daniel Karzel <daniel@comit.network>
This commit is contained in:
bors[bot] 2021-04-16 06:24:47 +00:00 committed by GitHub
commit 6c0dccba30
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 214 additions and 16 deletions

View File

@ -12,6 +12,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Allow multiple concurrent swaps with the same peer on the ASB. - Allow multiple concurrent swaps with the same peer on the ASB.
This is a breaking change because the swap ID is now agreed upon between CLI and ASB during swap setup. This is a breaking change because the swap ID is now agreed upon between CLI and ASB during swap setup.
Resuming swaps started prior to this change can result in unexpected behaviour. Resuming swaps started prior to this change can result in unexpected behaviour.
- Quote protocol returns JSON encoded data instead of CBOR.
This is a breaking change in the protocol handling, old CLI versions will not be able to process quote requests of ASBs running this version.
### Added
- Websocket support for the ASB.
The ASB is now capable to listen on both TCP and Websocket connections.
Default websocket listening port is 9940.
## [0.4.0] - 2021-04-06 ## [0.4.0] - 2021-04-06

76
Cargo.lock generated
View File

@ -1059,6 +1059,7 @@ dependencies = [
"cfg-if 0.1.10", "cfg-if 0.1.10",
"crc32fast", "crc32fast",
"libc", "libc",
"libz-sys",
"miniz_oxide", "miniz_oxide",
] ]
@ -1155,6 +1156,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "futures-rustls"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a1387e07917c711fb4ee4f48ea0adb04a3c9739e53ef85bf43ae1edc2937a8b"
dependencies = [
"futures-io",
"rustls 0.19.0",
"webpki",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.14" version = "0.3.14"
@ -1705,6 +1717,7 @@ dependencies = [
"libp2p-swarm", "libp2p-swarm",
"libp2p-swarm-derive", "libp2p-swarm-derive",
"libp2p-tcp", "libp2p-tcp",
"libp2p-websocket",
"libp2p-yamux", "libp2p-yamux",
"parity-multiaddr", "parity-multiaddr",
"parking_lot 0.11.1", "parking_lot 0.11.1",
@ -1872,6 +1885,24 @@ dependencies = [
"tokio 1.5.0", "tokio 1.5.0",
] ]
[[package]]
name = "libp2p-websocket"
version = "0.29.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cace60995ef6f637e4752cccbb2590f6bc358e8741a0d066307636c69a4b3a74"
dependencies = [
"either",
"futures",
"futures-rustls",
"libp2p-core",
"log 0.4.14",
"quicksink",
"rw-stream-sink",
"soketto",
"url 2.2.1",
"webpki-roots 0.21.0",
]
[[package]] [[package]]
name = "libp2p-yamux" name = "libp2p-yamux"
version = "0.31.0" version = "0.31.0"
@ -1901,6 +1932,18 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "libz-sys"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "602113192b08db8f38796c4e85c39e960c145965140e918018bcde1952429655"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "linked-hash-map" name = "linked-hash-map"
version = "0.5.4" version = "0.5.4"
@ -2643,6 +2686,17 @@ version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quicksink"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77de3c815e5a160b1539c6592796801df2043ae35e123b46d73380cfa57af858"
dependencies = [
"futures-core",
"futures-sink",
"pin-project-lite 0.1.12",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.9" version = "1.0.9"
@ -3415,6 +3469,22 @@ dependencies = [
"ws2_32-sys", "ws2_32-sys",
] ]
[[package]]
name = "soketto"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5c71ed3d54db0a699f4948e1bb3e45b450fa31fe602621dee6680361d569c88"
dependencies = [
"base64 0.12.3",
"bytes 0.5.6",
"flate2",
"futures",
"httparse",
"log 0.4.14",
"rand 0.7.3",
"sha-1",
]
[[package]] [[package]]
name = "spectral" name = "spectral"
version = "0.6.0" version = "0.6.0"
@ -4265,6 +4335,12 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "vcpkg"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b00bca6106a5e23f3eee943593759b7fcddb00554332e856d990c893966879fb"
[[package]] [[package]]
name = "vec_map" name = "vec_map"
version = "0.8.2" version = "0.8.2"

View File

@ -27,7 +27,7 @@ directories-next = "2"
ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", features = ["libsecp_compat", "serde"] } ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", features = ["libsecp_compat", "serde"] }
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
itertools = "0.10" itertools = "0.10"
libp2p = { version = "0.36", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response"] } libp2p = { version = "0.36", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket"] }
libp2p-async-await = { git = "https://github.com/comit-network/rust-libp2p-async-await" } libp2p-async-await = { git = "https://github.com/comit-network/rust-libp2p-async-await" }
miniscript = { version = "5", features = ["serde"] } miniscript = { version = "5", features = ["serde"] }
monero = { version = "0.11", features = ["serde_support"] } monero = { version = "0.11", features = ["serde_support"] }

View File

@ -11,7 +11,8 @@ use std::path::{Path, PathBuf};
use tracing::info; use tracing::info;
use url::Url; use url::Url;
const DEFAULT_LISTEN_ADDRESS: &str = "/ip4/0.0.0.0/tcp/9939"; const DEFAULT_LISTEN_ADDRESS_TCP: &str = "/ip4/0.0.0.0/tcp/9939";
const DEFAULT_LISTEN_ADDRESS_WS: &str = "/ip4/0.0.0.0/tcp/9940/ws";
const DEFAULT_ELECTRUM_RPC_URL: &str = "ssl://electrum.blockstream.info:60002"; const DEFAULT_ELECTRUM_RPC_URL: &str = "ssl://electrum.blockstream.info:60002";
const DEFAULT_MONERO_WALLET_RPC_TESTNET_URL: &str = "http://127.0.0.1:38083/json_rpc"; const DEFAULT_MONERO_WALLET_RPC_TESTNET_URL: &str = "http://127.0.0.1:38083/json_rpc";
@ -45,7 +46,7 @@ pub struct Data {
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(deny_unknown_fields)] #[serde(deny_unknown_fields)]
pub struct Network { pub struct Network {
pub listen: Multiaddr, pub listen: Vec<Multiaddr>,
} }
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
@ -112,11 +113,14 @@ pub fn query_user_for_initial_testnet_config() -> Result<Config> {
.interact_text()?; .interact_text()?;
let data_dir = data_dir.as_str().parse()?; let data_dir = data_dir.as_str().parse()?;
let listen_address = Input::with_theme(&ColorfulTheme::default()) let listen_addresses = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Enter multiaddress on which asb should list for peer-to-peer communications or hit return to use default") .with_prompt("Enter multiaddresses (comma separated) on which asb should list for peer-to-peer communications or hit return to use default")
.default(DEFAULT_LISTEN_ADDRESS.to_owned()) .default( format!("{},{}", DEFAULT_LISTEN_ADDRESS_TCP, DEFAULT_LISTEN_ADDRESS_WS))
.interact_text()?; .interact_text()?;
let listen_address = listen_address.as_str().parse()?; let listen_addresses = listen_addresses
.split(',')
.map(|str| str.parse())
.collect::<Result<Vec<Multiaddr>, _>>()?;
let electrum_rpc_url: String = Input::with_theme(&ColorfulTheme::default()) let electrum_rpc_url: String = Input::with_theme(&ColorfulTheme::default())
.with_prompt("Enter Electrum RPC URL or hit return to use default") .with_prompt("Enter Electrum RPC URL or hit return to use default")
@ -134,7 +138,7 @@ pub fn query_user_for_initial_testnet_config() -> Result<Config> {
Ok(Config { Ok(Config {
data: Data { dir: data_dir }, data: Data { dir: data_dir },
network: Network { network: Network {
listen: listen_address, listen: listen_addresses,
}, },
bitcoin: Bitcoin { electrum_rpc_url }, bitcoin: Bitcoin { electrum_rpc_url },
monero: Monero { monero: Monero {
@ -162,7 +166,10 @@ mod tests {
electrum_rpc_url: Url::from_str(DEFAULT_ELECTRUM_RPC_URL).unwrap(), electrum_rpc_url: Url::from_str(DEFAULT_ELECTRUM_RPC_URL).unwrap(),
}, },
network: Network { network: Network {
listen: DEFAULT_LISTEN_ADDRESS.parse().unwrap(), listen: vec![
DEFAULT_LISTEN_ADDRESS_TCP.parse().unwrap(),
DEFAULT_LISTEN_ADDRESS_WS.parse().unwrap(),
],
}, },
monero: Monero { monero: Monero {

View File

@ -98,8 +98,11 @@ async fn main() -> Result<()> {
let kraken_price_updates = kraken::connect()?; let kraken_price_updates = kraken::connect()?;
let mut swarm = swarm::alice(&seed)?; let mut swarm = swarm::alice(&seed)?;
Swarm::listen_on(&mut swarm, config.network.listen)
.context("Failed to listen network interface")?; for listen in config.network.listen {
Swarm::listen_on(&mut swarm, listen.clone())
.with_context(|| format!("Failed to listen on network interface {}", listen))?;
}
let (event_loop, mut swap_receiver) = EventLoop::new( let (event_loop, mut swap_receiver) = EventLoop::new(
swarm, swarm,

View File

@ -2,6 +2,7 @@ mod impl_from_rr_event;
pub mod cbor_request_response; pub mod cbor_request_response;
pub mod encrypted_signature; pub mod encrypted_signature;
pub mod json_pull_codec;
pub mod quote; pub mod quote;
pub mod redial; pub mod redial;
pub mod spot_price; pub mod spot_price;

View File

@ -0,0 +1,99 @@
use async_trait::async_trait;
use futures::prelude::*;
use libp2p::core::upgrade;
use libp2p::request_response::{ProtocolName, RequestResponseCodec};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use std::io;
use std::marker::PhantomData;
/// Message receive buffer.
pub const BUF_SIZE: usize = 1024 * 1024;
/// A [`RequestResponseCodec`] for pull-based protocols where the response is
/// encoded using JSON.
///
/// A pull-based protocol is a protocol where the dialer doesn't send any
/// message and expects the listener to directly send the response as the
/// substream is opened.
#[derive(Clone, Copy, Debug)]
pub struct JsonPullCodec<P, Res> {
phantom: PhantomData<(P, Res)>,
}
impl<P, Res> Default for JsonPullCodec<P, Res> {
fn default() -> Self {
Self {
phantom: PhantomData::default(),
}
}
}
#[async_trait]
impl<P, Res> RequestResponseCodec for JsonPullCodec<P, Res>
where
P: ProtocolName + Send + Sync + Clone,
Res: DeserializeOwned + Serialize + Send,
{
type Protocol = P;
type Request = ();
type Response = Res;
async fn read_request<T>(&mut self, _: &Self::Protocol, _: &mut T) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
Ok(())
}
async fn read_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_json::Deserializer::from_slice(&message);
let msg = Res::deserialize(&mut de).map_err(|e| {
tracing::debug!("serde read_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
Ok(msg)
}
async fn write_request<T>(
&mut self,
_: &Self::Protocol,
_: &mut T,
_: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
Ok(())
}
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let bytes = serde_json::to_vec(&res).map_err(|e| {
tracing::debug!("serde write_response error: {:?}", e);
io::Error::new(io::ErrorKind::InvalidData, e)
})?;
upgrade::write_one(io, &bytes).await?;
Ok(())
}
}

View File

@ -1,5 +1,5 @@
use crate::bitcoin; use crate::bitcoin;
use crate::network::cbor_request_response::CborCodec; use crate::network::json_pull_codec::JsonPullCodec;
use crate::protocol::{alice, bob}; use crate::protocol::{alice, bob};
use libp2p::core::ProtocolName; use libp2p::core::ProtocolName;
use libp2p::request_response::{ use libp2p::request_response::{
@ -13,7 +13,7 @@ const PROTOCOL: &str = "/comit/xmr/btc/bid-quote/1.0.0";
type OutEvent = RequestResponseEvent<(), BidQuote>; type OutEvent = RequestResponseEvent<(), BidQuote>;
type Message = RequestResponseMessage<(), BidQuote>; type Message = RequestResponseMessage<(), BidQuote>;
pub type Behaviour = RequestResponse<CborCodec<BidQuoteProtocol, (), BidQuote>>; pub type Behaviour = RequestResponse<JsonPullCodec<BidQuoteProtocol, BidQuote>>;
#[derive(Debug, Clone, Copy, Default)] #[derive(Debug, Clone, Copy, Default)]
pub struct BidQuoteProtocol; pub struct BidQuoteProtocol;
@ -40,7 +40,7 @@ pub struct BidQuote {
/// Alice only supports inbound connections, i.e. handing out quotes. /// Alice only supports inbound connections, i.e. handing out quotes.
pub fn alice() -> Behaviour { pub fn alice() -> Behaviour {
Behaviour::new( Behaviour::new(
CborCodec::default(), JsonPullCodec::default(),
vec![(BidQuoteProtocol, ProtocolSupport::Inbound)], vec![(BidQuoteProtocol, ProtocolSupport::Inbound)],
RequestResponseConfig::default(), RequestResponseConfig::default(),
) )
@ -51,7 +51,7 @@ pub fn alice() -> Behaviour {
/// Bob only supports outbound connections, i.e. requesting quotes. /// Bob only supports outbound connections, i.e. requesting quotes.
pub fn bob() -> Behaviour { pub fn bob() -> Behaviour {
Behaviour::new( Behaviour::new(
CborCodec::default(), JsonPullCodec::default(),
vec![(BidQuoteProtocol, ProtocolSupport::Outbound)], vec![(BidQuoteProtocol, ProtocolSupport::Outbound)],
RequestResponseConfig::default(), RequestResponseConfig::default(),
) )

View File

@ -6,11 +6,13 @@ use libp2p::core::{identity, Transport};
use libp2p::dns::TokioDnsConfig; use libp2p::dns::TokioDnsConfig;
use libp2p::mplex::MplexConfig; use libp2p::mplex::MplexConfig;
use libp2p::noise::{self, NoiseConfig, X25519Spec}; use libp2p::noise::{self, NoiseConfig, X25519Spec};
use libp2p::websocket::WsConfig;
use libp2p::{yamux, PeerId}; use libp2p::{yamux, PeerId};
use std::time::Duration; use std::time::Duration;
/// Builds a libp2p transport with the following features: /// Builds a libp2p transport with the following features:
/// - TcpConnection /// - TcpConnection
/// - WebSocketConnection
/// - DNS name resolution /// - DNS name resolution
/// - authentication via noise /// - authentication via noise
/// - multiplexing via yamux or mplex /// - multiplexing via yamux or mplex
@ -22,8 +24,10 @@ pub fn build(id_keys: &identity::Keypair) -> Result<SwapTransport> {
let tcp = TokioTcpConfig::new().nodelay(true); let tcp = TokioTcpConfig::new().nodelay(true);
let dns = TokioDnsConfig::system(tcp)?; let dns = TokioDnsConfig::system(tcp)?;
let websocket = WsConfig::new(dns.clone());
let transport = dns let transport = websocket
.or_transport(dns)
.upgrade(Version::V1) .upgrade(Version::V1)
.authenticate(noise) .authenticate(noise)
.multiplex(SelectUpgrade::new( .multiplex(SelectUpgrade::new(