From 1866c0635d0747f88b438cc1d4f284cb43bbcdd6 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 3 Feb 2021 16:45:43 +1100 Subject: [PATCH] Implement new behaviour for execution setup --- Cargo.lock | 10 +++ docs/sequence.puml | 10 +-- swap/Cargo.toml | 1 + swap/src/network/request_response.rs | 2 +- swap/src/protocol/alice.rs | 11 +++ swap/src/protocol/alice/event_loop.rs | 3 + swap/src/protocol/alice/execution_setup.rs | 91 +++++++++++++++++++++ swap/src/protocol/bob.rs | 13 ++- swap/src/protocol/bob/event_loop.rs | 3 + swap/src/protocol/bob/execution_setup.rs | 92 ++++++++++++++++++++++ 10 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 swap/src/protocol/alice/execution_setup.rs create mode 100644 swap/src/protocol/bob/execution_setup.rs diff --git a/Cargo.lock b/Cargo.lock index 3853cca9..ed5439a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,6 +1637,15 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "libp2p-async-await" +version = "0.1.0" +source = "git+https://github.com/comit-network/rust-libp2p-async-await?rev=1429cd780204624b4d244e7d8179fe6ff77988c3#1429cd780204624b4d244e7d8179fe6ff77988c3" +dependencies = [ + "libp2p", + "log", +] + [[package]] name = "libp2p-core" version = "0.27.0" @@ -3415,6 +3424,7 @@ dependencies = [ "get-port", "hyper", "libp2p", + "libp2p-async-await", "log", "miniscript", "monero", diff --git a/docs/sequence.puml b/docs/sequence.puml index 11638d4b..3a59aeb4 100644 --- a/docs/sequence.puml +++ b/docs/sequence.puml @@ -20,22 +20,22 @@ end group Execution Setup group Phase A [Messages can be exchanged in any order] - Bob -> Alice: bob::Message0 + Bob -> Alice: Message0 note left: Pubkeys\ndleq proof s_b\nxmr viewkey v_b\nbtc refund addr - Alice -> Bob: alice::Message0 + Alice -> Bob: Message1 note right: Pubkeys\ndleq proof s_a\nxmr view key v_a\nbtc redeem addr\nbtc punish addr end group Phase B [Messages must be exchanged in the given order] - Bob -> Alice: Message1 + Bob -> Alice: Message2 note left: unsigned btc lock tx - Alice -> Bob: Message2 + Alice -> Bob: Message3 note right: btc cancel tx sig\nbtc refund tx enc sig S_b - Bob -> Alice: Message3 + Bob -> Alice: Message4 note left: btc punish tx sig\nbtc cancel tx sig end diff --git a/swap/Cargo.toml b/swap/Cargo.toml index 94eda471..85609dcc 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -25,6 +25,7 @@ ecdsa_fun = { git = "https://github.com/LLFourn/secp256kfun", rev = "cdfbc766045 ed25519-dalek = { version = "1.0.0-pre.4", features = ["serde"] }# Cannot be 1 because they depend on curve25519-dalek version 3 futures = { version = "0.3", default-features = false } libp2p = { version = "0.34", default-features = false, features = ["tcp-tokio", "yamux", "mplex", "dns", "noise", "request-response"] } +libp2p-async-await = { git = "https://github.com/comit-network/rust-libp2p-async-await", rev = "1429cd780204624b4d244e7d8179fe6ff77988c3" } log = { version = "0.4", features = ["serde"] } miniscript = { version = "4", features = ["serde"] } monero = { version = "0.9", features = ["serde_support"] } diff --git a/swap/src/network/request_response.rs b/swap/src/network/request_response.rs index f0a01664..f35ac3e0 100644 --- a/swap/src/network/request_response.rs +++ b/swap/src/network/request_response.rs @@ -12,7 +12,7 @@ use std::{fmt::Debug, io, marker::PhantomData}; pub const TIMEOUT: u64 = 3600; // One hour. /// Message receive buffer. -const BUF_SIZE: usize = 1024 * 1024; +pub const BUF_SIZE: usize = 1024 * 1024; // TODO: Think about whether there is a better way to do this, e.g., separate // Codec for each Message and a macro that implements them. diff --git a/swap/src/protocol/alice.rs b/swap/src/protocol/alice.rs index 6f5b02d4..9c9b21c4 100644 --- a/swap/src/protocol/alice.rs +++ b/swap/src/protocol/alice.rs @@ -34,6 +34,7 @@ use uuid::Uuid; mod encrypted_signature; pub mod event_loop; +mod execution_setup; mod message0; mod message1; mod message2; @@ -236,6 +237,7 @@ pub enum OutEvent { msg: Box, bob_peer_id: PeerId, }, + ExecutionSetupDone(Result>), TransferProofAcknowledged, EncryptedSignature(EncryptedSignature), } @@ -286,6 +288,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: execution_setup::OutEvent) -> Self { + match event { + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), + } + } +} + impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { match event { @@ -312,6 +322,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, + execution_setup: execution_setup::Behaviour, transfer_proof: transfer_proof::Behaviour, encrypted_signature: encrypted_signature::Behaviour, } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 9e28934a..d2bd2f33 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -227,6 +227,9 @@ impl EventLoop { OutEvent::Message2 { msg, bob_peer_id : _} => { let _ = self.recv_message2.send(*msg).await; } + OutEvent::ExecutionSetupDone(_res) => { + todo!() + } OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); let _ = self.recv_transfer_proof_ack.send(()).await; diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs new file mode 100644 index 00000000..38233614 --- /dev/null +++ b/swap/src/protocol/alice/execution_setup.rs @@ -0,0 +1,91 @@ +use crate::{ + network::request_response::BUF_SIZE, + protocol::{ + alice::{State0, State3}, + bob, + }, +}; +use anyhow::{Context, Error, Result}; +use libp2p::PeerId; +use libp2p_async_await::BehaviourOutEvent; + +#[derive(Debug)] +pub enum OutEvent { + Done(Result), +} + +impl From> for OutEvent { + fn from(event: BehaviourOutEvent) -> Self { + match event { + BehaviourOutEvent::Inbound(_, Ok(State3)) => OutEvent::Done(Ok(State3)), + BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Done(Err(e)), + BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"), + } + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", event_process = false)] +pub struct Behaviour { + inner: libp2p_async_await::Behaviour, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + inner: libp2p_async_await::Behaviour::new(b"/execution_setup/1.0.0"), + } + } +} + +impl Behaviour { + fn run(&mut self, bob: PeerId, state0: State0) { + self.inner + .do_protocol_listener(bob, move |mut substream| async move { + let alice_message0 = state0.next_message(); + + let state1 = { + let bob_message0 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message0")?; + state0.receive(bob_message0)? + }; + + substream + .write_message( + &serde_cbor::to_vec(&alice_message0) + .context("failed to serialize Message0")?, + ) + .await?; + + let state2 = { + let bob_message1 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message1")?; + state1.receive(bob_message1) + }; + + { + let alice_message2 = state2.next_message(); + substream + .write_message( + &serde_cbor::to_vec(&alice_message2) + .context("failed to serialize Message2")?, + ) + .await?; + } + + let state3 = { + let bob_message2 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message2")?; + state2.receive(bob_message2)? + }; + + Ok(state3) + }) + } +} diff --git a/swap/src/protocol/bob.rs b/swap/src/protocol/bob.rs index 810af8c2..6a7ec09f 100644 --- a/swap/src/protocol/bob.rs +++ b/swap/src/protocol/bob.rs @@ -32,6 +32,7 @@ use crate::{execution_params::ExecutionParams, protocol::alice::TransferProof}; mod encrypted_signature; pub mod event_loop; +mod execution_setup; mod message0; mod message1; mod message2; @@ -203,13 +204,14 @@ impl Builder { } } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum OutEvent { ConnectionEstablished(PeerId), SwapResponse(alice::SwapResponse), Message0(Box), Message1(Box), Message2, + ExecutionSetupDone(Result>), TransferProof(Box), EncryptedSignatureAcknowledged, } @@ -254,6 +256,14 @@ impl From for OutEvent { } } +impl From for OutEvent { + fn from(event: execution_setup::OutEvent) -> Self { + match event { + execution_setup::OutEvent::Done(res) => OutEvent::ExecutionSetupDone(res.map(Box::new)), + } + } +} + impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { match event { @@ -280,6 +290,7 @@ pub struct Behaviour { message0: message0::Behaviour, message1: message1::Behaviour, message2: message2::Behaviour, + execution_setup: execution_setup::Behaviour, transfer_proof: transfer_proof::Behaviour, encrypted_signature: encrypted_signature::Behaviour, } diff --git a/swap/src/protocol/bob/event_loop.rs b/swap/src/protocol/bob/event_loop.rs index 86f8bac6..fcb1da17 100644 --- a/swap/src/protocol/bob/event_loop.rs +++ b/swap/src/protocol/bob/event_loop.rs @@ -225,6 +225,9 @@ impl EventLoop { let _ = self.recv_message1.send(*msg).await; } OutEvent::Message2 => info!("Alice acknowledged message 2 received"), + OutEvent::ExecutionSetupDone(_res) => { + todo!() + } OutEvent::TransferProof(msg) => { let _ = self.recv_transfer_proof.send(*msg).await; } diff --git a/swap/src/protocol/bob/execution_setup.rs b/swap/src/protocol/bob/execution_setup.rs new file mode 100644 index 00000000..ed2de167 --- /dev/null +++ b/swap/src/protocol/bob/execution_setup.rs @@ -0,0 +1,92 @@ +use crate::{ + network::request_response::BUF_SIZE, + protocol::{ + alice, + bob::{State0, State2}, + }, +}; +use anyhow::{Context, Error, Result}; +use libp2p::PeerId; +use libp2p_async_await::BehaviourOutEvent; +use std::sync::Arc; + +#[derive(Debug)] +pub enum OutEvent { + Done(Result), +} + +impl From> for OutEvent { + fn from(event: BehaviourOutEvent<(), State2, Error>) -> Self { + match event { + BehaviourOutEvent::Outbound(_, Ok(State2)) => OutEvent::Done(Ok(State2)), + BehaviourOutEvent::Outbound(_, Err(e)) => OutEvent::Done(Err(e)), + BehaviourOutEvent::Inbound(..) => unreachable!("Bob only supports outbound"), + } + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(out_event = "OutEvent", event_process = false)] +pub struct Behaviour { + inner: libp2p_async_await::Behaviour<(), State2, anyhow::Error>, +} + +impl Default for Behaviour { + fn default() -> Self { + Self { + inner: libp2p_async_await::Behaviour::new(b"/execution_setup/1.0.0"), + } + } +} + +impl Behaviour { + fn run(&mut self, alice: PeerId, state0: State0, bitcoin_wallet: Arc) { + self.inner + .do_protocol_dialer(alice, move |mut substream| async move { + let bob_message0 = state0.next_message(); + + substream + .write_message( + &serde_cbor::to_vec(&bob_message0) + .context("failed to serialize message0")?, + ) + .await?; + + let alice_message0 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message0")?; + + let state1 = state0 + .receive(bitcoin_wallet.as_ref(), alice_message0) + .await?; + { + let bob_message1 = state1.next_message(); + substream + .write_message( + &serde_cbor::to_vec(&bob_message1) + .context("failed to serialize Message1")?, + ) + .await?; + } + + let alice_message1 = serde_cbor::from_slice::( + &substream.read_message(BUF_SIZE).await?, + ) + .context("failed to deserialize message1")?; + let state2 = state1.receive(alice_message1)?; + + { + let bob_message2 = state2.next_message(); + substream + .write_message( + &serde_cbor::to_vec(&bob_message2) + .context("failed to serialize Message2")?, + ) + .await?; + } + + Ok(state2) + }) + } +}